Chops Net IP
Loading...
Searching...
No Matches
tcp_io.hpp
Go to the documentation of this file.
1
18#ifndef TCP_IO_HPP_INCLUDED
19#define TCP_IO_HPP_INCLUDED
20
21#include "asio/io_context.hpp"
22#include "asio/any_io_executor.hpp"
23#include "asio/read.hpp"
24#include "asio/read_until.hpp"
25#include "asio/write.hpp"
26#include "asio/ip/tcp.hpp"
27#include "asio/buffer.hpp"
28
29#include <memory> // std::shared_ptr, std::enable_shared_from_this
30#include <system_error>
31
32#include <cstddef> // std::size_t
33#include <utility> // std::forward, std::move
34#include <string>
35#include <string_view>
36#include <functional> // std::function
37
41
44
45#include "buffer/shared_buffer.hpp"
46
47namespace chops {
48namespace net {
49namespace detail {
50
51inline std::size_t null_msg_frame (asio::mutable_buffer) noexcept { return 0u; }
52
53template <typename IOT>
54bool null_msg_hdlr (asio::const_buffer, basic_io_output<IOT>, asio::ip::tcp::endpoint) {
55 return true;
56}
57
58class tcp_io : public std::enable_shared_from_this<tcp_io> {
59public:
60 using endpoint_type = asio::ip::tcp::endpoint;
61 using entity_notifier_cb = std::function<void (std::error_code, std::shared_ptr<tcp_io>)>;
62
63private:
64 using byte_vec = chops::mutable_shared_buffer::byte_vec;
65
66private:
67
68 asio::ip::tcp::socket m_socket;
70 entity_notifier_cb m_notifier_cb;
71 endpoint_type m_remote_endp;
72
73 // the following member is only used for read processing; it could be
74 // moved through handlers, but is a member for simplicity and to reduce
75 // moving
76 byte_vec m_byte_vec;
77
78public:
79
80 tcp_io(asio::ip::tcp::socket sock, entity_notifier_cb cb) noexcept :
81 m_socket(std::move(sock)), m_io_common(),
82 m_notifier_cb(cb), m_remote_endp(),
83 m_byte_vec() { }
84
85private:
86 // no copy or assignment semantics for this class
87 tcp_io(const tcp_io&) = delete;
88 tcp_io(tcp_io&&) = delete;
89 tcp_io& operator=(const tcp_io&) = delete;
90 tcp_io& operator=(tcp_io&&) = delete;
91
92public:
93 // all of the methods in this public section can be called through a basic_io_interface
94 // or basic_io_output
95
96 template <typename F>
97 void visit_socket(F&& f) {
98 f(m_socket);
99 }
100
101 output_queue_stats get_output_queue_stats() const noexcept {
102 return m_io_common.get_output_queue_stats();
103 }
104
105 bool is_io_started() const noexcept { return m_io_common.is_io_started(); }
106
107 template <typename MH, typename MF>
108 bool start_io(std::size_t header_size, MH&& msg_handler, MF&& msg_frame) {
109 if (!start_io_setup()) {
110 return false;
111 }
112 m_byte_vec.resize(header_size);
113 start_read(asio::mutable_buffer(m_byte_vec.data(), m_byte_vec.size()), header_size,
114 std::forward<MH>(msg_handler), std::forward<MF>(msg_frame));
115 return true;
116 }
117
118 template <typename MH>
119 bool start_io(std::size_t header_size, MH&& msg_handler, hdr_decoder_func func) {
120 return start_io(header_size, std::forward<MH>(msg_handler),
122 }
123
124 template <typename MH>
125 bool start_io(std::string_view delimiter, MH&& msg_handler) {
126 if (!start_io_setup()) {
127 return false;
128 }
129 // not sure of delimiter std::string_view lifetime, so create string
130 start_read_until(std::string(delimiter), std::forward<MH>(msg_handler));
131 return true;
132 }
133
134 template <typename MH>
135 bool start_io(std::size_t read_size, MH&& msg_handler) {
136 return start_io(read_size, std::forward<MH>(msg_handler), null_msg_frame);
137 }
138
139 bool start_io() {
140 return start_io(1, null_msg_hdlr<tcp_io>, null_msg_frame);
141 }
142
143
144 bool stop_io() {
145 bool ret = true;
146 if (!m_io_common.is_io_started()) {
147 // handle degenerate case where start_io never called - close the open socket,
148 // notify acceptor or connector so tcp_io object can be removed
149 ret = false;
150 m_io_common.set_io_started();
151 }
152 close(std::make_error_code(net_ip_errc::tcp_io_handler_stopped));
153 return ret;
154 }
155
156 // io_common has concurrency protection
157 bool send(const chops::const_shared_buffer& buf) {
158 auto ret = m_io_common.start_write(buf,
159 [this] (const chops::const_shared_buffer& b) {
160 start_write(b);
161 }
162 );
164 }
165
166 bool send(const chops::const_shared_buffer& buf, const endpoint_type&) {
167 return send(buf);
168 }
169
170private:
171 void close(const std::error_code& err) {
172 if (!m_io_common.set_io_stopped()) {
173 return; // already stopped, short circuit any late handler callbacks
174 }
175 m_io_common.clear();
176 std::error_code ec;
177 m_socket.shutdown(asio::ip::tcp::socket::shutdown_receive, ec);
178 m_socket.close(ec);
179 // notify the acceptor or connector that this tcp_io object is closed
180 m_notifier_cb(err, shared_from_this());
181 }
182
183private:
184
185 bool start_io_setup() {
186 if (!m_io_common.set_io_started()) { // concurrency protected
187 return false;
188 }
189 std::error_code ec;
190 m_remote_endp = m_socket.remote_endpoint(ec);
191 if (ec) {
192 close(ec);
193 return false;
194 }
195 return true;
196 }
197
198 template <typename MH, typename MF>
199 void start_read(asio::mutable_buffer mbuf, std::size_t hdr_size, MH&& msg_hdlr, MF&& msg_frame) {
200 auto self { shared_from_this() };
201 asio::async_read(m_socket, mbuf,
202 [this, self, hdr_size, mbuf, msg_hdlr = std::move(msg_hdlr), msg_frame = std::move(msg_frame)]
203 (const std::error_code& err, std::size_t nb) mutable {
204 handle_read(mbuf, hdr_size, err, nb, std::move(msg_hdlr), std::move(msg_frame));
205 }
206 );
207 }
208
209 template <typename MH, typename MF>
210 void handle_read(asio::mutable_buffer, std::size_t,
211 const std::error_code&, std::size_t, MH&&, MF&&);
212
213 template <typename MH>
214 void start_read_until(std::string delim, MH&& msg_hdlr) {
215 auto self { shared_from_this() };
216 asio::async_read_until(m_socket, asio::dynamic_buffer(m_byte_vec), delim,
217 [this, self, delim, msg_hdlr = std::move(msg_hdlr)]
218 (const std::error_code& err, std::size_t nb) mutable {
219 handle_read_until(delim, err, nb, std::move(msg_hdlr));
220 }
221 );
222 }
223
224 template <typename MH>
225 void handle_read_until(std::string, const std::error_code&, std::size_t, MH&&);
226
227 void start_write(const chops::const_shared_buffer&);
228
229 void handle_write(const std::error_code&, std::size_t);
230
231};
232
233// method implementations, just to make the class declaration a little more readable
234
235template <typename MH, typename MF>
236void tcp_io::handle_read(asio::mutable_buffer mbuf, std::size_t hdr_size,
237 const std::error_code& err, std::size_t /* num_bytes */,
238 MH&& msg_hdlr, MF&& msg_frame) {
239
240 if (err) {
241 close(err);
242 return;
243 }
244 // assert num_bytes == mbuf.size()
245 std::size_t next_read_size = msg_frame(mbuf);
246 if (next_read_size == 0u) { // msg fully received, now invoke message handler
247 if (!msg_hdlr(asio::const_buffer(m_byte_vec.data(), m_byte_vec.size()),
248 basic_io_output<tcp_io>(weak_from_this()), m_remote_endp)) {
249 auto self { shared_from_this() };
250 // message handler not happy, tear everything down, post function object
251 // instead of directly calling close to give a return message a possibility
252 // of getting through
253 asio::post(m_socket.get_executor(), [this, self] () {
254 close(std::make_error_code(net_ip_errc::message_handler_terminated)); } );
255 return;
256 }
257 m_byte_vec.resize(hdr_size);
258 mbuf = asio::mutable_buffer(m_byte_vec.data(), m_byte_vec.size());
259 }
260 else {
261 std::size_t old_size = m_byte_vec.size();
262 m_byte_vec.resize(old_size + next_read_size);
263 mbuf = asio::mutable_buffer(m_byte_vec.data() + old_size, next_read_size);
264 }
265 start_read(mbuf, hdr_size, std::forward<MH>(msg_hdlr), std::forward<MF>(msg_frame));
266}
267
268template <typename MH>
269void tcp_io::handle_read_until(std::string delim, const std::error_code& err,
270 std::size_t num_bytes, MH&& msg_hdlr) {
271
272 if (err) {
273 close(err);
274 return;
275 }
276 // beginning of m_byte_vec to num_bytes is buf, includes delimiter bytes
277 if (!msg_hdlr(asio::const_buffer(m_byte_vec.data(), num_bytes),
278 basic_io_output<tcp_io>(weak_from_this()), m_remote_endp)) {
279 auto self { shared_from_this() };
280 asio::post(m_socket.get_executor(), [this, self] () {
281 close(std::make_error_code(net_ip_errc::message_handler_terminated)); } );
282 return;
283 }
284 m_byte_vec.erase(m_byte_vec.begin(), m_byte_vec.begin() + num_bytes);
285 start_read_until(delim, std::forward<MH>(msg_hdlr));
286}
287
288
289inline void tcp_io::start_write(const chops::const_shared_buffer& buf) {
290 auto self { shared_from_this() };
291 asio::async_write(m_socket, asio::const_buffer(buf.data(), buf.size()),
292 [this, self] (const std::error_code& err, std::size_t nb) {
293 handle_write(err, nb);
294 }
295 );
296}
297
298inline void tcp_io::handle_write(const std::error_code& err, std::size_t /* num_bytes */) {
299 if (err) {
300 // read pops first, so usually no error is needed in write handlers
301 close(err);
302 return;
303 }
304 m_io_common.write_next_elem([this] (const chops::const_shared_buffer& buf) {
305 start_write(buf);
306 }
307 );
308}
309
310using tcp_io_shared_ptr = std::shared_ptr<tcp_io>;
311using tcp_io_weak_ptr = std::weak_ptr<tcp_io>;
312
313} // end detail namespace
314
315} // end net namespace
316} // end chops namespace
317
318#endif
319
basic_io_output class template, providing send and get_output_queue_stats methods.
The basic_io_output class template provides methods for sending data to an associated network IO hand...
Definition basic_io_output.hpp:57
Definition io_common.hpp:37
Definition tcp_io.hpp:58
Function object class used in the basic_io_interface start_io methods, implements a common message fr...
Definition simple_variable_len_msg_frame.hpp:59
Common code, factored out, for TCP and UDP io handlers.
Error codes, exception class, and error category within Chops net_ip library.
Structures containing statistics gathered on internal queues.
Function object class and declaration for simple variable length TCP message framing.
output_queue_stats provides information on the internal output queue.
Definition queue_stats.hpp:29
Definition tcp_dsr.cpp:96