CCF
Loading...
Searching...
No Matches
sub_task_queue.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include <atomic>
6#include <deque>
7#include <mutex>
8
9namespace ccf::tasks
10{
11 // Helper type for OrderedTasks, containing a list of sub-tasks to be
12 // performed in-order. Modifiers return bools indicating whether the caller
13 // should schedule processing of this queue *now* (eg, enqueue the owning
14 // runner), based on the current state.
15 template <typename T>
17 {
18 protected:
19 std::mutex pending_mutex;
20 std::deque<T> pending;
21 std::atomic<bool> active;
22 std::atomic<bool> paused;
23
24 public:
25 // Enqueue a new sub-task.
26 //
27 // Returns true iff this call made the queue non-empty and the queue was
28 // not already active. Callers should interpret a true return as "schedule
29 // processing of this queue now" (eg, enqueue the parent runner).
30 bool push(T&& t)
31 {
32 std::lock_guard<std::mutex> lock(pending_mutex);
33 const bool ret = pending.empty() && !active.load();
34 pending.emplace_back(std::forward<T>(t));
35 return ret;
36 }
37
38 using Visitor = std::function<void(T&&)>;
39 bool pop_and_visit(Visitor&& visitor)
40 {
41 decltype(pending) local;
42 {
43 std::lock_guard<std::mutex> lock(pending_mutex);
44 active.store(true);
45
46 std::swap(local, pending);
47 }
48
49 auto it = local.begin();
50 while (!paused.load() && it != local.end())
51 {
52 visitor(std::forward<T>(*it));
53 ++it;
54 }
55
56 {
57 std::lock_guard<std::mutex> lock(pending_mutex);
58 if (it != local.end())
59 {
60 // Paused mid-execution - some actions remain that need to be
61 // spliced back onto the front of the pending pending
62 pending.insert(pending.begin(), it, local.end());
63 }
64
65 active.store(false);
66 return !pending.empty() && !paused.load();
67 }
68 }
69
70 void pause()
71 {
72 std::lock_guard<std::mutex> lock(pending_mutex);
73 paused.store(true);
74 }
75
76 bool unpause()
77 {
78 std::lock_guard<std::mutex> lock(pending_mutex);
79 paused.store(false);
80 return !pending.empty() && !active.load();
81 }
82
84 size_t& num_pending, bool& is_active, bool& is_paused)
85 {
86 std::lock_guard<std::mutex> lock(pending_mutex);
87 num_pending = pending.size();
88 is_active = active.load();
89 is_paused = paused.load();
90 }
91 };
92}
Definition sub_task_queue.h:17
bool pop_and_visit(Visitor &&visitor)
Definition sub_task_queue.h:39
void pause()
Definition sub_task_queue.h:70
std::atomic< bool > paused
Definition sub_task_queue.h:22
bool unpause()
Definition sub_task_queue.h:76
std::atomic< bool > active
Definition sub_task_queue.h:21
void get_queue_summary(size_t &num_pending, bool &is_active, bool &is_paused)
Definition sub_task_queue.h:83
std::function< void(T &&)> Visitor
Definition sub_task_queue.h:38
bool push(T &&t)
Definition sub_task_queue.h:30
std::deque< T > pending
Definition sub_task_queue.h:20
std::mutex pending_mutex
Definition sub_task_queue.h:19
Definition basic_task.h:8