Wait Queue
Loading...
Searching...
No Matches
wait_queue.hpp
1
152#ifndef WAIT_QUEUE_HPP_INCLUDED
153#define WAIT_QUEUE_HPP_INCLUDED
154
155#include <cassert> // assert
156#include <deque>
157#include <mutex> // std::scoped_lock, std::mutex
158#include <condition_variable>
159#include <stop_token> // std::stop_source, std::stop_token
160#include <optional>
161#include <utility> // std::move, std::move_if_noexcept, std::forward
162#include <type_traits> // for requires clauses and noexcept specs
163// #include <concepts>
164
165namespace chops {
166
167// requirements for wait_queue container
168
// Satisfied when calling push_back on a Ctr object with a T value is a valid expression.
// Both requirement parameters are named variables, so the check is made with lvalues.
169template <typename Ctr, typename T>
170concept supports_push_back = requires (Ctr ctr, T val) {
171 ctr.push_back(val);
172};
173
174template <typename Ctr, typename ... Args>
175concept supports_emplace_back = requires (Ctr ctr, Args&& ... args) {
176 ctr.emplace_back(args ...);
177};
178
// Satisfied when a Ctr object has a callable empty() member; the result type is not constrained.
179template <typename Ctr>
180concept supports_empty = requires (Ctr ctr) {
181 ctr.empty();
182};
183
// Satisfied when a Ctr object has a callable pop_front() member (used to remove the oldest element).
184template <typename Ctr>
185concept supports_pop_front = requires (Ctr ctr) {
186 ctr.pop_front();
187};
188
// Satisfied when a Ctr object has a callable size() member; the result type is not constrained.
189template <typename Ctr>
190concept supports_size = requires (Ctr ctr) {
191 ctr.size();
192};
193
// Template head, data members and public type aliases of the wait queue.
// T is the element type; Container is the underlying sequence and defaults to std::deque<T>.
208template <typename T, typename Container = std::deque<T> >
// T has to be copy constructible or move constructible.
209 requires std::is_copy_constructible_v<T> || std::is_move_constructible_v<T>
211private:
// Guards m_data_queue; declared mutable so that const member functions can lock it.
212 mutable std::mutex m_mut;
// Engaged only by the constructors that do not take a std::stop_token; request_stop()
// acts on it and returns false when it is disengaged.
213 std::optional<std::stop_source> m_stop_src;
// Token consulted by the push and pop member functions and by stop_requested().
214 std::stop_token m_stop_tok;
// Notified after each successful push; wait_and_pop() waits on it together with m_stop_tok.
215 std::condition_variable_any m_data_cond;
// The queued elements.
216 Container m_data_queue;
217
// Lock type used by the member functions that do not wait on the condition variable.
218 using lock_guard = std::scoped_lock<std::mutex>;
219
220public:
221
// Size type forwarded from the underlying container.
222 using size_type = typename Container::size_type;
// Element type stored in the queue.
223 using value_type = T;
224
225public:
226
// Default constructor of the wait_queue (the index at the end of this listing places its
// declaration on line 241, which is not shown here). The queue creates its own
// std::stop_source and takes its stop token from that source; the container is
// default constructed.
242 requires std::is_default_constructible_v<Container>
243 // noexcept(std::is_nothrow_constructible_v<Container>)
244 : m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token())
245 {
// Postconditions: the queue starts out empty and no stop has been requested yet.
246 assert(empty());
247 assert(size() == size_type(0));
248 assert(!stop_requested());
249 }
250
260 wait_queue(std::stop_token stop_tok)
261 requires std::is_default_constructible_v<Container>
262 // noexcept(std::is_nothrow_constructible_v<Container, std::stop_token>)
263 : m_stop_tok(stop_tok)
264 {
265 assert(empty());
266 assert(size() == size_type(0));
267 }
268
289 wait_queue(Container&& container)
290 requires std::is_move_constructible_v<Container> ||
291 std::is_copy_constructible_v<Container>
292 // noexcept(std::is_nnthrow_constructible_v<Container, Container&&>)
293 : m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token()),
294 m_data_queue(std::move(container))
295 {
296 // not easily assertible until contracts added to C++
297 }
298
311 wait_queue(std::stop_token stop_tok, Container&& container)
312 requires std::is_move_constructible_v<Container> ||
313 std::is_copy_constructible_v<Container>
314 // noexcept(std::is_nothrow_constructible_v<Container, std::stop_token, Container&&>)
315 : m_stop_tok(stop_tok), m_data_queue(std::move(container))
316 {
317 }
318
341 wait_queue(size_type sz)
342 requires std::is_constructible_v<Container, size_type>
343 // noexcept(std::is_nothrow_constructible_v<Container, size_type>)
344 : m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token()),
345 m_data_queue(sz)
346 {
347 assert((sz != size_type(0)) || empty());
348 assert((size() == size_type(0)) || (size() == sz));
349 }
350
363 wait_queue(std::stop_token stop_tok, size_type sz)
364 requires std::is_constructible_v<Container, std::stop_token, size_type>
365 // noexcept(std::is_nothrow_constructible_v<Container, std::stop_token, size_type>)
366 : m_stop_tok((*m_stop_src).get_token()), m_data_queue(sz)
367 {
368 assert((sz != size_type(0)) || empty());
369 assert((size() == size_type(0)) || (size() == sz));
370 }
371
372 // disallow copy or move construction of the entire object
// (the object holds a std::mutex and a std::condition_variable_any, and other
// threads may be blocked on them)
373 wait_queue(const wait_queue&) = delete;
374 wait_queue(wait_queue&&) = delete;
375
376 // disallow copy or move assignment of the entire object
377 wait_queue& operator=(const wait_queue&) = delete;
378 wait_queue& operator=(wait_queue&&) = delete;
379
380 // modifying methods
381
396 auto request_stop() noexcept
397 -> bool
398 {
399 if (m_stop_src) {
400 return (*m_stop_src).request_stop();
401 }
402 return false;
403 }
404
419 auto push(const T& val) /* noexcept(std::is_nothrow_copy_constructible_v<T>) */
420 -> bool
422
423 {
424 if (m_stop_tok.stop_requested()) {
425 return false;
426 }
427 lock_guard lk{m_mut};
428 m_data_queue.push_back(val);
429 m_data_cond.notify_one();
430 return true;
431
432 }
433
443 auto push(T&& val) /* noexcept(std::is_nothrow_move_constructible_v<T>) */
444 -> bool
446
447 {
448 if (m_stop_tok.stop_requested()) {
449 return false;
450 }
451 lock_guard lk{m_mut};
452 m_data_queue.push_back(std::move(val));
453 m_data_cond.notify_one();
454 return true;
455
456 }
457
474 template <typename ... Args>
475 auto emplace_push(Args &&... args) /* noexcept(std::is_nothrow_constructible_v<T, Args...>)*/
476 -> bool
477 requires supports_emplace_back<Container, Args...>
478
479 {
480 if (m_stop_tok.stop_requested()) {
481 return false;
482 }
483 lock_guard lk{m_mut};
484 m_data_queue.emplace_back(std::forward<Args>(args)...);
485 m_data_cond.notify_one();
486 return true;
487 }
488
503 [[nodiscard]] auto wait_and_pop() /* noexcept(std::is_nothrow_constructible_v<T>) */
504 -> std::optional<T>
506
507 {
508 std::unique_lock<std::mutex> lk{m_mut};
509 if (!m_data_cond.wait ( lk, m_stop_tok, [this] { return !m_data_queue.empty(); } )) {
510 return std::optional<T> {}; // queue was request to stop, no data available
511 }
512 assert(!m_data_queue.empty());
513#ifndef NDEBUG
514 const auto old_size = m_data_queue.size();
515#endif
516 std::optional<T> val {std::move_if_noexcept(m_data_queue.front())}; // move construct if possible
517 m_data_queue.pop_front();
518 assert(m_data_queue.size() + 1u == old_size);
519 return val;
520
521 }
522
534 [[nodiscard]] auto try_pop() /* noexcept(std::is_nothrow_constructible_v<T>) */
535 -> std::optional<T>
537 {
538 if (m_stop_tok.stop_requested()) {
539 return std::optional<T> {};
540 }
541 lock_guard lk{m_mut};
542 if (m_data_queue.empty()) {
543 return std::optional<T> {};
544 }
545#ifndef NDEBUG
546 const auto old_size = m_data_queue.size();
547#endif
548 std::optional<T> val {std::move_if_noexcept(m_data_queue.front())}; // move construct if possible
549 m_data_queue.pop_front();
550 assert(m_data_queue.size() + 1u == old_size);
551 return val;
552
553 }
554
555 // non-modifying methods
556
582 template <typename F>
583 auto apply(F&& func) const /* noexcept(std::is_nothrow_invocable_v<F&&, const T&>) */
584 -> void
585 requires std::is_invocable_v<F, T>
586
587 {
588 lock_guard lk{m_mut};
589 for (const T& elem : m_data_queue) {
590 func(elem);
591 }
592
593 }
594
601 [[nodiscard]] auto stop_requested() const noexcept
602 -> bool
603
604 {
605 return m_stop_tok.stop_requested();
606 }
607
613 [[nodiscard]] auto empty() const /* noexcept */
614 -> bool
615 requires supports_empty<Container>
616
617 {
618 lock_guard lk{m_mut};
619 return m_data_queue.empty();
620
621 }
622
628 [[nodiscard]] auto size() const /* noexcept */
629 -> size_type
630 requires supports_size<Container>
631
632 {
633 lock_guard lk{m_mut};
634 return m_data_queue.size();
635
636 }
637
638};
639
640} // end namespace
641
642#endif
643
MPMC thread-safe wait queue with shutdown semantics.
Definition wait_queue.hpp:210
wait_queue()
Default construct a wait_queue.
Definition wait_queue.hpp:241
auto push(const T &val) -> bool
Push a value, by copying, to the wait_queue.
Definition wait_queue.hpp:419
auto request_stop() noexcept -> bool
Request the wait_queue to stop processing, unless a std::stop_token was passed in to a constructor.
Definition wait_queue.hpp:396
auto push(T &&val) -> bool
Push a value, either by moving or copying, to the wait_queue.
Definition wait_queue.hpp:443
auto emplace_push(Args &&... args) -> bool
Directly construct an object in the underlying container (using the container's emplace_back method) ...
Definition wait_queue.hpp:475
auto stop_requested() const noexcept -> bool
Definition wait_queue.hpp:601
auto size() const -> size_type
Definition wait_queue.hpp:628
auto apply(F &&func) const -> void
Apply a non-modifying function object to all elements of the queue.
Definition wait_queue.hpp:583
wait_queue(Container &&container)
Construct a wait_queue by moving in an already constructed container.
Definition wait_queue.hpp:289
wait_queue(std::stop_token stop_tok, Container &&container)
Definition wait_queue.hpp:311
auto empty() const -> bool
Definition wait_queue.hpp:613
wait_queue(std::stop_token stop_tok)
Construct a wait_queue with an externally provided std::stop_token.
Definition wait_queue.hpp:260
wait_queue(std::stop_token stop_tok, size_type sz)
Construct a wait_queue with an initial size or capacity along with a std::stop_token.
Definition wait_queue.hpp:363
auto try_pop() -> std::optional< T >
Pop and return a value from the wait_queue if an element is immediately available,...
Definition wait_queue.hpp:534
wait_queue(size_type sz)
Construct a wait_queue with an initial size or capacity.
Definition wait_queue.hpp:341
auto wait_and_pop() -> std::optional< T >
Pop and return a value from the wait_queue, blocking and waiting for a writer thread to push a value ...
Definition wait_queue.hpp:503
Definition wait_queue.hpp:175
Definition wait_queue.hpp:180
Definition wait_queue.hpp:185
Definition wait_queue.hpp:170
Definition wait_queue.hpp:190