// TCP connector entity. It derives from std::enable_shared_from_this so that
// member functions can call shared_from_this() and capture the resulting
// shared_ptr in asynchronous handlers.
58class tcp_connector :
public std::enable_shared_from_this<tcp_connector> {
// Aliases for the Asio TCP endpoint type, the resolver result range, and the
// container / iterator types used for the candidate endpoints.
60 using endpoint_type = asio::ip::tcp::endpoint;
64 using resolver_results = asio::ip::basic_resolver_results<asio::ip::tcp>;
65 using endpoints = std::vector<endpoint_type>;
66 using endpoints_iter = endpoints::const_iterator;
// Connector states; the asynchronous handlers compare m_state against these
// values to detect a state change that happened while an operation was pending.
69 enum conn_state { stopped, resolving, connecting, connected, timeout, closing };
// Socket used for connection attempts; handle_connect moves it into the tcp_io
// object it creates.
73 asio::ip::tcp::socket m_socket;
// IO handler for the current connection; assigned in handle_connect and reset
// in notify_me.
74 tcp_io_shared_ptr m_io_handler;
// Endpoints handed to asio::async_connect; filled from the constructor's
// iterator range or from name resolution results.
76 endpoints m_endpoints;
// Timer used to wait before another connection attempt is started.
77 asio::steady_timer m_timer;
// Remote host and port text passed to the resolver in do_start.
78 std::string m_remote_host;
79 std::string m_remote_port;
// Callable invoked with the attempt count; its result is dereferenced to get
// the delay applied to m_timer.
81 tcp_connector_timeout_func m_timeout_func;
// Attempt count passed to m_timeout_func; set back to 0 in handle_connect once
// the IO handler has been created.
82 std::size_t m_conn_attempts;
// Constructor overload templated on an iterator type. The member initializers
// copy the range [beg, end) into m_endpoints and store the reconnect-on-error
// flag and the timeout function.
86 template <
typename Iter>
89 tcp_connector_timeout_func tout_func,
95 m_endpoints(beg, end),
99 m_reconn_on_err(reconn_on_err),
100 m_timeout_func(tout_func),
// Constructor overload taking the remote port and remote host as text (note
// the parameter order: port first, then host). The strings are copied into
// m_remote_port / m_remote_host, and the reconnect-on-error flag and timeout
// function are stored.
106 std::string_view remote_port, std::string_view remote_host,
107 tcp_connector_timeout_func tout_func,
108 bool reconn_on_err) :
115 m_remote_host(remote_host),
116 m_remote_port(remote_port),
117 m_reconn_on_err(reconn_on_err),
118 m_timeout_func(tout_func),
132 bool is_started()
const noexcept {
return m_entity_common.is_started(); }
// Accepts a caller-supplied callable by forwarding reference.
// NOTE(review): presumably the callable is invoked with m_socket so that the
// caller can query or configure it -- confirm against the function body.
134 template <
typename F>
135 void visit_socket(F&& f) {
// Posts a task to the socket's executor that, when an IO handler exists and
// its IO is started, calls func with a basic_io_output<tcp_io> built from that
// handler. A std::promise<std::size_t> is moved into the task and its future
// is kept by the caller side; the function returns a std::size_t.
// NOTE(review): func is captured by reference, which is only safe if this
// function waits for the posted task (e.g. on fut) before returning -- confirm.
139 template <
typename F>
140 std::size_t visit_io_output(F&& func) {
// Hold a shared_ptr to this object inside the posted task.
141 auto self = shared_from_this();
142 std::promise<std::size_t> prom;
143 auto fut = prom.get_future();
145 asio::post(m_socket.get_executor(), [
this, self, &func, p = std::move(prom)] ()
mutable {
146 if (m_io_handler && m_io_handler->is_io_started()) {
147 func(basic_io_output<tcp_io>(m_io_handler));
// Starts the connector by delegating to m_entity_common.start(), forwarding
// the IO state change callback and the error callback, and supplying the
// socket's executor plus a callable that runs do_start(). Returns the
// std::error_code produced by m_entity_common.start().
158 template <
typename F1,
typename F2>
159 std::error_code start(F1&& io_state_chg, F2&& err_cb) {
// Captured by the start callable so it holds a shared_ptr to this object.
160 auto self = shared_from_this();
161 return m_entity_common.start(std::forward<F1>(io_state_chg), std::forward<F2>(err_cb),
162 m_socket.get_executor(),
163 [
this, self] () { return do_start(); } );
// Stops the connector by delegating to m_entity_common.stop() with the
// socket's executor. The visible stop logic calls close() with the
// tcp_connector_stopped error and yields a default-constructed error_code.
// NOTE(review): the close()/return pair presumably lives inside a callable
// passed to m_entity_common.stop() -- confirm.
166 std::error_code stop() {
167 auto self = shared_from_this();
168 return m_entity_common.stop(m_socket.get_executor(),
170 close(std::make_error_code(net_ip_errc::tcp_connector_stopped));
171 return std::error_code();
// Empties the stored remote host and port strings and asks each string to
// release its unused capacity (shrink_to_fit is a non-binding request).
179 void clear_strings() noexcept {
180 m_remote_host.clear();
181 m_remote_host.shrink_to_fit();
182 m_remote_port.clear();
183 m_remote_port.shrink_to_fit();
// Begins the connection sequence. When no endpoints are stored yet, the
// "resolving addresses" status is reported through the error callback and the
// remote host / port are resolved asynchronously; the resolved endpoints are
// appended to m_endpoints and start_connect() is called. A second
// start_connect() call is visible after the resolution branch.
// NOTE(review): that second call is presumably the path taken when endpoints
// are already available -- confirm.
186 std::error_code do_start() {
188 if (m_endpoints.empty()) {
190 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
191 std::make_error_code(net_ip_errc::tcp_connector_resolving_addresses));
192 auto self = shared_from_this();
193 m_resolver.make_endpoints(
false, m_remote_host, m_remote_port,
195 (std::error_code err, resolver_results res) {
// Resolution failed, or the connector left the resolving state meanwhile.
196 if (err || m_state != resolving) {
// Copy every resolved endpoint into the list used by async_connect.
201 for (
const auto& e : res) {
202 m_endpoints.push_back(e.endpoint());
205 start_connect(m_timeout_func);
211 start_connect(m_timeout_func);
// Shuts the connector down for the reason given in err. The state is checked
// first for closing / stopped, the previous state is saved, the shared entity
// state is marked stopped, err is reported through the error callback, and the
// IO handler is told to stop its IO.
// NOTE(review): the closing / stopped check presumably returns early so the
// shutdown runs only once, and the later steps presumably depend on the saved
// state -- confirm.
215 void close(
const std::error_code& err) {
216 if (m_state == closing || m_state == stopped) {
219 auto sav_state = m_state;
221 m_entity_common.set_stopped();
237 m_entity_common.call_error_cb(tcp_io_shared_ptr(), err);
243 m_io_handler->stop_io();
// Final step of closing: reports err through the error callback and then
// reports the tcp_connector_closed status, both with an empty IO handler
// pointer.
259 void finish_close(
const std::error_code& err) {
262 m_entity_common.call_error_cb(tcp_io_shared_ptr(), err);
264 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
265 std::make_error_code(net_ip_errc::tcp_connector_closed));
// Starts an asynchronous connection attempt over the stored endpoints. Sets
// the state to connecting, reports the tcp_connector_connecting status through
// the error callback, and calls asio::async_connect with the m_endpoints
// range. The completion handler forwards its result, together with tout_func,
// to handle_connect().
268 void start_connect(tcp_connector_timeout_func tout_func) {
269 m_state = connecting;
271 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
272 std::make_error_code(net_ip_errc::tcp_connector_connecting));
// Captured by the handler so it holds a shared_ptr to this object.
273 auto self = shared_from_this();
274 asio::async_connect(m_socket, m_endpoints.cbegin(), m_endpoints.cend(),
275 [
this, self, tout_func]
276 (
const std::error_code& err, endpoints_iter iter) {
277 handle_connect(err, iter, tout_func);
// Completion handler for the asynchronous connect started in start_connect().
// The endpoint iterator argument is unnamed and unused.
//
// Visible error handling: err is reported through the error callback,
// m_timeout_func is called with the attempt count, the returned value is
// dereferenced to arm m_timer, the tcp_connector_timeout status is reported,
// and when the timer wait completes without error in the timeout state
// start_connect() is called again.
//
// Visible success handling: the socket is moved into a new tcp_io object whose
// notifier is notify_me() bound to a shared_ptr of this object, the
// tcp_connector_connected status is reported, the IO state change callback is
// called with (m_io_handler, 1, true), and the attempt count is reset to 0.
//
// NOTE(review): the timeout is obtained from m_timeout_func while the
// tout_func parameter is only passed on to the next start_connect() call;
// confirm that this is intended.
282 void handle_connect (
const std::error_code& err, endpoints_iter ,
283 tcp_connector_timeout_func tout_func) {
284 using namespace std::placeholders;
// The connector changed state while the connect was pending.
286 if (m_state != connecting) {
290 m_entity_common.call_error_cb(tcp_io_shared_ptr(), err);
291 auto opt_timeout = m_timeout_func(m_conn_attempts);
// NOTE(review): presumably reached when opt_timeout holds no value -- confirm.
293 close(std::make_error_code(net_ip_errc::tcp_connector_no_reconnect_attempted));
297 m_timer.expires_after(*opt_timeout);
299 catch (
const std::system_error& se) {
304 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
305 std::make_error_code(net_ip_errc::tcp_connector_timeout));
306 auto self = shared_from_this();
307 m_timer.async_wait( [
this, self, tout_func]
308 (
const std::error_code& err) {
// The wait failed or was cancelled, or the state changed during the wait.
309 if (err || m_state != timeout) {
313 start_connect(tout_func);
318 m_io_handler = std::make_shared<tcp_io>(std::move(m_socket),
319 tcp_io::entity_notifier_cb(std::bind(&tcp_connector::notify_me, shared_from_this(), _1, _2)));
323 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
324 std::make_error_code(net_ip_errc::tcp_connector_connected));
325 m_entity_common.call_io_state_chg_cb(m_io_handler, 1,
true);
326 m_conn_attempts = 0u;
// Notification callback bound into the tcp_io object created in
// handle_connect(). It drops this connector's reference to the IO handler,
// reports err together with iop through the error callback, and calls the IO
// state change callback with (iop, 0, false). When the state is connected and
// reconnect-on-error is enabled, a new connection attempt is started.
// finish_close() is called with tcp_connector_no_reconnect_attempted.
// NOTE(review): the finish_close() call is presumably the alternative to
// reconnecting -- confirm.
329 void notify_me(std::error_code err, tcp_io_shared_ptr iop) {
332 m_io_handler.reset();
333 m_entity_common.call_error_cb(iop, err);
335 m_entity_common.call_io_state_chg_cb(iop, 0,
false);
336 if (m_state == connected && m_reconn_on_err) {
337 start_connect(m_timeout_func);
340 finish_close(std::make_error_code(net_ip_errc::tcp_connector_no_reconnect_attempted));