CCF
Loading...
Searching...
No Matches
timing.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5// CCF
6#include "ccf/tx_id.h"
8
9// STL/3rdparty
10#include <chrono>
11#include <fstream>
12#include <iomanip>
13#include <thread>
14#include <vector>
15
16#define FMT_HEADER_ONLY
17#include <fmt/format.h>
18
namespace timing
{
  /// Summary statistics over a set of latency samples, as produced by
  /// timing::measure().
  struct Measure
  {
    /// Number of samples that contributed (NaN samples are excluded).
    size_t sample_count;
    /// Arithmetic mean of the samples (seconds, for latencies derived from
    /// TimeDelta values).
    double average;
    /// Population variance of the samples (sum of squared deviations
    /// divided by sample_count).
    double variance;
  };
}
28
29FMT_BEGIN_NAMESPACE
30template <>
31struct formatter<timing::Measure>
32{
33 template <typename ParseContext>
34 constexpr auto parse(ParseContext& ctx)
35 {
36 return ctx.begin();
37 }
38
39 template <typename FormatContext>
40 auto format(const timing::Measure& e, FormatContext& ctx) const
41 {
42 return format_to(
43 ctx.out(),
44 "sample_count: {}, average: {}, variance: {}",
46 e.average,
47 e.variance);
48 }
49};
50FMT_END_NAMESPACE
51
52namespace timing
53{
// Convenience names used throughout the rest of this header.
// NOTE(review): a using-directive inside a header namespace makes all of std
// and std::chrono visible inside namespace timing (and, transitively, to any
// code that does `using namespace timing`); it is kept because the code below
// relies on unqualified std names.
54 using namespace std;
55 using namespace chrono;
56
// Clock used for every recorded send/receive time.
// NOTE(review): high_resolution_clock may alias steady_clock or system_clock
// depending on the standard library, so its epoch is implementation-defined.
57 using Clock = high_resolution_clock;
// Durations stored as floating-point seconds (duration<double> uses the
// default ratio<1> period).
58 using TimeDelta = duration<double>;
59
61 {
63 const std::string method;
64 const size_t rpc_id;
65 const bool expects_commit;
66 };
67
69 {
71 size_t rpc_id;
72 optional<ccf::TxID> commit;
74 };
75
/// Returns the current local wall-clock time as "[HH:MM:SS.uuuuuu] "
/// (zero-padded microseconds, followed by a trailing space), intended as a
/// human-readable prefix for output lines.
inline std::string timestamp()
{
  // Wall-clock formatting must use system_clock. Clock (high_resolution_clock)
  // may have an arbitrary epoch, and its time_since_epoch().count() is in
  // clock ticks (typically nanoseconds), not seconds, so it cannot be used
  // as a time_t.
  const auto now = std::chrono::system_clock::now();

  // Split into whole seconds (for HH:MM:SS) and the sub-second remainder, so
  // both parts of the prefix describe the same instant.
  const auto whole_seconds =
    std::chrono::time_point_cast<std::chrono::seconds>(now);
  const std::time_t now_tt =
    std::chrono::system_clock::to_time_t(whole_seconds);
  const auto micros =
    std::chrono::duration_cast<std::chrono::microseconds>(now - whole_seconds);

  std::stringstream ss;
  ss << "[";

  std::tm now_tm{};
  if (::localtime_r(&now_tt, &now_tm) != nullptr)
  {
    ss << std::put_time(&now_tm, "%T.");
  }
  else
  {
    // Never format an unfilled struct tm; keep the field width so the
    // prefix stays aligned.
    ss << "??:??:??.";
  }

  ss << std::setfill('0') << std::setw(6) << micros.count() << "] ";
  return ss.str();
}
93
94 // NaNs are ignored (treated as though they are not present)
95 Measure measure(const vector<double>& samples)
96 {
97 vector<double> non_nans;
98 non_nans.reserve(samples.size());
99 for (double d : samples)
100 {
101 if (!isnan(d))
102 non_nans.push_back(d);
103 }
104
105 const double average =
106 accumulate(non_nans.begin(), non_nans.end(), 0.0) / non_nans.size();
107
108 vector<double> sq_diffs(non_nans.size());
109 transform(
110 non_nans.begin(), non_nans.end(), sq_diffs.begin(), [average](double d) {
111 return (d - average) * (d - average);
112 });
113
114 const double variance =
115 accumulate(sq_diffs.begin(), sq_diffs.end(), 0.0) / sq_diffs.size();
116
117 return {non_nans.size(), average, variance};
118 }
119
120 ostream& operator<<(ostream& stream, const Measure& m)
121 {
122 stream << m.sample_count << " samples with average latency " << m.average
123 << "s";
124 const auto prev_precision = stream.precision(3);
125 stream << " (variance " << std::scientific << m.variance
126 << std::defaultfloat << ")";
127 stream.precision(prev_precision);
128 return stream;
129 }
130
152
153 static std::optional<ccf::TxID> extract_transaction_id(
154 const client::RpcTlsClient::Response& response)
155 {
156 const auto& h = response.headers;
157 const auto it = h.find(ccf::http::headers::CCF_TX_ID);
158 if (it == h.end())
159 {
160 return std::nullopt;
161 }
162
163 return ccf::TxID::from_str(it->second);
164 }
165
167 {
// Client used to poll the "tx" endpoint in wait_for_global_commit.
// NOTE(review): being const, this member makes ResponseTimes copy-constructible
// but not copy-assignable.
168 const shared_ptr<client::RpcTlsClient> net_client;
// Reference point for all recorded send/receive times; set at construction
// and reset by start_timing().
169 time_point<Clock> start_time;
170
// Requests and replies appended by record_send / record_receive, in call order.
171 vector<SentRequest> sends;
172 vector<ReceivedReply> receives;
173
// Set by start_timing(), cleared by stop_timing(), reported by
// is_timing_active().
174 bool active = false;
175
176 public:
177 ResponseTimes(const shared_ptr<client::RpcTlsClient>& client) :
178 net_client(client),
179 start_time(Clock::now())
180 {}
181
182 ResponseTimes(const ResponseTimes& other) = default;
183
185 {
186 active = true;
187 start_time = Clock::now();
188 }
189
191 {
192 return active;
193 }
194
196 {
197 active = false;
198 }
199
200 auto get_start_time() const
201 {
202 return start_time;
203 }
204
206 const std::string& method, size_t rpc_id, bool expects_commit)
207 {
208 sends.push_back(
209 {Clock::now() - start_time, method, rpc_id, expects_commit});
210 }
211
213 size_t rpc_id, const optional<ccf::TxID>& tx_id, size_t global_seqno = 0)
214 {
215 receives.push_back(
216 {Clock::now() - start_time, rpc_id, tx_id, global_seqno});
217 }
218
219 // Repeatedly calls GET /tx RPC until the target seqno has been
220 // committed (or will never be committed), returns first confirming
221 // response. Calls record_[send/response], if record is true.
222 // Throws on errors, or if target is rolled back
223 void wait_for_global_commit(const ccf::TxID& target, bool record = true)
224 {
225 auto params = nlohmann::json::object();
226 params["transaction_id"] = target.to_str();
227
228 constexpr auto get_tx_status = "tx";
229
231 "Waiting for transaction ID {}.{}", target.view, target.seqno);
232
233 while (true)
234 {
235 const auto response = net_client->get(get_tx_status, params);
236
237 if (record)
238 {
239 record_send(get_tx_status, response.id, false);
240 }
241
242 const auto body = net_client->unpack_body(response);
243 if (response.status != HTTP_STATUS_OK)
244 {
245 throw runtime_error(fmt::format(
246 "{} failed with status {}: {}",
247 get_tx_status,
248 http_status_str(response.status),
249 body.dump()));
250 }
251
252 const auto tx_id = extract_transaction_id(response);
253
254 // NB: Eventual header re-org should be exposing API types so
255 // they can be consumed cleanly from C++ clients
256 const std::string tx_status = body["status"];
257 if (tx_status == "Pending" || tx_status == "Unknown")
258 {
259 if (record)
260 {
261 record_receive(response.id, tx_id);
262 }
263
264 // Commit is pending, poll again
265 this_thread::sleep_for(10us);
266 continue;
267 }
268 else if (tx_status == "Committed")
269 {
270 LOG_INFO_FMT("Found global commit {}.{}", target.view, target.seqno);
271 if (tx_id.has_value())
272 {
274 " (headers view: {}, seqno: {})", tx_id->view, tx_id->seqno);
275 }
276
277 if (record)
278 {
279 if (tx_id.has_value())
280 {
281 record_receive(response.id, tx_id, target.seqno);
282 }
283 else
284 {
285 // If this response didn't contain commit IDs in headers, we can
286 // still construct them from the body
288 response.id, {{target.view, target.seqno}}, target.seqno);
289 }
290 }
291 return;
292 }
293 else if (tx_status == "Invalid")
294 {
295 throw std::logic_error(fmt::format(
296 "Transaction {}.{} is now marked as invalid",
297 target.view,
298 target.seqno));
299 }
300 else
301 {
302 throw std::logic_error(
303 fmt::format("Unhandled tx status: {}", tx_status));
304 }
305 }
306 }
307
309 bool allow_pending,
310 size_t highest_local_commit,
311 size_t desired_rounds = 1)
312 {
313 TimeDelta end_time_delta = Clock::now() - start_time;
314
315 const auto rounds = min(max(sends.size(), 1ul), desired_rounds);
316 const auto round_size = sends.size() / rounds;
317
318 // Assume we receive responses in the same order requests were sent, then
319 // duplicate IDs shouldn't cause a problem
320 size_t next_recv = 0u;
321
322 using Latencies = vector<double>;
323
324 Results res;
325 Latencies all_local_commits;
326 Latencies all_global_commits;
327
328 // get test duration for last sent message's global commit
329 for (auto i = next_recv; i < receives.size(); ++i)
330 {
331 auto receive = receives[i];
332
333 if (receive.commit.has_value())
334 {
335 if (receive.global_seqno >= highest_local_commit)
336 {
338 "Global commit match {} for highest local commit {}",
339 receive.global_seqno,
340 highest_local_commit);
341 auto was =
342 duration_cast<milliseconds>(end_time_delta).count() / 1000.0;
343 auto is =
344 duration_cast<milliseconds>(receive.receive_time).count() /
345 1000.0;
346 LOG_INFO_FMT("Duration changing from {}s to {}s", was, is);
347 end_time_delta = receive.receive_time;
348 break;
349 }
350 }
351 }
352
353 for (size_t round = 1; round <= rounds; ++round)
354 {
355 const auto round_begin = sends.begin() + (round_size * (round - 1));
356 const auto round_end =
357 round == rounds ? sends.end() : round_begin + round_size;
358
359 Latencies round_local_commit;
360 Latencies round_global_commit;
361
362 struct PendingGlobalCommit
363 {
364 TimeDelta send_time;
365 size_t target_commit;
366 };
367 vector<PendingGlobalCommit> pending_global_commits;
368
369 auto complete_pending = [&](const ReceivedReply& receive) {
370 if (receive.global_seqno > 0)
371 {
372 auto pending_it = pending_global_commits.begin();
373 while (pending_it != pending_global_commits.end())
374 {
375 if (receive.global_seqno >= pending_it->target_commit)
376 {
377 round_global_commit.push_back(
378 (receive.receive_time - pending_it->send_time).count());
379 ++pending_it;
380 }
381 else
382 {
383 // Assuming the target_commits within pending_global_commits are
384 // monotonic, we can break here. If this receive didn't satisfy
385 // the first pending commit, it can't satisfy any later
386 break;
387 }
388 }
389 if (pending_it != pending_global_commits.begin())
390 {
391 pending_global_commits.erase(
392 pending_global_commits.begin(), pending_it);
393 }
394 }
395 };
396
397 for (auto send_it = round_begin; send_it != round_end; ++send_it)
398 {
399 const auto& send = *send_it;
400
401 double tx_latency;
402 optional<ReceivedReply> matching_reply;
403 for (auto i = next_recv; i < receives.size(); ++i)
404 {
405 const auto& receive = receives[i];
406
407 complete_pending(receive);
408
409 if (receive.rpc_id == send.rpc_id)
410 {
411 tx_latency = (receive.receive_time - send.send_time).count();
412
413 if (tx_latency < 0)
414 {
416 "Calculated a negative latency ({}) for RPC {} - duplicate "
417 "ID causing mismatch?",
418 tx_latency,
419 receive.rpc_id);
420 continue;
421 }
422
423 matching_reply = receive;
424 next_recv = i + 1;
425 break;
426 }
427 }
428
429 if (send.expects_commit)
430 {
431 if (matching_reply.has_value())
432 {
433 // Successful write - measure local tx time AND try to find global
434 // commit time
435 round_local_commit.push_back(tx_latency);
436
437 if (matching_reply->global_seqno >= matching_reply->commit->seqno)
438 {
439 // Global commit already already
440 round_global_commit.push_back(tx_latency);
441 }
442 else
443 {
444 if (matching_reply->commit->seqno <= highest_local_commit)
445 {
446 // Store expected global commit to find later
447 pending_global_commits.push_back(
448 {send.send_time, matching_reply->commit->seqno});
449 }
450 else
451 {
453 "Ignoring request with ID {} because it committed too late "
454 "({} > {})",
455 send.rpc_id,
456 matching_reply->commit->seqno,
457 highest_local_commit);
458 }
459 }
460 }
461 else
462 {
463 // Write failed - measure local tx time
464 round_local_commit.push_back(tx_latency);
465 }
466 }
467 else
468 {
469 // Read-only - measure local tx time
470 round_local_commit.push_back(tx_latency);
471 }
472 }
473
474 // After every tracked send has been processed, consider every remaining
475 // receive to satisfy outstanding pending global commits
476 for (auto i = next_recv; i < receives.size(); ++i)
477 {
478 if (pending_global_commits.empty())
479 {
480 break;
481 }
482
483 complete_pending(receives[i]);
484 }
485
486 all_local_commits.insert(
487 all_local_commits.end(),
488 round_local_commit.begin(),
489 round_local_commit.end());
490 all_global_commits.insert(
491 all_global_commits.end(),
492 round_global_commit.begin(),
493 round_global_commit.end());
494
495 if (rounds > 1)
496 {
497 res.per_round.push_back(
498 {round_begin->rpc_id,
499 (round_end - 1)->rpc_id,
500 measure(round_local_commit),
501 measure(round_global_commit)});
502 }
503
504 if (!allow_pending)
505 {
506 if (!pending_global_commits.empty())
507 {
508 const auto& first = pending_global_commits[0];
509 throw runtime_error(fmt::format(
510 "Still waiting for {} global commits. First expected is {} for "
511 "a transaction sent at {} (NB: Highest local commit is {})",
512 pending_global_commits.size(),
513 first.target_commit,
514 first.send_time.count(),
515 highest_local_commit));
516 }
517 }
518
519 const auto expected_local_samples = distance(round_begin, round_end);
520 const auto actual_local_samples = round_local_commit.size();
521 if (actual_local_samples != expected_local_samples)
522 {
523 throw runtime_error(fmt::format(
524 "Measured {} response times, yet sent {} requests",
525 actual_local_samples,
526 expected_local_samples));
527 }
528 }
529
530 res.total_sends = sends.size();
531 res.total_receives = receives.size();
532 res.start_time = start_time;
533 res.duration = end_time_delta;
534
535 res.total_local_commit = measure(all_local_commits);
536 res.total_global_commit = measure(all_global_commits);
537 return res;
538 }
539
540 void write_to_file(const string& filename)
541 {
542 LOG_INFO_FMT("Writing timing data to files");
543
544 const auto sent_path = filename + "_sent.csv";
545 ofstream sent_csv(sent_path, ofstream::out);
546 if (sent_csv.is_open())
547 {
548 sent_csv << "sent_sec,idx,method,expects_commit" << endl;
549 for (const auto& sent : sends)
550 {
551 sent_csv << sent.send_time.count() << "," << sent.rpc_id << ","
552 << sent.method << "," << sent.expects_commit << endl;
553 }
554 LOG_INFO_FMT("Wrote {} entries to {}", sends.size(), sent_path);
555 }
556
557 const auto recv_path = filename + "_recv.csv";
558 ofstream recv_csv(recv_path, ofstream::out);
559 if (recv_csv.is_open())
560 {
561 recv_csv << "recv_sec,idx,has_commits,commit,view,global_commit"
562 << endl;
563 for (const auto& reply : receives)
564 {
565 recv_csv << reply.receive_time.count();
566 recv_csv << "," << reply.rpc_id;
567 recv_csv << "," << reply.commit.has_value();
568
569 if (reply.commit.has_value())
570 {
571 recv_csv << "," << reply.commit->seqno;
572 recv_csv << "," << reply.commit->view;
573 }
574 else
575 {
576 recv_csv << "," << 0;
577 recv_csv << "," << 0;
578 }
579
580 recv_csv << "," << reply.global_seqno;
581 recv_csv << endl;
582 }
583 LOG_INFO_FMT("Wrote {} entries to {}", receives.size(), recv_path);
584 }
585 }
586 };
587}
Definition timing.h:167
void stop_timing()
Definition timing.h:195
void write_to_file(const string &filename)
Definition timing.h:540
ResponseTimes(const shared_ptr< client::RpcTlsClient > &client)
Definition timing.h:177
void start_timing()
Definition timing.h:184
void record_receive(size_t rpc_id, const optional< ccf::TxID > &tx_id, size_t global_seqno=0)
Definition timing.h:212
bool is_timing_active()
Definition timing.h:190
Results produce_results(bool allow_pending, size_t highest_local_commit, size_t desired_rounds=1)
Definition timing.h:308
void record_send(const std::string &method, size_t rpc_id, bool expects_commit)
Definition timing.h:205
void wait_for_global_commit(const ccf::TxID &target, bool record=true)
Definition timing.h:223
auto get_start_time() const
Definition timing.h:200
ResponseTimes(const ResponseTimes &other)=default
#define LOG_INFO_FMT
Definition logger.h:395
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
Definition perf_client.h:26
STL namespace.
std::ostream & operator<<(std::ostream &os, ccf::NodeStartupState s)
Definition node_startup_state.h:34
Definition timing.h:20
high_resolution_clock Clock
Definition timing.h:57
Measure measure(const vector< double > &samples)
Definition timing.h:95
std::string timestamp()
Definition timing.h:76
duration< double > TimeDelta
Definition timing.h:58
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
std::string to_str() const
Definition tx_id.h:48
static std::optional< TxID > from_str(const std::string_view &sv)
Definition tx_id.h:53
Definition rpc_tls_client.h:29
ccf::http::HeaderMap headers
Definition rpc_tls_client.h:32
constexpr auto parse(ParseContext &ctx)
Definition timing.h:34
auto format(const timing::Measure &e, FormatContext &ctx) const
Definition timing.h:40
Definition timing.h:22
double variance
Definition timing.h:25
double average
Definition timing.h:24
size_t sample_count
Definition timing.h:23
Definition timing.h:69
size_t global_seqno
Definition timing.h:73
optional< ccf::TxID > commit
Definition timing.h:72
TimeDelta receive_time
Definition timing.h:70
size_t rpc_id
Definition timing.h:71
Definition timing.h:142
Measure local_commit
Definition timing.h:146
Measure global_commit
Definition timing.h:147
size_t begin_rpc_id
Definition timing.h:143
size_t end_rpc_id
Definition timing.h:144
Definition timing.h:132
size_t total_sends
Definition timing.h:133
Measure total_global_commit
Definition timing.h:139
Clock::time_point start_time
Definition timing.h:135
Measure total_local_commit
Definition timing.h:138
TimeDelta duration
Definition timing.h:136
size_t total_receives
Definition timing.h:134
vector< PerRound > per_round
Definition timing.h:150
Definition timing.h:61
const size_t rpc_id
Definition timing.h:64
const bool expects_commit
Definition timing.h:65
const TimeDelta send_time
Definition timing.h:62
const std::string method
Definition timing.h:63