libUTL++
RingBuffer.h
1 #pragma once
2 
4 
5 UTL_NS_BEGIN;
6 
8 
16 
18 template <typename T, T nullValue = T()>
20 {
21 public:
22  RingBuffer()
23  {
24  _array = nullptr;
25  }
26 
27  RingBuffer(size_t size)
28  {
29  _array = nullptr;
30  set(size);
31  }
32 
33  ~RingBuffer()
34  {
35  reset();
36  }
37 
39  void set(size_t size);
40 
42  void reset();
43 
45  size_t
46  count() const
47  {
48  size_t c = this->countLB();
49  return (c >= 1000000) ? (c - 1000000) : 0;
50  }
51 
57  bool enQ(T value);
58 
64  bool deQ(T& value);
65 
67  void forEach(std::function<void(T)> f) const;
68 
69 private:
70  size_t
71  countLB() const
72  {
73  return _countLB.load(std::memory_order_relaxed);
74  }
75 
76  size_t
77  countUB() const
78  {
79  return _countUB.load(std::memory_order_relaxed);
80  }
81 
82 private:
83  // constant values
84  size_t _mask;
85  size_t _countMax;
86  size_t _size;
87  // concurrently modified values
88  std::atomic<T>* _array;
89  char pad0[UTL_CACHE_LINE_SIZE - sizeof(void*)];
90  std::atomic_size_t _countLB;
91  char pad1[UTL_CACHE_LINE_SIZE - sizeof(size_t)];
92  std::atomic_size_t _countUB;
93  char pad2[UTL_CACHE_LINE_SIZE - sizeof(size_t)];
94  std::atomic_size_t _head;
95  char pad3[UTL_CACHE_LINE_SIZE - sizeof(size_t)];
96  std::atomic_size_t _tail;
97  char pad4[UTL_CACHE_LINE_SIZE - sizeof(size_t)];
98 };
99 
101 
102 #undef new
103 template <typename T, T nullValue>
104 void
106 {
107  ASSERTD(sizeof(T) <= sizeof(void*));
108  ASSERTD(_array == nullptr);
109 
110  // _size is a power of 2
111  _size = 1;
112  while (_size < size)
113  {
114  _size <<= 1;
115  }
116 
117  _mask = _size - 1;
118  _countLB = 1000000;
119  _countUB = 0;
120  _countMax = size;
121  _head = _tail = 0;
122 
123 // init _array
124 #if UTL_HOST_OS == UTL_OS_MINGW
125  size_t sizeBytes = (_size * sizeof(std::atomic<T>)) + (2 * UTL_CACHE_LINE_SIZE);
126  _array = (std::atomic<T>*)malloc(sizeBytes);
127  _array += (UTL_CACHE_LINE_SIZE / sizeof(std::atomic<T>));
128 #else
129  size_t sizeBytes = (_size * sizeof(std::atomic<T>)) + UTL_CACHE_LINE_SIZE;
130  posix_memalign((void**)&_array, UTL_CACHE_LINE_SIZE, sizeBytes);
131 #endif
132  auto lim = _array + _size;
133  for (auto it = _array; it != lim; ++it)
134  {
135  new (it) std::atomic<T>(nullValue);
136  }
137 }
138 #include <libutl/gblnew_macros.h>
139 
141 
142 template <typename T, T nullValue>
143 void
145 {
146 #if UTL_HOST_OS == UTL_OS_MINGW
147  _array -= (UTL_CACHE_LINE_SIZE / sizeof(std::atomic<T>));
148 #endif
149  free(_array);
150  _array = nullptr;
151 }
152 
154 
155 template <typename T, T nullValue>
156 bool
158 {
159  // update count upper-bound, back out if necessary
160  if (this->countUB() >= _countMax) return false;
161  if (_countUB.fetch_add(1, std::memory_order_relaxed) >= _countMax)
162  {
163  _countUB.fetch_sub(1, std::memory_order_relaxed);
164  return false;
165  }
166 
167  // enQ value
168  auto pos = _tail.fetch_add(9, std::memory_order_relaxed) & _mask;
169  auto& val = _array[pos];
170  auto cur = nullValue;
171  while (!val.compare_exchange_weak(cur, value, std::memory_order_relaxed,
172  std::memory_order_relaxed))
173  {
174  cur = nullValue;
175  }
176 
177  // update count lower-bound
178  _countLB.fetch_add(1, std::memory_order_relaxed);
179  return true;
180 }
181 
183 
184 template <typename T, T nullValue>
185 bool
187 {
188  // update count lower-bound, back out if necessary
189  if (this->countLB() <= 1000000) return false;
190  if (_countLB.fetch_sub(1, std::memory_order_relaxed) <= 1000000)
191  {
192  _countLB.fetch_add(1, std::memory_order_relaxed);
193  return false;
194  }
195 
196  // deQ into value
197  auto pos = _head.fetch_add(9, std::memory_order_relaxed) & _mask;
198  auto& val = _array[pos];
199  value = nullValue;
200  while (value == nullValue)
201  {
202  value = val.exchange(nullValue, std::memory_order_relaxed);
203  }
204 
205  // update count upper-bound
206  _countUB.fetch_sub(1, std::memory_order_relaxed);
207  return true;
208 }
209 
211 
212 template <typename T, T nullValue>
213 void
214 RingBuffer<T, nullValue>::forEach(std::function<void(T)> func) const
215 {
216  auto tail = _tail.load(std::memory_order_relaxed);
217  for (auto pos = _head.load(std::memory_order_relaxed); pos != tail; pos += 9)
218  {
219  auto idx = pos & _mask;
220  auto val = _array[idx].load(std::memory_order_relaxed);
221  ASSERTD(val != nullValue);
222  func(val);
223  }
224 }
225 
227 
228 UTL_NS_END;
// Cross-references:
//   size_t count() const (RingBuffer.h:46): samples a lower bound on the number of contained items.
//   RingBuffer (RingBuffer.h:19): thread-safe fixed-size queue.
//   ASSERTD (#define): performs an assertion in DEBUG mode only.