libUTL++
ConcurrentQueue_mutex.h
1 #pragma once
2 
4 
#include <atomic>
#include <mutex>

#include <libutl/Mutex.h>
6 
8 
9 UTL_NS_BEGIN;
10 
12 
22 
24 template <typename T>
26 {
27 public:
30 
33 
35  void enQ(T value);
36 
38  bool deQ(T& value);
39 
41  void forEach(std::function<void(T)> f) const;
42 
43 private:
44  struct Node
45  {
46  Node()
47  : next(nullptr)
48  {
49  }
50  Node(T val)
51  : value(val)
52  , next(nullptr)
53  {
54  }
55 
56  public:
57  T value;
58  Node* next;
59  };
60 
61 private:
62  char pad0[UTL_CACHE_LINE_SIZE];
63  Mutex _consumerLock;
64  Node* _head;
65  char pad1[UTL_CACHE_LINE_SIZE - sizeof(Node*)];
66  Mutex _producerLock;
67  Node* _tail;
68  char pad2[UTL_CACHE_LINE_SIZE - sizeof(Node*)];
69 };
70 
72 
73 template <typename T>
75 {
76  _head = _tail = new Node();
77 }
78 
80 
81 template <typename T>
83 {
84  while (_head != nullptr)
85  {
86  Node* node = _head;
87  _head = node->next;
88  delete node;
89  }
90 }
91 
93 
94 template <typename T>
95 void
97 {
98  Node* node = new Node(value);
99 
100  // acquire producer lock
101  _producerLock.lock();
102 
103  // tail->node, and tail=node
104  _tail->next = node;
105  _tail = node;
106 
107  // release producer lock
108  _producerLock.unlock();
109 }
110 
112 
113 template <typename T>
114 bool
116 {
117  _consumerLock.lock();
118 
119  // remember head and its next
120  Node* head = _head;
121  Node* next = _head->next;
122 
123  // empty?
124  if (next == nullptr)
125  {
126  _consumerLock.unlock();
127  return false;
128  }
129 
130  // copy value, and advance _head
131  value = next->value;
132  _head = next;
133 
134  // release consumer lock
135  _consumerLock.unlock();
136 
137  // clean up the old head
138  delete head;
139 
140  return true;
141 }
142 
144 
145 template <typename T>
146 void
147 ConcurrentQueue_mutex<T>::forEach(std::function<void(T)> func) const
148 {
149  Node* node = _head;
150  while (true)
151  {
152  node = node->next;
153  if (node == nullptr) break;
154  func(node->value);
155  }
156 }
157 
159 
160 UTL_NS_END;
utl::ConcurrentQueue that uses mutexes for the producer & consumer locks.
MUTual EXclusion device.
Definition: Mutex.h:27