CCF
Loading...
Searching...
No Matches
fetch.h
Go to the documentation of this file.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
3#pragma once
4
#include "ccf/ds/nonstd.h"
#include "ccf/rest_verb.h"
#include "http/curl.h"
#include "http/http_builder.h"

#include <charconv>
#include <chrono>
#include <curl/curl.h>
#include <llhttp/llhttp.h>
#include <memory>
#include <optional>
#include <span>
#include <stdexcept>
#include <string>
#include <string_view>
#include <thread>
#include <vector>
20
// Throws std::runtime_error unless `status_code` equals `expected`.
//
// Every argument is evaluated at most once; `request` and `response_body` are
// only evaluated when the status does not match (and `response_body` only for
// 4xx/5xx responses):
//   request       - pointer-like handle providing get_method() and get_url(),
//                   used to describe the failed request.
//   status_code   - the HTTP status code actually received.
//   expected      - the required status; rendered via ccf::http_status_str.
//   response_body - pointer-like handle (may be null) to an object with a
//                   `buffer` member, whose contents are included in the
//                   error message for 4xx and 5xx responses.
#define EXPECT_HTTP_RESPONSE_STATUS( \
  request, status_code, expected, response_body) \
  do \
  { \
    const auto expect_http_status_ = (status_code); \
    const auto expect_http_expected_ = (expected); \
    if (expect_http_status_ != expect_http_expected_) \
    { \
      const auto& expect_http_request_ = (request); \
      std::string expect_http_error_; \
      /* Only include response body for 4xx and 5xx status codes */ \
      if (expect_http_status_ >= 400 && expect_http_status_ < 600) \
      { \
        const auto& expect_http_body_ = (response_body); \
        std::string expect_http_content_; \
        if (expect_http_body_ != nullptr) \
        { \
          expect_http_content_.assign( \
            expect_http_body_->buffer.begin(), \
            expect_http_body_->buffer.end()); \
        } \
        else \
        { \
          expect_http_content_ = "(no response body)"; \
        } \
        expect_http_error_ = fmt::format( \
          "Expected {} response from {} {}, instead received {} ({})", \
          ccf::http_status_str(expect_http_expected_), \
          expect_http_request_->get_method().c_str(), \
          expect_http_request_->get_url(), \
          expect_http_status_, \
          expect_http_content_); \
      } \
      else \
      { \
        expect_http_error_ = fmt::format( \
          "Expected {} response from {} {}, instead received {}", \
          ccf::http_status_str(expect_http_expected_), \
          expect_http_request_->get_method().c_str(), \
          expect_http_request_->get_url(), \
          expect_http_status_); \
      } \
      throw std::runtime_error(expect_http_error_); \
    } \
  } while (0)
