libUTL++
ConcurrentQueue.h
1 #pragma once
2 
4 
5 UTL_NS_BEGIN;
6 
8 
16 
18 template <typename T>
20 {
21 public:
24 
26  ~ConcurrentQueue();
27 
29  void enQ(T value);
30 
32  bool deQ(T& value);
33 
35  void forEach(std::function<void(T)> f) const;
36 
37 private:
38  struct Node
39  {
40  Node()
41  : next(nullptr)
42  {
43  }
44  Node(T val)
45  : value(val)
46  , next(nullptr)
47  {
48  }
49 
50  public:
51  T value;
52  Node* next;
53  };
54 
55 private:
56  char pad0[UTL_CACHE_LINE_SIZE];
57  Node* _head;
58  char pad1[UTL_CACHE_LINE_SIZE - sizeof(Node*)];
59  std::atomic_bool _consumerLock;
60  char pad2[UTL_CACHE_LINE_SIZE - sizeof(bool)];
61  Node* _tail;
62  char pad3[UTL_CACHE_LINE_SIZE - sizeof(Node*)];
63  std::atomic_bool _producerLock;
64  char pad4[UTL_CACHE_LINE_SIZE - sizeof(bool)];
65 };
66 
68 
69 template <typename T>
71  : _consumerLock(false)
72  , _producerLock(false)
73 {
74  _head = _tail = new Node();
75 }
76 
78 
79 template <typename T>
81 {
82  while (_head != nullptr)
83  {
84  Node* node = _head;
85  _head = node->next;
86  delete node;
87  }
88 }
89 
91 
92 template <typename T>
93 void
95 {
96  Node* node = new Node(value);
97 
98  // acquire producer lock
99  bool cur = false;
100  while (!_producerLock.compare_exchange_weak(cur, true, std::memory_order_acquire,
101  std::memory_order_relaxed))
102  {
103  cur = false;
104  }
105 
106  // link from tail, become new tail
107  _tail->next = node;
108  _tail = node;
109 
110  // release producer lock
111  _producerLock.store(false, std::memory_order_release);
112 }
113 
115 
116 template <typename T>
117 bool
119 {
120  // acquire consumer lock
121  bool cur = false;
122  while (!_consumerLock.compare_exchange_weak(cur, true, std::memory_order_acquire,
123  std::memory_order_relaxed))
124  {
125  cur = false;
126  }
127 
128  // remember head and its next
129  Node* head = _head;
130  Node* next = _head->next;
131 
132  // empty?
133  if (next == nullptr)
134  {
135  _consumerLock.store(false, std::memory_order_relaxed);
136  return false;
137  }
138 
139  // copy value, and advance _head
140  value = next->value;
141  _head = next;
142 
143  // release consumer lock
144  _consumerLock.store(false, std::memory_order_release);
145 
146  // clean up the old head
147  delete head;
148 
149  return true;
150 }
151 
153 
154 template <typename T>
155 void
156 ConcurrentQueue<T>::forEach(std::function<void(T)> func) const
157 {
158  Node* node = _head;
159  while (true)
160  {
161  node = node->next;
162  if (node == nullptr) break;
163  func(node->value);
164  }
165 }
166 
168 
169 UTL_NS_END;
void forEach(std::function< void(T)> f) const
Execute the given function on each contained item (not thread-safe!).
~ConcurrentQueue()
Destructor.
Thread-safe queue structure.
bool deQ(T &value)
Dequeue an object.
void enQ(T value)
Queue an object.