CCF
Loading...
Searching...
No Matches
sub_task_queue.h
Go to the documentation of this file.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
3#pragma once
4
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
#include <mutex>
#include <utility>
8
namespace ccf::tasks
{
  // Helper type for OrderedTasks, containing a list of sub-tasks to be
  // performed in-order. Modifiers return bools indicating whether the caller
  // is responsible for scheduling a future flush of this queue.
  //
  // Every access to the list is made while holding pending_mutex. The two
  // flags are atomics so that pop_and_visit can poll `paused` between items
  // without taking the lock.
  //
  // NOTE(review): the class-head line was absent from the listing this header
  // was restored from. The name and class-key below are reconstructed from
  // the file name (sub_task_queue.h) - confirm against callers.
  template <typename T>
  class SubTaskQueue
  {
  protected:
    // Guards `pending`, and serialises the flag updates made alongside it.
    std::mutex pending_mutex;

    // Items waiting to be visited, oldest first.
    std::deque<T> pending;

    // Both flags are initialised explicitly: a default-constructed
    // std::atomic holds an indeterminate value before C++20.
    //
    // True while a pop_and_visit call is between taking its batch and
    // finishing.
    std::atomic<bool> active{false};

    // True between pause() and the next unpause().
    std::atomic<bool> paused{false};

  public:
    // Appends `t` to the back of the queue.
    //
    // Returns true if, before this call, the queue was empty and no
    // pop_and_visit was in progress. Returns false otherwise.
    bool push(T&& t)
    {
      std::lock_guard<std::mutex> lock(pending_mutex);
      const bool needs_scheduling = pending.empty() && !active.load();
      // `t` is a plain rvalue reference (T is the class's parameter, not
      // deduced by this function), so std::move is the correct cast.
      pending.emplace_back(std::move(t));
      return needs_scheduling;
    }

    using Visitor = std::function<void(T&&)>;

    // Takes everything currently pending and passes each item, in order, to
    // `visitor` as an rvalue. The lock is NOT held while the visitor runs, so
    // the visitor may call push()/pause()/unpause() on this queue.
    //
    // `paused` is checked before each item. If it becomes set, visiting stops
    // and the unvisited items are moved back to the front of the queue, ahead
    // of anything pushed in the meantime.
    //
    // Returns true if items remain pending and the queue is not paused when
    // this call finishes. Returns false otherwise.
    //
    // NOTE: if the visitor throws, the exception propagates with `active`
    // still set and the not-yet-visited items of this batch are discarded.
    bool pop_and_visit(Visitor&& visitor)
    {
      std::deque<T> batch;
      {
        std::lock_guard<std::mutex> lock(pending_mutex);
        active.store(true);
        std::swap(batch, pending);
      }

      auto next = batch.begin();
      while (!paused.load() && next != batch.end())
      {
        visitor(std::move(*next));
        ++next;
      }

      std::lock_guard<std::mutex> lock(pending_mutex);
      if (next != batch.end())
      {
        // Paused mid-execution - some actions remain that need to be
        // spliced back onto the front of the pending queue. They are moved
        // rather than copied: `batch` is discarded right after, and this
        // keeps the queue usable with move-only T.
        pending.insert(
          pending.begin(),
          std::make_move_iterator(next),
          std::make_move_iterator(batch.end()));
      }

      active.store(false);
      return !pending.empty() && !paused.load();
    }

    // Sets the paused flag. A pop_and_visit already in progress stops before
    // visiting its next item.
    void pause()
    {
      std::lock_guard<std::mutex> lock(pending_mutex);
      paused.store(true);
    }

    // Clears the paused flag.
    //
    // Returns true if items are pending and no pop_and_visit is in progress.
    // Returns false otherwise.
    bool unpause()
    {
      std::lock_guard<std::mutex> lock(pending_mutex);
      paused.store(false);
      return !pending.empty() && !active.load();
    }

    // Writes a snapshot of the queue state, taken under the lock, into the
    // out-parameters: the number of pending items and whether a pop_and_visit
    // is in progress.
    void get_queue_summary(size_t& num_pending, bool& is_active)
    {
      std::lock_guard<std::mutex> lock(pending_mutex);
      num_pending = pending.size();
      is_active = active.load();
    }
  };
}
Definition sub_task_queue.h:16
bool pop_and_visit(Visitor &&visitor)
Definition sub_task_queue.h:33
void pause()
Definition sub_task_queue.h:64
std::atomic< bool > paused
Definition sub_task_queue.h:21
bool unpause()
Definition sub_task_queue.h:70
std::atomic< bool > active
Definition sub_task_queue.h:20
void get_queue_summary(size_t &num_pending, bool &is_active)
Definition sub_task_queue.h:77
std::function< void(T &&)> Visitor
Definition sub_task_queue.h:32
bool push(T &&t)
Definition sub_task_queue.h:24
std::deque< T > pending
Definition sub_task_queue.h:19
std::mutex pending_mutex
Definition sub_task_queue.h:18
Definition basic_task.h:8