Chops Net IP
Loading...
Searching...
No Matches
tcp_connector.hpp
Go to the documentation of this file.
1
18#ifndef TCP_CONNECTOR_HPP_INCLUDED
19#define TCP_CONNECTOR_HPP_INCLUDED
20
21#include "asio/ip/tcp.hpp"
22#include "asio/post.hpp"
23#include "asio/connect.hpp"
24#include "asio/io_context.hpp"
25#include "asio/ip/basic_resolver.hpp"
26#include "asio/steady_timer.hpp"
27
28#include <system_error>
29#include <vector>
30#include <memory>
31#include <chrono>
32#include <future>
33
34#include <cstddef> // for std::size_t
35
38
40
43
44// TCP connector has the most complicated states of any of the net entity detail
45// objects. The states transition from stopped to resolving addresses to connecting
46// to connected, then back to connecting or stopped depending on the transition. The
47// closing state is used for errors and shutting down. There is also a timeout state
48// for when a connection is refused. As typical, the shutdown logic is non-trivial.
49//
50// The states and transitions could be implemented with a more formal state transition
51// table, including some nice state transition classes from Boost or elsewhere, but
52// for now everything is hard-coded and manually set.
53
54namespace chops {
55namespace net {
56namespace detail {
57
58class tcp_connector : public std::enable_shared_from_this<tcp_connector> {
59public:
60 using endpoint_type = asio::ip::tcp::endpoint;
61
62private:
64 using resolver_results = asio::ip::basic_resolver_results<asio::ip::tcp>;
65 using endpoints = std::vector<endpoint_type>;
66 using endpoints_iter = endpoints::const_iterator;
67
68private:
69 enum conn_state { stopped, resolving, connecting, connected, timeout, closing };
70
71private:
72 net_entity_common<tcp_io> m_entity_common;
73 asio::ip::tcp::socket m_socket;
74 tcp_io_shared_ptr m_io_handler;
75 resolver_type m_resolver;
76 endpoints m_endpoints;
77 asio::steady_timer m_timer;
78 std::string m_remote_host;
79 std::string m_remote_port;
80 bool m_reconn_on_err;
81 tcp_connector_timeout_func m_timeout_func;
82 std::size_t m_conn_attempts;
83 conn_state m_state;
84
85public:
86 template <typename Iter>
87 tcp_connector(asio::io_context& ioc,
88 Iter beg, Iter end,
89 tcp_connector_timeout_func tout_func,
90 bool reconn_on_err) :
91 m_entity_common(),
92 m_socket(ioc),
93 m_io_handler(),
94 m_resolver(ioc),
95 m_endpoints(beg, end),
96 m_timer(ioc),
97 m_remote_host(),
98 m_remote_port(),
99 m_reconn_on_err(reconn_on_err),
100 m_timeout_func(tout_func),
101 m_conn_attempts(0u),
102 m_state(stopped)
103 { }
104
105 tcp_connector(asio::io_context& ioc,
106 std::string_view remote_port, std::string_view remote_host,
107 tcp_connector_timeout_func tout_func,
108 bool reconn_on_err) :
109 m_entity_common(),
110 m_socket(ioc),
111 m_io_handler(),
112 m_resolver(ioc),
113 m_endpoints(),
114 m_timer(ioc),
115 m_remote_host(remote_host),
116 m_remote_port(remote_port),
117 m_reconn_on_err(reconn_on_err),
118 m_timeout_func(tout_func),
119 m_conn_attempts(0u),
120 m_state(stopped)
121 { }
122
123private:
124 // no copy or assignment semantics for this class
125 tcp_connector(const tcp_connector&) = delete;
126 tcp_connector(tcp_connector&&) = delete;
127 tcp_connector& operator=(const tcp_connector&) = delete;
128 tcp_connector& operator=(tcp_connector&&) = delete;
129
130public:
131
132 bool is_started() const noexcept { return m_entity_common.is_started(); }
133
134 template <typename F>
135 void visit_socket(F&& f) {
136 f(m_socket);
137 }
138
139 template <typename F>
140 std::size_t visit_io_output(F&& func) {
141 auto self = shared_from_this();
142 std::promise<std::size_t> prom;
143 auto fut = prom.get_future();
144 // send to executor for concurrency protection
145 asio::post(m_socket.get_executor(), [this, self, &func, p = std::move(prom)] () mutable {
146 if (m_io_handler && m_io_handler->is_io_started()) {
147 func(basic_io_output<tcp_io>(m_io_handler));
148 p.set_value(1u);
149 }
150 else {
151 p.set_value(0u);
152 }
153 }
154 );
155 return fut.get();
156 }
157
158 template <typename F1, typename F2>
159 std::error_code start(F1&& io_state_chg, F2&& err_cb) {
160 auto self = shared_from_this();
161 return m_entity_common.start(std::forward<F1>(io_state_chg), std::forward<F2>(err_cb),
162 m_socket.get_executor(),
163 [this, self] () { return do_start(); } );
164 }
165
166 std::error_code stop() {
167 auto self = shared_from_this();
168 return m_entity_common.stop(m_socket.get_executor(),
169 [this, self] () {
170 close(std::make_error_code(net_ip_errc::tcp_connector_stopped));
171 return std::error_code();
172 }
173 );
174 }
175
176
177private:
178
179 void clear_strings() noexcept {
180 m_remote_host.clear(); // no longer need the string contents
181 m_remote_host.shrink_to_fit();
182 m_remote_port.clear();
183 m_remote_port.shrink_to_fit();
184 }
185
186 std::error_code do_start() {
187 // empty endpoints container is the indication that a resolve is needed
188 if (m_endpoints.empty()) {
189 m_state = resolving;
190 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
191 std::make_error_code(net_ip_errc::tcp_connector_resolving_addresses));
192 auto self = shared_from_this();
193 m_resolver.make_endpoints(false, m_remote_host, m_remote_port,
194 [this, self]
195 (std::error_code err, resolver_results res) {
196 if (err || m_state != resolving) {
197 m_state = stopped;
198 close(err);
199 return;
200 }
201 for (const auto& e : res) {
202 m_endpoints.push_back(e.endpoint());
203 }
204 clear_strings();
205 start_connect(m_timeout_func);
206 }
207 );
208 return { };
209 }
210 clear_strings();
211 start_connect(m_timeout_func);
212 return { };
213 }
214
215 void close(const std::error_code& err) {
216 if (m_state == closing || m_state == stopped) {
217 return; // already shutting down or stopped, bypass closing again
218 }
219 auto sav_state = m_state;
220 m_state = closing;
221 m_entity_common.set_stopped(); // for internal closes
222 std::error_code ec;
223 m_socket.close(ec);
224 switch (sav_state) {
225 case stopped: {
226 break;
227 }
228 case resolving: {
229 m_resolver.cancel();
230 break;
231 }
232 case connecting: {
233 // socket close should cancel any connect attempts
234 break;
235 }
236 case connected: {
237 m_entity_common.call_error_cb(tcp_io_shared_ptr(), err);
238 // notify_me will be called which will clean up m_io_handler
239 // this logic branch will also call finish_close; check to
240 // make sure m_io_handler is still alive, not cleaned up by
241 // tcp_io object jumping in and calling notify_me first
242 if (m_io_handler) {
243 m_io_handler->stop_io();
244 return;
245 }
246 break;
247 }
248 case timeout: {
249 m_timer.cancel();
250 break;
251 }
252 case closing: {
253 break;
254 }
255 }
256 finish_close(err);
257 }
258
259 void finish_close(const std::error_code& err) {
260 m_state = stopped;
261 if (err) {
262 m_entity_common.call_error_cb(tcp_io_shared_ptr(), err);
263 }
264 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
265 std::make_error_code(net_ip_errc::tcp_connector_closed));
266 }
267
268 void start_connect(tcp_connector_timeout_func tout_func) {
269 m_state = connecting;
270 ++m_conn_attempts;
271 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
272 std::make_error_code(net_ip_errc::tcp_connector_connecting));
273 auto self = shared_from_this();
274 asio::async_connect(m_socket, m_endpoints.cbegin(), m_endpoints.cend(),
275 [this, self, tout_func]
276 (const std::error_code& err, endpoints_iter iter) {
277 handle_connect(err, iter, tout_func);
278 }
279 );
280 }
281
282 void handle_connect (const std::error_code& err, endpoints_iter /* iter */,
283 tcp_connector_timeout_func tout_func) {
284 using namespace std::placeholders;
285
286 if (m_state != connecting) {
287 return;
288 }
289 if (err) {
290 m_entity_common.call_error_cb(tcp_io_shared_ptr(), err);
291 auto opt_timeout = m_timeout_func(m_conn_attempts);
292 if (!opt_timeout) {
293 close(std::make_error_code(net_ip_errc::tcp_connector_no_reconnect_attempted));
294 return;
295 }
296 try {
297 m_timer.expires_after(*opt_timeout);
298 }
299 catch (const std::system_error& se) {
300 close(se.code());
301 return;
302 }
303 m_state = timeout;
304 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
305 std::make_error_code(net_ip_errc::tcp_connector_timeout));
306 auto self = shared_from_this();
307 m_timer.async_wait( [this, self, tout_func]
308 (const std::error_code& err) {
309 if (err || m_state != timeout) {
310 close(err);
311 return;
312 }
313 start_connect(tout_func);
314 }
315 );
316 return;
317 }
318 m_io_handler = std::make_shared<tcp_io>(std::move(m_socket),
319 tcp_io::entity_notifier_cb(std::bind(&tcp_connector::notify_me, shared_from_this(), _1, _2)));
320 m_state = connected;
321 // this is only called after an async connect so no danger of invoking app code during the
322 // start method call
323 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
324 std::make_error_code(net_ip_errc::tcp_connector_connected));
325 m_entity_common.call_io_state_chg_cb(m_io_handler, 1, true);
326 m_conn_attempts = 0u;
327 }
328
329 void notify_me(std::error_code err, tcp_io_shared_ptr iop) {
330 // two ways to get here: tcp_io object closed from error, or tcp_io object closed
331 // from stop_io, either by tcp_connector stop, or directly by app
332 m_io_handler.reset();
333 m_entity_common.call_error_cb(iop, err);
334 // notify app of tcp_io object shutting down
335 m_entity_common.call_io_state_chg_cb(iop, 0, false);
336 if (m_state == connected && m_reconn_on_err) {
337 start_connect(m_timeout_func);
338 return;
339 }
340 finish_close(std::make_error_code(net_ip_errc::tcp_connector_no_reconnect_attempted));
341 }
342
343};
344
345using tcp_connector_shared_ptr = std::shared_ptr<tcp_connector>;
346using tcp_connector_weak_ptr = std::weak_ptr<tcp_connector>;
347
348} // end detail namespace
349} // end net namespace
350} // end chops namespace
351
352#endif
353
basic_io_output class template, providing send and get_output_queue_stats methods.
Definition net_entity_common.hpp:49
Definition tcp_connector.hpp:58
Class to convert network host names and ports into Asio endpoint objects.
Common code, factored out, for TCP acceptor, TCP connector, and UDP net entity handlers.
Definition tcp_io_test.cpp:56
Classes that implement a connect timeout function object interface for the tcp_connector detail class...
Internal handler class for TCP stream input and output.