Chops Net IP
Loading...
Searching...
No Matches
udp_entity_io.hpp
Go to the documentation of this file.
1
18#ifndef UDP_ENTITY_IO_HPP_INCLUDED
19#define UDP_ENTITY_IO_HPP_INCLUDED
20
21#include "asio/io_context.hpp"
22#include "asio/post.hpp"
23#include "asio/ip/udp.hpp"
24#include "asio/buffer.hpp"
25
26#include <memory> // std::shared_ptr, std::enable_shared_from_this
27#include <system_error>
28
29#include <cstddef> // std::size_t
30#include <utility> // std::forward, std::move
31#include <functional> // std::function
32#include <future>
33
36
39
42
43#include "buffer/shared_buffer.hpp"
44
46// #include <iostream>
47
48namespace chops {
49namespace net {
50namespace detail {
51
53 const_shared_buffer m_buf;
54 asio::ip::udp::endpoint m_endp;
55
56 udp_queue_element (const const_shared_buffer& buf,
57 const asio::ip::udp::endpoint& endp) noexcept :
58 m_buf(buf), m_endp(endp) { }
59
60 std::size_t size() const noexcept {
61 return m_buf.size();
62 }
63
64};
65
66class udp_entity_io : public std::enable_shared_from_this<udp_entity_io> {
67public:
68 using endpoint_type = asio::ip::udp::endpoint;
69
70private:
71 using byte_vec = chops::mutable_shared_buffer::byte_vec;
72
73private:
74
77 asio::io_context& m_ioc;
78 asio::ip::udp::socket m_socket;
79 endpoint_type m_local_endp;
80 endpoint_type m_default_dest_endp;
81 std::string m_local_port_or_service;
82 std::string m_local_intf;
83 bool m_shutting_down;
84
85 // TODO: multicast stuff
86
87 // following members could be passed through handler, but are members for
88 // simplicity and less copying
89 byte_vec m_byte_vec;
90 endpoint_type m_sender_endp;
91
92public:
93
94 udp_entity_io(asio::io_context& ioc,
95 const endpoint_type& local_endp) noexcept :
96 m_io_common(), m_entity_common(), m_ioc(ioc),
97 m_socket(ioc), m_local_endp(local_endp), m_default_dest_endp(),
98 m_local_port_or_service(), m_local_intf(),
99 m_shutting_down(false),
100 m_byte_vec(), m_sender_endp()
101 { }
102
103 udp_entity_io(asio::io_context& ioc,
104 std::string_view local_port_or_service, std::string_view local_intf) noexcept :
105 m_io_common(), m_entity_common(), m_ioc(ioc),
106 m_socket(ioc), m_local_endp(), m_default_dest_endp(),
107 m_local_port_or_service(local_port_or_service), m_local_intf(local_intf),
108 m_shutting_down(false),
109 m_byte_vec(), m_sender_endp()
110 { }
111
112private:
113 // no copy or assignment semantics for this class
114 udp_entity_io(const udp_entity_io&) = delete;
115 udp_entity_io(udp_entity_io&&) = delete;
116 udp_entity_io& operator=(const udp_entity_io&) = delete;
117 udp_entity_io& operator=(udp_entity_io&&) = delete;
118
119public:
120
121 // all of the methods in this public section can be called through either an io_interface
122 // or a net_entity
123
124 bool is_started() const noexcept { return m_entity_common.is_started(); }
125
126 bool is_io_started() const noexcept { return m_io_common.is_io_started(); }
127
128 template <typename F>
129 void visit_socket(F&& f) {
130 f(m_socket);
131 }
132
133 template <typename F>
134 std::size_t visit_io_output(F&& func) {
135 auto self = shared_from_this();
136 std::promise<std::size_t> prom;
137 auto fut = prom.get_future();
138 // send to executor for concurrency protection
139 asio::post(m_socket.get_executor(), [this, self, &func, p = std::move(prom)] () mutable {
140 if (m_io_common.is_io_started()) {
141 func(basic_io_output<udp_entity_io>(weak_from_this()));
142 p.set_value(1u);
143 }
144 else {
145 p.set_value(0u);
146 }
147 }
148 );
149 return fut.get();
150 }
151
152 output_queue_stats get_output_queue_stats() const noexcept {
153 return m_io_common.get_output_queue_stats();
154 }
155
156 template <typename F1, typename F2>
157 std::error_code start(F1&& io_state_chg, F2&& err_cb) {
158 auto self = shared_from_this();
159 return m_entity_common.start(std::forward<F1>(io_state_chg), std::forward<F2>(err_cb),
160 m_socket.get_executor(),
161 [this, self] () { return do_start(); } );
162 }
163
164 template <typename MH>
165 bool start_io(std::size_t max_size, MH&& msg_handler) {
166 if (!m_io_common.set_io_started()) { // concurrency protected
167 return false;
168 }
169 if (m_local_endp == endpoint_type()) { // mismatch between start_io and initialized UDP entity
170 return false;
171 }
172 m_byte_vec.resize(max_size);
173// std::cerr << "Inside start_io AAA, ready to start read, buf resized to: " << max_size <<
174// ", local endp: " << m_local_endp << ", default dest endp: " << m_default_dest_endp << std::endl;
175 start_read(std::forward<MH>(msg_handler));
176 return true;
177 }
178
179 template <typename MH>
180 bool start_io(const endpoint_type& endp, std::size_t max_size, MH&& msg_handler) {
181 if (!m_io_common.set_io_started()) { // concurrency protected
182 return false;
183 }
184 if (m_local_endp == endpoint_type()) {
185 return false;
186 }
187 m_default_dest_endp = endp;
188 m_byte_vec.resize(max_size);
189// std::cerr << "Inside start_io BBB, ready to start read, buf resized to: " << max_size <<
190// ", local endp: " << m_local_endp << ", default dest endp: " << m_default_dest_endp << std::endl;
191 start_read(std::forward<MH>(msg_handler));
192 return true;
193 }
194
195 bool start_io() {
196 if (!m_io_common.set_io_started()) { // concurrency protected
197 return false;
198 }
199// std::cerr << "Inside start_io no read CCC" <<
200// ", local endp: " << m_local_endp << ", default dest endp: " << m_default_dest_endp << std::endl;
201 return true;
202 }
203
204 bool start_io(const endpoint_type& endp) {
205 if (!m_io_common.set_io_started()) { // concurrency protected
206 return false;
207 }
208 m_default_dest_endp = endp;
209// std::cerr << "Inside start_io no read DDD" <<
210// ", local endp: " << m_local_endp << ", default dest endp: " << m_default_dest_endp << std::endl;
211 return true;
212 }
213
214 bool stop_io() {
215 // handle start_io never called - close the open socket, etc
216 bool ret = !m_io_common.is_io_started();
217 close(std::make_error_code(net_ip_errc::udp_io_handler_stopped));
218 return ret;
219 }
220
221 std::error_code stop() {
222 auto self = shared_from_this();
223 return m_entity_common.stop(m_socket.get_executor(),
224 [this, self] () {
225 close(std::make_error_code(net_ip_errc::udp_entity_stopped));
226 return std::error_code();
227 }
228 );
229 }
230
231 // io_common has concurrency protection
232 bool send(const chops::const_shared_buffer& buf) {
233 return send(buf, m_default_dest_endp);
234 }
235
236 bool send(const chops::const_shared_buffer& buf, const endpoint_type& endp) {
237 if (endp == endpoint_type()) { // mismatch between start_io and send
238 return false;
239 }
240 auto ret = m_io_common.start_write(udp_queue_element(buf, endp),
241 [this] (const udp_queue_element& e) {
242 start_write(e);
243 }
244 );
245 return ret != io_common<udp_queue_element>::write_status::io_stopped;
246 }
247
248private:
249
250 template <typename MH>
251 void start_read(MH&& msg_hdlr) {
252 auto self { shared_from_this() };
253 m_socket.async_receive_from(
254 asio::mutable_buffer(m_byte_vec.data(), m_byte_vec.size()),
255 m_sender_endp,
256 [this, self, msg_hdlr = std::move(msg_hdlr)]
257 (const std::error_code& err, std::size_t nb) mutable {
258 handle_read(err, nb, std::move(msg_hdlr));
259 }
260 );
261 }
262
263 template <typename MH>
264 void handle_read(const std::error_code&, std::size_t, MH&&);
265
266 void start_write(const udp_queue_element&);
267
268 void handle_write(const std::error_code&, std::size_t);
269
270private:
271
272 std::error_code do_start() {
273 if (!m_local_port_or_service.empty()) {
274 endpoints_resolver<asio::ip::udp> resolver(m_ioc);
275 auto ret = resolver.make_endpoints(true, m_local_intf, m_local_port_or_service);
276 if (!ret) {
277 close(ret.error());
278 return ret.error();
279 }
280 m_local_endp = ret->cbegin()->endpoint();
281 m_local_port_or_service.clear();
282 m_local_port_or_service.shrink_to_fit();
283 m_local_intf.clear();
284 m_local_intf.shrink_to_fit();
285 }
286 std::error_code ec;
287 m_socket.open(m_local_endp.protocol(), ec);
288 if (ec) {
289 close(ec);
290 return ec;
291 }
292 if (m_local_endp != endpoint_type()) { // local bind needed
293 m_socket.bind(m_local_endp, ec);
294 if (ec) {
295 close(ec);
296 return ec;
297 }
298 }
299 m_entity_common.call_io_state_chg_cb(shared_from_this(), 1, true);
300 return { };
301 }
302
303 void close(const std::error_code& err) {
304 auto self { shared_from_this() };
305 m_entity_common.call_error_cb(self, err);
306 if (m_shutting_down) { // already been through close once
307 return;
308 }
309 m_shutting_down = true;
310 m_io_common.set_io_stopped();
311 m_entity_common.set_stopped();
312 m_io_common.clear();
313 std::error_code ec;
314 m_socket.close(ec);
315 m_entity_common.call_error_cb(self, std::make_error_code(net_ip_errc::udp_entity_closed));
316 m_entity_common.call_io_state_chg_cb(self, 0, false);
317 }
318
319};
320
321// method implementations, split out just to make the class declaration a little more readable
322
323template <typename MH>
324void udp_entity_io::handle_read(const std::error_code& err,
325 std::size_t num_bytes, MH&& msg_hdlr) {
326
327 if (err) {
328 close(err);
329 return;
330 }
331 if (!msg_hdlr(asio::const_buffer(m_byte_vec.data(), num_bytes),
332 basic_io_output<udp_entity_io>(weak_from_this()), m_sender_endp)) {
333 // message handler not happy, tear everything down
334 close(std::make_error_code(net_ip_errc::message_handler_terminated));
335 return;
336 }
337 start_read(std::forward<MH>(msg_hdlr));
338}
339
340inline void udp_entity_io::start_write(const udp_queue_element& e) {
341 auto self { shared_from_this() };
342// if (e.m_endp == asio::ip::udp::endpoint()) {
343// std::cerr << "Ack! Empty endpoint in UDP write" << std::endl;
344// }
345 m_socket.async_send_to(asio::const_buffer(e.m_buf.data(), e.m_buf.size()), e.m_endp,
346 [this, self] (const std::error_code& err, std::size_t nb) {
347 handle_write(err, nb);
348 }
349 );
350}
351
352inline void udp_entity_io::handle_write(const std::error_code& err, std::size_t /* num_bytes */) {
353 if (err) {
354 close(err);
355 return;
356 }
357 m_io_common.write_next_elem([this] (const udp_queue_element& e) {
358 start_write(e);
359 }
360 );
361}
362
363using udp_entity_io_shared_ptr = std::shared_ptr<udp_entity_io>;
364using udp_entity_io_weak_ptr = std::weak_ptr<udp_entity_io>;
365
366} // end detail namespace
367
368} // end net namespace
369} // end chops namespace
370
371#endif
372
basic_io_output class template, providing send and get_output_queue_stats methods.
Definition io_common.hpp:37
Definition net_entity_common.hpp:49
Definition udp_entity_io.hpp:66
Class to convert network host names and ports into Asio endpoint objects.
Common code, factored out, for TCP and UDP io handlers.
Common code, factored out, for TCP acceptor, TCP connector, and UDP net entity handlers.
Error codes, exception class, and error category within Chops net_ip library.
Structures containing statistics gathered on internal queues.
Definition udp_entity_io.hpp:52
output_queue_stats provides information on the internal output queue.
Definition queue_stats.hpp:29
Definition tcp_dsr.cpp:96