82 auto it = headers.find(ccf::http::headers::CONTENT_RANGE);
83 if (it == headers.end())
85 throw std::runtime_error(
86 "Response is missing expected content-range header");
89 auto [unit, remaining] = ccf::nonstd::split_1(it->second,
" ");
92 throw std::runtime_error(
93 "Unexpected content-range unit. Only 'bytes' is supported");
96 auto [range, total_size] = ccf::nonstd::split_1(remaining,
"/");
97 auto [range_start, range_end] = ccf::nonstd::split_1(range,
"-");
99 if (range_start.empty() || range_end.empty() || total_size.empty())
101 throw std::runtime_error(fmt::format(
102 "Unsupported content-range header format. Expected 'bytes "
103 "<begin>-<end>/<total>', received: {}",
107 ContentRangeHeader parsed_values{};
110 const auto [p, ec] = std::from_chars(
111 range_start.begin(), range_start.end(), parsed_values.range_start);
112 if (ec != std::errc())
114 throw std::runtime_error(fmt::format(
115 "Could not parse range start ({}) from content-range header: {}",
122 const auto [p, ec] = std::from_chars(
123 range_end.begin(), range_end.end(), parsed_values.inclusive_range_end);
124 if (ec != std::errc())
126 throw std::runtime_error(fmt::format(
127 "Could not parse range end ({}) from content-range header: {}",
134 const auto [p, ec] = std::from_chars(
135 total_size.begin(), total_size.end(), parsed_values.total_size);
136 if (ec != std::errc())
138 throw std::runtime_error(fmt::format(
139 "Could not parse total size ({}) from content-range header: {}",
148 auto length_it = headers.find(ccf::http::headers::CONTENT_LENGTH);
149 if (length_it == headers.end())
151 throw std::runtime_error(
152 "Response is missing expected content-length header");
155 size_t content_length = 0;
158 const auto& length_s = length_it->second;
160 const auto [p, ec] = std::from_chars(
161 length_s.data(), length_s.data() + length_s.size(), content_length);
163 if (ec != std::errc())
165 throw std::runtime_error(fmt::format(
166 "Could not parse length from content-length header: {}",
171 const auto range_length =
172 parsed_values.inclusive_range_end - parsed_values.range_start;
173 if (range_length == content_length)
176 "Server sent an exclusive-end content-range header. "
177 "content-length={}, content-range={}. Adjusting this to local "
178 "inclusive-end representation. This should be a temporary "
179 "mismatch, between 6.x and 7.x nodes in a mixed network",
182 parsed_values.inclusive_range_end -= 1;
184 else if (range_length + 1 == content_length)
187 "Server sent an inclusive-end content-range header. "
188 "content-length={}, content-range={}. This is expected for 7.x to "
195 throw std::runtime_error(fmt::format(
196 "content-range ({}, {} bytes) and content-length ({}) headers do not "
204 return parsed_values;
207 static std::optional<SnapshotResponse> try_fetch_from_peer(
208 const std::string& peer_address,
209 const std::vector<uint8_t>& peer_ca,
211 std::optional<size_t> since_seqno = std::nullopt)
217 CURLOPT_CAINFO_BLOB, peer_ca.data(), peer_ca.size());
219 auto response_body = std::make_unique<ccf::curl::ResponseBody>(max_size);
226 std::string snapshot_url;
227 if (since_seqno.has_value())
229 snapshot_url = fmt::format(
230 "https://{}/node/snapshot?since={}", peer_address, *since_seqno);
234 snapshot_url = fmt::format(
"https://{}/node/snapshot", peer_address);
238 constexpr size_t range_size = 4L * 1024 * 1024;
239 size_t range_start = 0;
240 size_t range_end = range_size;
241 size_t inclusive_range_end = range_end - 1;
242 bool fetched_all =
false;
244 auto process_partial_response =
246 auto content_range = parse_content_range_header(request);
248 if (content_range.range_start != range_start)
250 throw std::runtime_error(fmt::format(
251 "Unexpected range response. Requested bytes {}-{}, received "
252 "range starting at {}",
255 content_range.range_start));
260 if (content_range.inclusive_range_end > inclusive_range_end)
262 throw std::runtime_error(fmt::format(
263 "Unexpected range response. Requested bytes {}-{}, received "
264 "range ending at {}",
267 content_range.inclusive_range_end));
270 const auto content_range_exclusive_range_end =
271 content_range.inclusive_range_end + 1;
273 const auto range_size =
274 content_range_exclusive_range_end - content_range.range_start;
276 "Received {}-byte chunk from {}. Now have {}/{}",
279 content_range_exclusive_range_end,
280 content_range.total_size);
282 if (content_range_exclusive_range_end == content_range.total_size)
289 range_start = range_end;
290 range_end = range_start + range_size;
291 inclusive_range_end = range_end - 1;
295 const auto max_redirects = 20;
296 for (
auto redirect_count = 1; redirect_count <= max_redirects;
300 "Making snapshot discovery request {}/{} to {}",
307 ccf::http::headers::RANGE,
308 fmt::format(
"bytes={}-{}", range_start, inclusive_range_end));
310 CURLcode curl_response = CURLE_FAILED_INIT;
311 long status_code = 0;
312 std::unique_ptr<ccf::curl::CurlRequest> request;
314 [&curl_response, &status_code, &request](
315 std::unique_ptr<ccf::curl::CurlRequest>&& request_,
316 CURLcode curl_response_,
318 curl_response = curl_response_;
319 status_code = status_code_;
320 request = std::move(request_);
324 std::make_unique<ccf::curl::CurlRequest>(
325 std::move(curl_easy),
330 std::move(response_body),
331 std::move(response_callback)));
333 if (curl_response != CURLE_OK)
335 throw std::runtime_error(fmt::format(
336 "Error fetching snapshot redirect from {}: {} ({})",
338 curl_easy_strerror(curl_response),
342 if (status_code == HTTP_STATUS_NOT_FOUND)
348 if (status_code == HTTP_STATUS_PARTIAL_CONTENT)
350 process_partial_response(*request);
352 response_body = std::move(request->get_response_ptr());
353 curl_easy = std::move(request->get_easy_handle_ptr());
360 HTTP_STATUS_PERMANENT_REDIRECT,
361 request->get_response_ptr());
363 char* redirect_url =
nullptr;
365 request->get_easy_handle(), CURLINFO_REDIRECT_URL, &redirect_url);
366 if (redirect_url ==
nullptr)
368 throw std::runtime_error(
369 "Redirect response found, but CURLINFO_REDIRECT_URL returned no "
374 "Snapshot fetch received redirect response with location {}",
376 snapshot_url = redirect_url;
378 response_body = std::move(request->get_response_ptr());
379 curl_easy = std::move(request->get_easy_handle_ptr());
382 response_body->buffer.clear();
389 ccf::http::headers::RANGE,
390 fmt::format(
"bytes={}-{}", range_start, inclusive_range_end));
392 std::unique_ptr<ccf::curl::CurlRequest> snapshot_range_request;
393 CURLcode curl_response = CURLE_OK;
394 long snapshot_range_status_code = 0;
398 std::unique_ptr<ccf::curl::CurlRequest>&& request_,
399 CURLcode curl_response_,
401 snapshot_range_request = std::move(request_);
402 curl_response = curl_response_;
403 snapshot_range_status_code = status_code_;
407 std::make_unique<ccf::curl::CurlRequest>(
408 std::move(curl_easy),
413 std::move(response_body),
414 snapshot_response_callback));
415 if (curl_response != CURLE_OK)
417 throw std::runtime_error(fmt::format(
418 "Error fetching snapshot chunk range from {}: {} ({})",
419 snapshot_range_request->get_url(),
420 curl_easy_strerror(curl_response),
421 snapshot_range_status_code));
424 snapshot_range_request,
425 snapshot_range_status_code,
426 HTTP_STATUS_PARTIAL_CONTENT,
427 snapshot_range_request->get_response_ptr());
429 process_partial_response(*snapshot_range_request);
431 response_body = std::move(snapshot_range_request->get_response_ptr());
432 curl_easy = std::move(snapshot_range_request->get_easy_handle_ptr());
435 const auto url_components = ccf::nonstd::split(snapshot_url,
"/");
436 const std::string snapshot_name(url_components.back());
438 return SnapshotResponse{snapshot_name, std::move(response_body->buffer)};
440 catch (
const std::exception& e)
442 LOG_FAIL_FMT(
"Error during snapshot fetch: {}", e.what());
447 static std::optional<SnapshotResponse> fetch_from_peer(
448 const std::string& peer_address,
449 const std::vector<uint8_t>& peer_ca,
451 size_t retry_delay_ms,
453 std::optional<size_t> since_seqno = std::nullopt)
455 for (
size_t attempt = 0; attempt < max_attempts; ++attempt)
458 "Fetching snapshot from {} since {} (attempt {}/{})",
460 since_seqno.has_value() ? std::to_string(*since_seqno) :
"any",
466 std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms));
470 try_fetch_from_peer(peer_address, peer_ca, max_size, since_seqno);
471 if (response.has_value())
477 "Exceeded maximum snapshot fetch retries ({}), giving up", max_attempts);