152#ifndef WAIT_QUEUE_HPP_INCLUDED
153#define WAIT_QUEUE_HPP_INCLUDED
158#include <condition_variable>
162#include <type_traits>
169template <
typename Ctr,
typename T>
174template <
typename Ctr,
typename ... Args>
176 ctr.emplace_back(args ...);
179template <
typename Ctr>
184template <
typename Ctr>
189template <
typename Ctr>
208template <
typename T,
typename Container = std::deque<T> >
209 requires std::is_copy_constructible_v<T> || std::is_move_constructible_v<T>
212 mutable std::mutex m_mut;
213 std::optional<std::stop_source> m_stop_src;
214 std::stop_token m_stop_tok;
215 std::condition_variable_any m_data_cond;
216 Container m_data_queue;
218 using lock_guard = std::scoped_lock<std::mutex>;
222 using size_type =
typename Container::size_type;
223 using value_type = T;
242 requires std::is_default_constructible_v<Container>
244 : m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token())
247 assert(
size() == size_type(0));
261 requires std::is_default_constructible_v<Container>
263 : m_stop_tok(stop_tok)
266 assert(
size() == size_type(0));
290 requires std::is_move_constructible_v<Container> ||
291 std::is_copy_constructible_v<Container>
293 : m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token()),
294 m_data_queue(std::move(container))
312 requires std::is_move_constructible_v<Container> ||
313 std::is_copy_constructible_v<Container>
315 : m_stop_tok(stop_tok), m_data_queue(std::move(container))
342 requires std::is_constructible_v<Container, size_type>
344 : m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token()),
347 assert((sz != size_type(0)) ||
empty());
348 assert((
size() == size_type(0)) || (
size() == sz));
// Constructor fragment: wait_queue(std::stop_token stop_tok, size_type sz).
// Builds the queue with an initial size or capacity and adopts an externally
// provided stop token. The container is constructed from `sz` alone, so that is
// the only constructibility requirement (the token is never passed to it).
364 requires std::is_constructible_v<Container, size_type>
// m_stop_src is not initialized by this constructor, so the optional is empty and
// must not be dereferenced; store the caller's token instead.
366 : m_stop_tok(stop_tok), m_data_queue(sz)
// A requested size of zero must yield an empty queue.
368 assert((sz != size_type(0)) ||
empty());
// The container either starts empty (sz treated as capacity) or holds sz elements.
369 assert((
size() == size_type(0)) || (
size() == sz));
400 return (*m_stop_src).request_stop();
424 if (m_stop_tok.stop_requested()) {
427 lock_guard lk{m_mut};
428 m_data_queue.push_back(val);
429 m_data_cond.notify_one();
448 if (m_stop_tok.stop_requested()) {
451 lock_guard lk{m_mut};
452 m_data_queue.push_back(std::move(val));
453 m_data_cond.notify_one();
474 template <
typename ... Args>
480 if (m_stop_tok.stop_requested()) {
483 lock_guard lk{m_mut};
484 m_data_queue.emplace_back(std::forward<Args>(args)...);
485 m_data_cond.notify_one();
508 std::unique_lock<std::mutex> lk{m_mut};
509 if (!m_data_cond.wait ( lk, m_stop_tok, [
this] { return !m_data_queue.empty(); } )) {
510 return std::optional<T> {};
512 assert(!m_data_queue.empty());
514 const auto old_size = m_data_queue.size();
516 std::optional<T> val {std::move_if_noexcept(m_data_queue.front())};
517 m_data_queue.pop_front();
518 assert(m_data_queue.size() + 1u == old_size);
538 if (m_stop_tok.stop_requested()) {
539 return std::optional<T> {};
541 lock_guard lk{m_mut};
542 if (m_data_queue.empty()) {
543 return std::optional<T> {};
546 const auto old_size = m_data_queue.size();
548 std::optional<T> val {std::move_if_noexcept(m_data_queue.front())};
549 m_data_queue.pop_front();
550 assert(m_data_queue.size() + 1u == old_size);
582 template <
typename F>
585 requires std::is_invocable_v<F, T>
588 lock_guard lk{m_mut};
589 for (
const T& elem : m_data_queue) {
605 return m_stop_tok.stop_requested();
618 lock_guard lk{m_mut};
619 return m_data_queue.empty();
628 [[nodiscard]]
auto size() const
633 lock_guard lk{m_mut};
634 return m_data_queue.size();
MPMC thread-safe wait queue with shutdown semantics.
Definition wait_queue.hpp:210
wait_queue()
Default construct a wait_queue.
Definition wait_queue.hpp:241
auto push(const T &val) -> bool
Push a value, by copying, to the wait_queue.
Definition wait_queue.hpp:419
auto request_stop() noexcept -> bool
Request the wait_queue to stop processing, unless a std::stop_token was passed in to a constructor.
Definition wait_queue.hpp:396
auto push(T &&val) -> bool
Push a value, either by moving or copying, to the wait_queue.
Definition wait_queue.hpp:443
auto emplace_push(Args &&... args) -> bool
Directly construct an object in the underlying container (using the container's emplace_back method) ...
Definition wait_queue.hpp:475
auto stop_requested() const noexcept -> bool
Definition wait_queue.hpp:601
auto size() const -> size_type
Definition wait_queue.hpp:628
auto apply(F &&func) const -> void
Apply a non-modifying function object to all elements of the queue.
Definition wait_queue.hpp:583
wait_queue(Container &&container)
Construct a wait_queue by moving in an already constructed container.
Definition wait_queue.hpp:289
wait_queue(std::stop_token stop_tok, Container &&container)
Definition wait_queue.hpp:311
auto empty() const -> bool
Definition wait_queue.hpp:613
wait_queue(std::stop_token stop_tok)
Construct a wait_queue with an externally provided std::stop_token.
Definition wait_queue.hpp:260
wait_queue(std::stop_token stop_tok, size_type sz)
Construct a wait_queue with an initial size or capacity along with a std::stop_token.
Definition wait_queue.hpp:363
auto try_pop() -> std::optional< T >
Pop and return a value from the wait_queue if an element is immediately available,...
Definition wait_queue.hpp:534
wait_queue(size_type sz)
Construct a wait_queue with an initial size or capacity.
Definition wait_queue.hpp:341
auto wait_and_pop() -> std::optional< T >
Pop and return a value from the wait_queue, blocking and waiting for a writer thread to push a value ...
Definition wait_queue.hpp:503
Definition wait_queue.hpp:175
Definition wait_queue.hpp:180
Definition wait_queue.hpp:185
Definition wait_queue.hpp:170
Definition wait_queue.hpp:190