Wait Queue
Loading...
Searching...
No Matches
Wait Queue, a Multi-Writer / Multi-Reader (MPMC) Thread-Safe Queue

Overview

This class template allows transferring data between threads with queue semantics (push, pop), using C++ std library general facilities (mutex, condition variable). An internal container is managed within an object of this class template.

Multiple writer and reader threads can access a wait_queue object simultaneously. When a value is pushed on the queue by a writer thread, only one reader thread will be notified to consume the value.

One of the template parameters is the container type, allowing customization for specific use cases (see below for additional details). The default container type is std::deque.

A graceful shutdown can be requested using the request_stop method (modeled on the C++ 20 request_stop from std::stop_source). This allows waiting reader threads to be notified for shutdown. Alternatively a std::stop_token can be passed in to the wait_queue constructor, allowing shutdown from outside of the wait_queue object.

wait_queue uses C++ standard library concurrency facilities (e.g. std::mutex, std::condition_variable_any) in its implementation. It is not a lock-free queue, but it has been designed to be used in memory constrained environments or where deterministic performance is needed.

In particular, wait_queue:

  • Has been tested with Martin Moene's ring_span library for the internal container, as well as Justin Masiulis' circular_buffer library. A "ring buffer" or "circular buffer" uses a fixed size container and implies that the wait_queue can be used in environments where dynamic memory management (heap) is not allowed or is problematic. In particular, no heap memory will be directly allocated within the wait_queue object. A ring_span is a view on a container object instead of directly owning the container, so there are differences in construction and container management.
  • Does not throw or catch exceptions anywhere in its code base. If a value being pushed on to the queue throws an exception, it can be caught by the pushing code (or higher up in the call chain). Exceptions may be thrown by C++ std library concurrency calls (std::mutex locks, etc), as specified by the C++ standard, although this usually indicates an application design issue or issues at the operating system level.
  • If the C++ std library concurrency calls become noexcept (instead of throwing an exception), every wait_queue method will become noexcept or conditionally noexcept (depending on the type of the data passed through the wait_queue).

The only requirement on the type passed through a wait_queue is that it supports either copy construction or move construction. In particular, a default constructor is not required (this is enabled by using std::optional, which does not require a default constructor).

The implementation is adapted from the book Concurrency in Action, Practical Multithreading, by Anthony Williams (2012 edition). The core logic in this library is the same as provided by Anthony in his book, but C++ 20 features have been added, the API is significantly changed and additional features added. The name of the utility class template in Anthony's book is threadsafe_queue.

Additional Details

Each method is fully documented in the class documentation. In particular, function arguments, pre-conditions, and return values are all documented.

Once request_stop has been invoked (either through the wait_queue object or from an external std::stop_source), subsequent pushes will not add any elements to the queue and the push methods will return false.

The push methods return a bool to denote whether a value was succesfully queued or whether a shutdown was requested. The pop methods return a std::optional value. For the wait_and_pop method, if the return value is not present it means a shutdown was requested. For the try_pop method, if the return value is not present it means either the queue was empty at that moment, or that a shutdown was requested.

A std::stop_token can be passed in through the constructors, which allows aa external std::stop_source to request_stop. Alternatively, an internal stop_token will be used, allowing the wait_queue request_stop method to be used to shutdown wait_queue processing.

Once a request_stop is called (either externally or through the wait_queue request_stop) all reader threads calling wait_and_pop are notified, and an empty value returned to those threads. Subsequent calls to push will return a false value.

Example usage, default container:

// inside writer thread, assume wq passed in by reference
wq.push(42);
...
// all finished, time to shutdown
wq.request_stop();
// inside reader thread, assume wq passed in by reference
auto rtn_val = wq.wait_and_pop(); // return type is std::optional<int>
if (!rtn_val) { // empty value, request_stop has been called
// time to exit reader thread
}
if (*rtn_val == 42) ...
MPMC thread-safe wait queue with shutdown semantics.
Definition wait_queue.hpp:210
auto push(const T &val) -> bool
Push a value, by copying, to the wait_queue.
Definition wait_queue.hpp:419
auto wait_and_pop() -> std::optional< T >
Pop and return a value from the wait_queue, blocking and waiting for a writer thread to push a value ...
Definition wait_queue.hpp:503

Example usage with ring buffer (from Martin Moene):

const int sz = 20;
int buf[sz];
wq { nonstd::ring_span<int> { buf+0, buf+sz } };
// push and pop same as code with default container

The container type must support the following (depending on which methods are called): default construction, construction with an initial size, push_back (preferably overloaded for both copy and move semantics), emplace_back (with a template parameter pack), front, pop_front, empty, and size. The container must also have a size_type defined. Type constraints and concepts are defined for the various type requirements.

Iterators on a wait_queue are not supported, due to obvious difficulties with maintaining consistency and integrity. The apply method can be used to access the internal data in a threadsafe manner.

Copy and move construction or assignment for the whole queue is disallowed, since the use cases and underlying implications are not clear for those operations. In particular, the exception implications for assigning the internal data from one queue to another is messy, and the general semantics of what it means is not clearly defined. If there is data in one wait_queue that must be copied or moved to another, the apply method can be used or individual push and pop methods called, even if not as efficient as an internal copy or move.

Note
The boost circular_buffer can be used for the container type. Memory is allocated only once, at container construction time. This may be useful for environments where construction can use dynamic memory but a push or pop must not allocate or deallocate memory. If the container type is boost circular_buffer then the default constructor for wait_queue cannot be used (since it would result in a container with an empty capacity).

Thanks go to Lou Langholtz for adding DBC (Design by Contract) assertions.

Authors
Cliff Green, Lou Langholtz, Anthony Williams

Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE.txt or copy at http://www.boost.org/LICENSE_1_0.txt)