// Fragment: queries the status of the transaction identified by `target` and
// dispatches on the reported status string. The enclosing function and any
// surrounding loop are not visible in this excerpt.
//
// Request parameters: a JSON object whose "transaction_id" field is the
// string form of the target transaction ID.
225 auto params = nlohmann::json::object();
226 params[
"transaction_id"] = target.
to_str();
// Name passed to net_client->get() below to select what is queried.
228 constexpr auto get_tx_status =
"tx";
// Format string and arguments (target's view and seqno); the call these
// belong to is not visible here.
231 "Waiting for transaction ID {}.{}", target.
view, target.
seqno);
// Issue the GET with the parameters built above.
235 const auto response = net_client->get(get_tx_status, params);
// Unpack the response body; its "status" field is read further down.
// Note that unpacking happens before the HTTP status check.
242 const auto body = net_client->unpack_body(response);
// Any HTTP status other than HTTP_STATUS_OK is raised as a runtime_error
// whose message includes the textual form of the status.
243 if (response.status != HTTP_STATUS_OK)
245 throw runtime_error(fmt::format(
246 "{} failed with status {}: {}",
248 http_status_str(response.status),
// Transaction ID extracted from the response; it is checked with has_value()
// before each use below.
252 const auto tx_id = extract_transaction_id(response);
// Status string reported in the body; the branches below compare it against
// "Pending", "Unknown", "Committed" and "Invalid".
256 const std::string tx_status = body[
"status"];
// "Pending" or "Unknown": sleep for 10 microseconds (presumably before
// querying again - the retry loop is not visible here, TODO confirm).
257 if (tx_status ==
"Pending" || tx_status ==
"Unknown")
265 this_thread::sleep_for(10us);
// "Committed": when the response carried a transaction ID, its view and seqno
// are used in a formatted message; the second has_value() check guards a call
// that takes response.id and the target's view/seqno (callee not visible).
268 else if (tx_status ==
"Committed")
271 if (tx_id.has_value())
274 " (headers view: {}, seqno: {})", tx_id->view, tx_id->seqno);
279 if (tx_id.has_value())
288 response.id, {{target.view, target.seqno}}, target.
seqno);
// "Invalid": raised as a logic_error.
293 else if (tx_status ==
"Invalid")
295 throw std::logic_error(fmt::format(
296 "Transaction {}.{} is now marked as invalid",
// Any other status string is also raised as a logic_error, quoting the
// unexpected value.
302 throw std::logic_error(
303 fmt::format(
"Unhandled tx status: {}", tx_status));
// Trailing parameters of a function whose name and leading parameters are not
// visible in this excerpt.
//   highest_local_commit - compared against reply seqnos below.
//   desired_rounds       - requested number of rounds; defaults to 1.
310 size_t highest_local_commit,
311 size_t desired_rounds = 1)
// Elapsed time since start_time; may be replaced below by the receive time of
// a specific reply.
313 TimeDelta end_time_delta = Clock::now() - start_time;
// Number of rounds actually used: desired_rounds, capped at
// max(sends.size(), 1).
// NOTE(review): desired_rounds == 0 yields rounds == 0, and the division on
// the following line would then divide by zero - confirm callers never pass 0.
// NOTE(review): the literal 1ul is unsigned long; if max is std::max, both
// arguments must have the same type - confirm this builds where size_t is not
// unsigned long.
315 const auto rounds = min(max(sends.size(), 1ul), desired_rounds);
// Sends per round (integer division, so a remainder is possible).
316 const auto round_size = sends.size() / rounds;
// Index into receives from which the scans below start.
320 size_t next_recv = 0u;
322 using Latencies = vector<double>;
// Latency samples accumulated across all rounds.
325 Latencies all_local_commits;
326 Latencies all_global_commits;
// Scan the replies starting at next_recv.
329 for (
auto i = next_recv; i < receives.size(); ++i)
// Takes a copy of the reply (declared by value, not by reference).
331 auto receive = receives[i];
// Only replies that carry a commit are considered.
333 if (receive.commit.has_value())
// A reply whose global_seqno has reached highest_local_commit is logged as a
// match, and end_time_delta is switched to that reply's receive_time.
335 if (receive.global_seqno >= highest_local_commit)
338 "Global commit match {} for highest local commit {}",
339 receive.global_seqno,
340 highest_local_commit);
// Old and new durations converted to seconds via millisecond counts
// (presumably assigned to `was` and `is`, which are logged below - the
// declarations are not visible here).
342 duration_cast<milliseconds>(end_time_delta).count() / 1000.0;
344 duration_cast<milliseconds>(receive.receive_time).count() /
346 LOG_INFO_FMT(
"Duration changing from {}s to {}s", was, is);
347 end_time_delta = receive.receive_time;
// Fragment: per-round latency computation. Rounds are numbered 1..rounds and
// each covers a contiguous slice of `sends`.
353 for (
size_t round = 1; round <= rounds; ++round)
// Slice of sends for this round; the last round extends to sends.end(), so it
// absorbs any remainder left by the integer division that produced round_size.
355 const auto round_begin = sends.begin() + (round_size * (round - 1));
356 const auto round_end =
357 round == rounds ? sends.end() : round_begin + round_size;
// Latency samples collected for this round only.
359 Latencies round_local_commit;
360 Latencies round_global_commit;
// A send whose global-commit latency cannot be recorded yet: target_commit is
// the seqno that a later reply's global_seqno must reach. A send_time member
// is also used below; its declaration is not visible here.
362 struct PendingGlobalCommit
365 size_t target_commit;
367 vector<PendingGlobalCommit> pending_global_commits;
// Presumably the body of the complete_pending helper called below (its
// declaration line is not visible - TODO confirm). For a reply with a
// non-zero global_seqno, walk the pending entries from the front; for entries
// whose target_commit has been reached, record
// (receive_time - send_time).count() as a global-commit sample.
370 if (receive.global_seqno > 0)
372 auto pending_it = pending_global_commits.begin();
373 while (pending_it != pending_global_commits.end())
375 if (receive.global_seqno >= pending_it->target_commit)
377 round_global_commit.push_back(
378 (receive.receive_time - pending_it->send_time).count());
// Erase the leading entries that the walk above moved past, in a single call.
389 if (pending_it != pending_global_commits.begin())
391 pending_global_commits.erase(
392 pending_global_commits.begin(), pending_it);
// For each send in this round, look for the reply with the same rpc_id.
397 for (
auto send_it = round_begin; send_it != round_end; ++send_it)
399 const auto& send = *send_it;
402 optional<ReceivedReply> matching_reply;
// Scan replies from next_recv; every scanned reply is also offered to
// complete_pending so queued global commits can be resolved.
403 for (
auto i = next_recv; i < receives.size(); ++i)
405 const auto& receive = receives[i];
407 complete_pending(receive);
// Matching reply found: latency is receive time minus send time.
409 if (receive.rpc_id == send.rpc_id)
411 tx_latency = (receive.receive_time - send.send_time).count();
// Message used when the computed latency is negative (the guarding condition
// is not visible here).
416 "Calculated a negative latency ({}) for RPC {} - duplicate "
417 "ID causing mismatch?",
423 matching_reply = receive;
// Sends that expect a commit: record the local latency, then either record
// the global latency immediately (the reply's global_seqno already covers its
// own commit seqno), queue it as pending (commit seqno not beyond
// highest_local_commit), or log that it is ignored. The exact branch nesting
// is not visible in this excerpt.
429 if (send.expects_commit)
431 if (matching_reply.has_value())
435 round_local_commit.push_back(tx_latency);
437 if (matching_reply->global_seqno >= matching_reply->commit->seqno)
440 round_global_commit.push_back(tx_latency);
444 if (matching_reply->commit->seqno <= highest_local_commit)
447 pending_global_commits.push_back(
448 {send.send_time, matching_reply->commit->seqno});
453 "Ignoring request with ID {} because it committed too late "
456 matching_reply->commit->seqno,
457 highest_local_commit);
// Other paths that also record the local latency sample (their conditions are
// not visible here).
464 round_local_commit.push_back(tx_latency);
470 round_local_commit.push_back(tx_latency);
// After the round's sends: keep offering replies to complete_pending; the
// empty() check presumably ends the scan once nothing is pending.
476 for (
auto i = next_recv; i < receives.size(); ++i)
478 if (pending_global_commits.empty())
483 complete_pending(receives[i]);
// Append this round's samples to the overall totals.
486 all_local_commits.insert(
487 all_local_commits.end(),
488 round_local_commit.begin(),
489 round_local_commit.end());
490 all_global_commits.insert(
491 all_global_commits.end(),
492 round_global_commit.begin(),
493 round_global_commit.end());
// Round summary: first and last rpc_id of the slice plus the measured global
// commit latencies.
// NOTE(review): round_begin-> and (round_end - 1)-> are only valid when the
// round contains at least one send - confirm empty rounds cannot reach here.
498 {round_begin->rpc_id,
499 (round_end - 1)->rpc_id,
501 measure(round_global_commit)});
// Entries still pending at this point are reported as a runtime_error that
// describes the first of them.
506 if (!pending_global_commits.empty())
508 const auto& first = pending_global_commits[0];
509 throw runtime_error(fmt::format(
510 "Still waiting for {} global commits. First expected is {} for "
511 "a transaction sent at {} (NB: Highest local commit is {})",
512 pending_global_commits.size(),
514 first.send_time.count(),
515 highest_local_commit));
// Sanity check: one local latency sample per send in the round.
// NOTE(review): distance() yields a signed difference type while size() is
// unsigned, so the comparison below mixes signedness.
519 const auto expected_local_samples = distance(round_begin, round_end);
520 const auto actual_local_samples = round_local_commit.size();
521 if (actual_local_samples != expected_local_samples)
523 throw runtime_error(fmt::format(
524 "Measured {} response times, yet sent {} requests",
525 actual_local_samples,
526 expected_local_samples));
// Fragment: dumps every entry of `sends` to "<filename>_sent.csv".
544 const auto sent_path = filename +
"_sent.csv";
545 ofstream sent_csv(sent_path, ofstream::out);
// Nothing is written if the file could not be opened.
546 if (sent_csv.is_open())
// Header row naming the four columns.
548 sent_csv <<
"sent_sec,idx,method,expects_commit" << endl;
// One row per send: send_time.count(), rpc_id, method, expects_commit.
// NOTE(review): endl flushes the stream on every row; '\n' would avoid the
// per-row flush if that is not required.
549 for (
const auto& sent : sends)
551 sent_csv << sent.send_time.count() <<
"," << sent.rpc_id <<
","
552 << sent.method <<
"," << sent.expects_commit << endl;
// Report how many entries were written and where.
554 LOG_INFO_FMT(
"Wrote {} entries to {}", sends.size(), sent_path);
// Fragment: dumps every entry of `receives` to "<filename>_recv.csv".
557 const auto recv_path = filename +
"_recv.csv";
558 ofstream recv_csv(recv_path, ofstream::out);
// Nothing is written if the file could not be opened.
559 if (recv_csv.is_open())
// Header row naming the six columns (the end of this statement is not visible
// in this excerpt).
561 recv_csv <<
"recv_sec,idx,has_commits,commit,view,global_commit"
// One row per reply, written field by field.
563 for (
const auto& reply : receives)
565 recv_csv << reply.receive_time.count();
566 recv_csv <<
"," << reply.rpc_id;
567 recv_csv <<
"," << reply.commit.has_value();
// Commit columns: the reply's commit seqno and view when a commit is present.
569 if (reply.commit.has_value())
571 recv_csv <<
"," << reply.commit->seqno;
572 recv_csv <<
"," << reply.commit->view;
// Two zero placeholders for the commit and view columns (presumably the
// no-commit branch - the `else` is not visible here).
576 recv_csv <<
"," << 0;
577 recv_csv <<
"," << 0;
// Final column: the reply's global_seqno.
580 recv_csv <<
"," << reply.global_seqno;
// Report how many entries were written and where.
583 LOG_INFO_FMT(
"Wrote {} entries to {}", receives.size(), recv_path);