Chops Net IP
Loading...
Searching...
No Matches
msg_handling.hpp
Go to the documentation of this file.
1
40#ifndef MSG_HANDLING_HPP_INCLUDED
41#define MSG_HANDLING_HPP_INCLUDED
42
43#include <string_view>
44#include <cstddef> // std::size_t, std::byte
45#include <cstdint> // std::uint16_t
46#include <vector>
47#include <utility> // std::forward, std::move
48#include <atomic>
49#include <ostream>
50#include <chrono>
51#include <thread>
52#include <future>
53#include <ranges> // std::views::iota
54
55#include <cassert>
56#include <limits>
57
58#include "asio/buffer.hpp"
59#include "asio/ip/udp.hpp" // ip::udp::endpoint
60#include "asio/ip/address.hpp" // make_address
61
62#include "utility/byte_array.hpp"
63
64#include "serialize/extract_append.hpp"
65#include "buffer/shared_buffer.hpp"
66
69
70namespace chops {
71namespace test {
72
73inline std::size_t decode_variable_len_msg_hdr(const std::byte* buf_ptr, std::size_t sz) {
74 assert (sz == 2u);
75 return extract_val<std::endian::big, std::uint16_t>(buf_ptr);
76}
77
78inline chops::mutable_shared_buffer make_body_buf(std::string_view pre,
79 char body_char,
80 std::size_t num_body_chars) {
81 chops::mutable_shared_buffer buf(pre.data(), pre.size());
82 std::string body(num_body_chars, body_char);
83 return buf.append(body.data(), body.size());
84}
85
86inline chops::const_shared_buffer make_variable_len_msg(const chops::mutable_shared_buffer& body) {
87 assert(body.size() < std::numeric_limits<std::uint16_t>::max());
88 std::byte hdr[2];
89 auto sz = append_val<std::endian::big, std::uint16_t>(hdr, static_cast<std::uint16_t>(body.size()));
90 chops::mutable_shared_buffer msg(hdr, 2);
91 return chops::const_shared_buffer(std::move(msg.append(body.data(), body.size())));
92}
93
94inline chops::const_shared_buffer make_cr_lf_text_msg(const chops::mutable_shared_buffer& body) {
95 chops::mutable_shared_buffer msg(body.data(), body.size());
96 auto ba = chops::make_byte_array(0x0D, 0x0A); // CR, LF
97 return chops::const_shared_buffer(std::move(msg.append(ba.data(), ba.size())));
98}
99
100inline chops::const_shared_buffer make_lf_text_msg(const chops::mutable_shared_buffer& body) {
101 chops::mutable_shared_buffer msg(body.data(), body.size());
102 auto ba = chops::make_byte_array(0x0A); // LF
103 return chops::const_shared_buffer(std::move(msg.append(ba.data(), ba.size())));
104}
105
106template <typename F>
107chops::const_shared_buffer make_empty_body_msg(F&& func) {
108 return func( chops::mutable_shared_buffer{ } );
109}
110
111inline auto make_empty_variable_len_msg() { return make_empty_body_msg(make_variable_len_msg); }
112inline auto make_empty_cr_lf_text_msg() { return make_empty_body_msg(make_cr_lf_text_msg); }
113inline auto make_empty_lf_text_msg() { return make_empty_body_msg(make_lf_text_msg); }
114
115using vec_buf = std::vector<chops::const_shared_buffer>;
116
117template <typename F>
118vec_buf make_msg_vec(F&& func, std::string_view pre, char body_char, int num_msgs) {
119 vec_buf vec;
120 for (int i : std::views::iota(0, num_msgs)) {
121 vec.push_back (func(make_body_buf(pre, body_char, i+1)));
122 }
123 return vec;
124}
125
126constexpr std::size_t fixed_size_buf_size = 33u;
127
128inline chops::const_shared_buffer make_fixed_size_buf() {
129 // 33 bytes, mostly consisting of dead beef
130 auto ba = chops::make_byte_array(0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF,
131 0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF,
132 0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF,
133 0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF,
134 0XCC);
135 assert (ba.size() == fixed_size_buf_size);
136 return chops::const_shared_buffer(ba.data(), ba.size());
137}
138
139inline vec_buf make_fixed_size_msg_vec(int num_msgs) {
140 vec_buf vec;
141 for (int i : std::views::iota(0, num_msgs)) {
142 vec.push_back (make_fixed_size_buf());
143 }
144 return vec;
145}
146
147using test_counter = std::atomic_size_t;
148
149template <typename IOT>
150struct msg_hdlr {
151 using endp_type = typename IOT::endpoint_type;
152 using const_buf = asio::const_buffer;
153
154 bool reply;
155 test_counter& cnt;
156
157 msg_hdlr(bool rep, test_counter& c) : reply(rep), cnt(c) { }
158
159 bool operator()(const_buf buf, chops::net::basic_io_output<IOT> io_out, endp_type endp) {
160 chops::const_shared_buffer sh_buf(buf.data(), buf.size());
161 if (sh_buf.size() > 2) { // not a shutdown message
162 ++cnt;
163 if (reply) {
164 bool r = io_out.send(sh_buf, endp);
165 // assert(r);
166 }
167 return true;
168 }
169 if (reply) {
170 // may not make it back to sender, depending on TCP connection or UDP reliability
171 bool r = io_out.send(sh_buf, endp);
172 // assert(r);
173 }
174 return false;
175 }
176};
177
178using test_prom = std::promise<std::size_t>;
179
180// fixed size msg hdlr does not have end-of-sequence message; instead it
181// pops a future when the expected count is reached
182template <typename IOT>
184 using endp_type = typename IOT::endpoint_type;
185 using const_buf = asio::const_buffer;
186
187 test_prom prom;
188 std::size_t max_cnt;
189 test_counter& cnt;
190
191 fixed_size_msg_hdlr(test_prom p, std::size_t m, test_counter& c) :
192 prom(std::move(p)), max_cnt(m), cnt(c) { }
193
194 bool operator()(const_buf buf, chops::net::basic_io_output<IOT> io_out, endp_type endp) {
195 assert(buf.size() == fixed_size_buf_size);
196 ++cnt;
197 --max_cnt;
198 if (max_cnt == 0u) {
199 prom.set_value(0u);
200 }
201 return true;
202 }
203
204};
205
207private:
208 int m_sleep_time;
209 std::ostream& m_log;
210
211public:
212 poll_output_queue_cond(int sleep_time, std::ostream& log) noexcept :
213 m_sleep_time(sleep_time), m_log(log) { }
214
215 bool operator()(const chops::net::output_queue_stats& stats) const {
216 if (stats.output_queue_size == 0u) {
217 return true;
218 }
219 m_log << "Output queue size: " << stats.output_queue_size << std::endl;
220 std::this_thread::sleep_for(std::chrono::milliseconds(m_sleep_time));
221 return false;
222 }
223
224};
225
226} // end namespace test
227} // end namespace chops
228
229#endif
230
basic_io_output class template, providing send and get_output_queue_stats methods.
The basic_io_output class template provides methods for sending data to an associated network IO hand...
Definition basic_io_output.hpp:57
bool send(const void *buf, std::size_t sz) const
Send a buffer of data through the associated network IO handler.
Definition basic_io_output.hpp:121
Definition msg_handling.hpp:206
Structures containing statistics gathered on internal queues.
output_queue_stats provides information on the internal output queue.
Definition queue_stats.hpp:29
Definition msg_handling.hpp:183
Definition msg_handling.hpp:150