61
62namespace snapshots
63{
// A successfully fetched snapshot: its file name (taken from the last path
// component of the final snapshot URL) and its raw bytes.
struct SnapshotResponse
{
  std::string snapshot_name{};
  std::vector<uint8_t> snapshot_data{};
};
69
// Parsed values of a `content-range: bytes <start>-<end>/<total>` header.
// All offsets are in bytes; members default to 0 so that a default-constructed
// instance never holds indeterminate values.
struct ContentRangeHeader
{
  // Offset of the first byte in the range
  size_t range_start = 0;
  // Offset of the last byte in the range (inclusive)
  size_t inclusive_range_end = 0;
  // Total size of the complete resource
  size_t total_size = 0;
};
76
77 static ContentRangeHeader parse_content_range_header(
78 const ccf::curl::CurlRequest& request)
79 {
80 const auto& headers = request.get_response_headers();
81
82 auto it = headers.find(ccf::http::headers::CONTENT_RANGE);
83 if (it == headers.end())
84 {
85 throw std::runtime_error(
86 "Response is missing expected content-range header");
87 }
88
89 auto [unit, remaining] = ccf::nonstd::split_1(it->second, " ");
90 if (unit != "bytes")
91 {
92 throw std::runtime_error(
93 "Unexpected content-range unit. Only 'bytes' is supported");
94 }
95
96 auto [range, total_size] = ccf::nonstd::split_1(remaining, "/");
97 auto [range_start, range_end] = ccf::nonstd::split_1(range, "-");
98
99 if (range_start.empty() || range_end.empty() || total_size.empty())
100 {
101 throw std::runtime_error(fmt::format(
102 "Unsupported content-range header format. Expected 'bytes "
103 "<begin>-<end>/<total>', received: {}",
104 it->second));
105 }
106
107 ContentRangeHeader parsed_values{};
108
109 {
110 const auto [p, ec] = std::from_chars(
111 range_start.begin(), range_start.end(), parsed_values.range_start);
112 if (ec != std::errc())
113 {
114 throw std::runtime_error(fmt::format(
115 "Could not parse range start ({}) from content-range header: {}",
116 range_start,
117 it->second));
118 }
119 }
120
121 {
122 const auto [p, ec] = std::from_chars(
123 range_end.begin(), range_end.end(), parsed_values.inclusive_range_end);
124 if (ec != std::errc())
125 {
126 throw std::runtime_error(fmt::format(
127 "Could not parse range end ({}) from content-range header: {}",
128 range_end,
129 it->second));
130 }
131 }
132
133 {
134 const auto [p, ec] = std::from_chars(
135 total_size.begin(), total_size.end(), parsed_values.total_size);
136 if (ec != std::errc())
137 {
138 throw std::runtime_error(fmt::format(
139 "Could not parse total size ({}) from content-range header: {}",
140 total_size,
141 it->second));
142 }
143 }
144
145 {
146 // Use content-length to determine whether the sender used an exclusive or
147 // inclusive range end
148 auto length_it = headers.find(ccf::http::headers::CONTENT_LENGTH);
149 if (length_it == headers.end())
150 {
151 throw std::runtime_error(
152 "Response is missing expected content-length header");
153 }
154
155 size_t content_length = 0;
156
157 {
158 const auto& length_s = length_it->second;
159
160 const auto [p, ec] = std::from_chars(
161 length_s.data(), length_s.data() + length_s.size(), content_length);
162
163 if (ec != std::errc())
164 {
165 throw std::runtime_error(fmt::format(
166 "Could not parse length from content-length header: {}",
167 length_it->second));
168 }
169 }
170
171 const auto range_length =
172 parsed_values.inclusive_range_end - parsed_values.range_start;
173 if (range_length == content_length)
174 {
176 "Server sent an exclusive-end content-range header. "
177 "content-length={}, content-range={}. Adjusting this to local "
178 "inclusive-end representation. This should be a temporary "
179 "mismatch, between 6.x and 7.x nodes in a mixed network",
180 length_it->second,
181 it->second);
182 parsed_values.inclusive_range_end -= 1;
183 }
184 else if (range_length + 1 == content_length)
185 {
187 "Server sent an inclusive-end content-range header. "
188 "content-length={}, content-range={}. This is expected for 7.x to "
189 "7.x nodes",
190 length_it->second,
191 it->second);
192 }
193 else
194 {
195 throw std::runtime_error(fmt::format(
196 "content-range ({}, {} bytes) and content-length ({}) headers do not "
197 "agree",
198 it->second,
199 range_length + 1,
200 length_it->second));
201 }
202 }
203
204 return parsed_values;
205 }
206
207 static std::optional<SnapshotResponse> try_fetch_from_peer(
208 const std::string& peer_address,
209 const std::vector<uint8_t>& peer_ca,
210 size_t max_size,
211 std::optional<size_t> since_seqno = std::nullopt)
212 {
213 try
214 {
215 ccf::curl::UniqueCURL curl_easy;
216 curl_easy.set_blob_opt(
217 CURLOPT_CAINFO_BLOB, peer_ca.data(), peer_ca.size());
218
219 auto response_body = std::make_unique<ccf::curl::ResponseBody>(max_size);
220
221 // Get snapshot. This may be redirected multiple times, and we follow
222 // these redirects ourself so we can extract the final URL. Once the
223 // redirects terminate, the final response is likely to be extremely
224 // large so is fetched over multiple requests for a sub-range, returning
225 // PARTIAL_CONTENT each time.
226 std::string snapshot_url;
227 if (since_seqno.has_value())
228 {
229 snapshot_url = fmt::format(
230 "https://{}/node/snapshot?since={}", peer_address, *since_seqno);
231 }
232 else
233 {
234 snapshot_url = fmt::format("https://{}/node/snapshot", peer_address);
235 }
236
237 // Fetch 4MB chunks at a time
238 constexpr size_t range_size = 4L * 1024 * 1024;
239 size_t range_start = 0;
240 size_t range_end = range_size;
241 size_t inclusive_range_end = range_end - 1;
242 bool fetched_all = false;
243
244 auto process_partial_response =
245 [&](const ccf::curl::CurlRequest& request) {
246 auto content_range = parse_content_range_header(request);
247
248 if (content_range.range_start != range_start)
249 {
250 throw std::runtime_error(fmt::format(
251 "Unexpected range response. Requested bytes {}-{}, received "
252 "range starting at {}",
253 range_start,
254 range_end,
255 content_range.range_start));
256 }
257
258 // The server may give us _less_ than we requested (since they know
259 // where the file ends), but should never give us more
260 if (content_range.inclusive_range_end > inclusive_range_end)
261 {
262 throw std::runtime_error(fmt::format(
263 "Unexpected range response. Requested bytes {}-{}, received "
264 "range ending at {}",
265 range_start,
266 inclusive_range_end,
267 content_range.inclusive_range_end));
268 }
269
270 const auto content_range_exclusive_range_end =
271 content_range.inclusive_range_end + 1;
272
273 const auto range_size =
274 content_range_exclusive_range_end - content_range.range_start;
276 "Received {}-byte chunk from {}. Now have {}/{}",
277 range_size,
278 request.get_url(),
279 content_range_exclusive_range_end,
280 content_range.total_size);
281
282 if (content_range_exclusive_range_end == content_range.total_size)
283 {
284 fetched_all = true;
285 }
286 else
287 {
288 // Advance range for next request
289 range_start = range_end;
290 range_end = range_start + range_size;
291 inclusive_range_end = range_end - 1;
292 }
293 };
294
295 const auto max_redirects = 20;
296 for (auto redirect_count = 1; redirect_count <= max_redirects;
297 ++redirect_count)
298 {
300 "Making snapshot discovery request {}/{} to {}",
301 redirect_count,
302 max_redirects,
303 snapshot_url);
304
306 headers.append(
307 ccf::http::headers::RANGE,
308 fmt::format("bytes={}-{}", range_start, inclusive_range_end));
309
310 CURLcode curl_response = CURLE_FAILED_INIT;
311 long status_code = 0;
312 std::unique_ptr<ccf::curl::CurlRequest> request;
314 [&curl_response, &status_code, &request](
315 std::unique_ptr<ccf::curl::CurlRequest>&& request_,
316 CURLcode curl_response_,
317 long status_code_) {
318 curl_response = curl_response_;
319 status_code = status_code_;
320 request = std::move(request_);
321 };
322
324 std::make_unique<ccf::curl::CurlRequest>(
325 std::move(curl_easy),
326 HTTP_GET,
327 snapshot_url,
328 std::move(headers),
329 nullptr, // No request body
330 std::move(response_body),
331 std::move(response_callback)));
332
333 if (curl_response != CURLE_OK)
334 {
335 throw std::runtime_error(fmt::format(
336 "Error fetching snapshot redirect from {}: {} ({})",
337 request->get_url(),
338 curl_easy_strerror(curl_response),
339 status_code));
340 }
341
342 if (status_code == HTTP_STATUS_NOT_FOUND)
343 {
344 LOG_INFO_FMT("Peer has no suitable snapshot");
345 return std::nullopt;
346 }
347
348 if (status_code == HTTP_STATUS_PARTIAL_CONTENT)
349 {
350 process_partial_response(*request);
351
352 response_body = std::move(request->get_response_ptr());
353 curl_easy = std::move(request->get_easy_handle_ptr());
354 break;
355 }
356
358 request,
359 status_code,
360 HTTP_STATUS_PERMANENT_REDIRECT,
361 request->get_response_ptr());
362
363 char* redirect_url = nullptr;
365 request->get_easy_handle(), CURLINFO_REDIRECT_URL, &redirect_url);
366 if (redirect_url == nullptr)
367 {
368 throw std::runtime_error(
369 "Redirect response found, but CURLINFO_REDIRECT_URL returned no "
370 "value");
371 }
372
374 "Snapshot fetch received redirect response with location {}",
375 redirect_url);
376 snapshot_url = redirect_url;
377
378 response_body = std::move(request->get_response_ptr());
379 curl_easy = std::move(request->get_easy_handle_ptr());
380
381 // Ignore any body from redirect responses
382 response_body->buffer.clear();
383 }
384
385 while (!fetched_all)
386 {
388 headers.append(
389 ccf::http::headers::RANGE,
390 fmt::format("bytes={}-{}", range_start, inclusive_range_end));
391
392 std::unique_ptr<ccf::curl::CurlRequest> snapshot_range_request;
393 CURLcode curl_response = CURLE_OK;
394 long snapshot_range_status_code = 0;
395
396 ccf::curl::CurlRequest::ResponseCallback snapshot_response_callback =
397 [&](
398 std::unique_ptr<ccf::curl::CurlRequest>&& request_,
399 CURLcode curl_response_,
400 long status_code_) {
401 snapshot_range_request = std::move(request_);
402 curl_response = curl_response_;
403 snapshot_range_status_code = status_code_;
404 };
405
407 std::make_unique<ccf::curl::CurlRequest>(
408 std::move(curl_easy),
409 HTTP_GET,
410 snapshot_url,
411 std::move(headers),
412 nullptr, // No request body
413 std::move(response_body),
414 snapshot_response_callback));
415 if (curl_response != CURLE_OK)
416 {
417 throw std::runtime_error(fmt::format(
418 "Error fetching snapshot chunk range from {}: {} ({})",
419 snapshot_range_request->get_url(),
420 curl_easy_strerror(curl_response),
421 snapshot_range_status_code));
422 }
424 snapshot_range_request,
425 snapshot_range_status_code,
426 HTTP_STATUS_PARTIAL_CONTENT,
427 snapshot_range_request->get_response_ptr());
428
429 process_partial_response(*snapshot_range_request);
430
431 response_body = std::move(snapshot_range_request->get_response_ptr());
432 curl_easy = std::move(snapshot_range_request->get_easy_handle_ptr());
433 }
434
435 const auto url_components = ccf::nonstd::split(snapshot_url, "/");
436 const std::string snapshot_name(url_components.back());
437
438 return SnapshotResponse{snapshot_name, std::move(response_body->buffer)};
439 }
440 catch (const std::exception& e)
441 {
442 LOG_FAIL_FMT("Error during snapshot fetch: {}", e.what());
443 return std::nullopt;
444 }
445 }
446
447 static std::optional<SnapshotResponse> fetch_from_peer(
448 const std::string& peer_address,
449 const std::vector<uint8_t>& peer_ca,
450 size_t max_attempts,
451 size_t retry_delay_ms,
452 size_t max_size,
453 std::optional<size_t> since_seqno = std::nullopt)
454 {
455 for (size_t attempt = 0; attempt < max_attempts; ++attempt)
456 {
458 "Fetching snapshot from {} since {} (attempt {}/{})",
459 peer_address,
460 since_seqno.has_value() ? std::to_string(*since_seqno) : "any",
461 attempt + 1,
462 max_attempts);
463
464 if (attempt > 0)
465 {
466 std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms));
467 }
468
469 auto response =
470 try_fetch_from_peer(peer_address, peer_ca, max_size, since_seqno);
471 if (response.has_value())
472 {
473 return response;
474 }
475 }
477 "Exceeded maximum snapshot fetch retries ({}), giving up", max_attempts);
478 return std::nullopt;
479 }
480}
Definition curl.h:374
static void synchronous_perform(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:490
const ResponseHeaders::HeaderMap & get_response_headers() const
Definition curl.h:539
std::string get_url() const
Definition curl.h:524
std::function< void(std::unique_ptr< CurlRequest > &&request, CURLcode curl_response_code, long status_code)> ResponseCallback
Definition curl.h:379
Definition curl.h:53
void set_blob_opt(auto option, const uint8_t *data, size_t length)
Definition curl.h:85
Definition curl.h:149
void append(const char *str)
Definition curl.h:165
#define CHECK_CURL_EASY_GETINFO(handle, info, arg)
Definition curl.h:35
#define EXPECT_HTTP_RESPONSE_STATUS(request, status_code, expected, response_body)
Definition fetch.h:21
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition fetch.h:63
Definition fetch.h:71
size_t range_start
Definition fetch.h:72
size_t total_size
Definition fetch.h:74
size_t inclusive_range_end
Definition fetch.h:73
Definition fetch.h:65
std::string snapshot_name
Definition fetch.h:66
std::vector< uint8_t > snapshot_data
Definition fetch.h:67