// Listens for incoming TCP connections on an asio acceptor and keeps track of
// the tcp_io handler created for each accepted socket. Derives from
// std::enable_shared_from_this so that posted and asynchronous handlers can
// capture a shared_ptr to this object.
46class tcp_acceptor :
public std::enable_shared_from_this<tcp_acceptor> {
48 using endpoint_type = asio::ip::tcp::endpoint;
// io_context the acceptor is constructed on and that follow-up work is posted to.
52 asio::io_context& m_ioc;
// Listening socket; opened, bound and put into listen mode in do_start().
53 asio::ip::tcp::acceptor m_acceptor;
// Handlers for accepted connections: appended in start_accept(), erased in notify_me().
54 std::vector<tcp_io_shared_ptr> m_io_handlers;
// Local endpoint used for open/bind; supplied directly by one constructor or
// assigned from the resolver result in do_start().
55 endpoint_type m_acceptor_endp;
// Port/service and interface strings handed to the resolver in do_start();
// both are cleared there after the endpoint has been assigned.
56 std::string m_local_port_or_service;
57 std::string m_listen_intf;
// Constructs an acceptor from an already-built endpoint. The port/service and
// interface strings are left empty, so the non-empty check at the top of
// do_start() fails and the stored endpoint is used as given. The acceptor
// starts out with m_shutting_down set to false.
// NOTE(review): the parameter line declaring reuse_addr is not present in
// this listing.
62 tcp_acceptor(asio::io_context& ioc,
const endpoint_type& endp,
64 m_entity_common(), m_ioc(ioc), m_acceptor(ioc), m_io_handlers(), m_acceptor_endp(endp),
65 m_local_port_or_service(), m_listen_intf(),
66 m_reuse_addr(reuse_addr), m_shutting_down(
false) { }
// Constructor variant taking a port/service name and a listen interface as
// string views, which are copied into the std::string members. The endpoint
// is default-constructed here; do_start() assigns it from the resolver result.
// NOTE(review): the opening line(s) of this constructor's signature, and the
// line declaring reuse_addr, are not present in this listing.
69 std::string_view local_port_or_service, std::string_view listen_intf,
71 m_entity_common(), m_ioc(ioc), m_acceptor(ioc), m_io_handlers(), m_acceptor_endp(),
72 m_local_port_or_service(local_port_or_service), m_listen_intf(listen_intf),
73 m_reuse_addr(reuse_addr), m_shutting_down(
false) { }
84 bool is_started()
const noexcept {
return m_entity_common.is_started(); }
// Accepts a callable `f`.
// NOTE(review): the declaration of F and the body of this function are not
// present in this listing, so what `f` is invoked with cannot be confirmed
// here (presumably the acceptor socket — verify against the full source).
87 void visit_socket(F&& f) {
// Posts a handler onto m_ioc that walks m_io_handlers and tests each handler
// with is_io_started(). A std::promise<std::size_t>/future pair is created so
// a count can be handed back to the caller.
// NOTE(review): lines are missing from this listing, so how `func` is invoked
// and how the promise is fulfilled cannot be confirmed from here.
92 std::size_t visit_io_output(F&& func) {
// Captured by the posted handler below so this object stays alive until it has run.
93 auto self = shared_from_this();
94 std::promise<std::size_t> prom;
95 auto fut = prom.get_future();
// `func` is captured by reference while the promise is moved into the handler.
// NOTE(review): the reference capture is only safe if this function waits on
// `fut` before returning — confirm against the full source.
97 asio::post(m_ioc, [
this, self, &func, p = std::move(prom)] ()
mutable {
// Shutdown state is checked first inside the posted handler.
99 if (m_shutting_down) {
103 for (
auto& ioh : m_io_handlers) {
104 if (ioh->is_io_started()) {
// Starts the acceptor. Forwards the IO state change callback and the error
// callback to m_entity_common.start(), together with the acceptor's executor
// and a start function that calls do_start(). Returns the std::error_code
// produced by m_entity_common.start().
115 template <
typename F1,
typename F2>
116 std::error_code start(F1&& io_state_chg, F2&& err_func) {
// Captured by the start function so this object stays alive while that
// function object exists.
117 auto self = shared_from_this();
118 return m_entity_common.start(std::forward<F1>(io_state_chg), std::forward<F2>(err_func),
119 m_acceptor.get_executor(),
120 [
this, self] () { return do_start(); } );
// Stops the acceptor by delegating to m_entity_common.stop(), passing the
// acceptor's executor. The visible statements call close() with
// net_ip_errc::tcp_acceptor_stopped and return a default-constructed
// std::error_code.
// NOTE(review): the line introducing the callable passed to stop() is missing
// from this listing; presumably the close()/return statements form its body —
// verify against the full source.
123 std::error_code stop() {
124 auto self = shared_from_this();
125 return m_entity_common.stop(m_acceptor.get_executor(),
127 close(std::make_error_code(net_ip_errc::tcp_acceptor_stopped));
128 return std::error_code();
// Prepares the acceptor for accepting connections: resolves the local endpoint
// when a port/service string was supplied, then opens, binds and listens on
// m_acceptor using the error_code-taking overloads.
// NOTE(review): the declaration of `resolver` and `ec`, the error checks
// between these steps, and the return statements are not present in this
// listing.
135 std::error_code do_start() {
// Name resolution is attempted only when a port/service string is present.
136 if (!m_local_port_or_service.empty()) {
138 auto ret = resolver.
make_endpoints(
true, m_listen_intf, m_local_port_or_service);
// The first resolved entry becomes the endpoint to bind to.
143 m_acceptor_endp = ret->cbegin()->endpoint();
// The strings are emptied and asked to give back their capacity once the
// endpoint has been assigned.
144 m_local_port_or_service.clear();
145 m_local_port_or_service.shrink_to_fit();
146 m_listen_intf.clear();
147 m_listen_intf.shrink_to_fit();
150 m_acceptor.open(m_acceptor_endp.protocol(), ec);
// NOTE(review): presumably guarded by m_reuse_addr — the guarding line is not
// visible here; verify.
156 m_acceptor.set_option(asio::socket_base::reuse_address(
true), ec);
162 m_acceptor.bind(m_acceptor_endp, ec);
// Listen using asio's max_listen_connections backlog constant.
167 m_acceptor.listen(asio::socket_base::max_listen_connections, ec);
// Shuts the acceptor down: reports `err` through the error callback, marks the
// acceptor as shutting down and stopped, walks a copy of the handler list,
// closes m_acceptor, and finally reports net_ip_errc::tcp_acceptor_closed.
// NOTE(review): branch and loop bodies are missing from this listing.
177 void close(
const std::error_code& err) {
// The reason for closing is reported first, with an empty tcp_io_shared_ptr.
178 m_entity_common.call_error_cb(tcp_io_shared_ptr(), err);
// NOTE(review): presumably an early return when a shutdown is already in
// progress — the branch body is not visible; verify.
179 if (m_shutting_down) {
182 m_shutting_down =
true;
183 m_entity_common.set_stopped();
// The loop runs over a copy of the handler list rather than m_io_handlers itself.
// NOTE(review): presumably because stopping a handler leads to notify_me(),
// which erases from m_io_handlers — the loop body is not visible; verify.
186 auto iohs = m_io_handlers;
187 for (
auto& i : iohs) {
192 m_acceptor.close(ec);
// NOTE(review): presumably only reached when `ec` is set — the condition is
// not visible here.
194 m_entity_common.call_error_cb(tcp_io_shared_ptr(), ec);
// Final notification that the acceptor has been closed.
196 m_entity_common.call_error_cb(tcp_io_shared_ptr(),
197 std::make_error_code(net_ip_errc::tcp_acceptor_closed));
// Issues an asynchronous accept on m_acceptor. For an accepted socket the
// handler wraps it in a tcp_io, stores that in m_io_handlers, and posts a
// handler to m_ioc that invokes the IO state change callback.
// NOTE(review): branch bodies and any re-arming of the accept are missing
// from this listing.
202 void start_accept() {
// Brings _1 and _2 into scope for the std::bind call below.
203 using namespace std::placeholders;
205 if (m_shutting_down) {
// Captured by the accept handler so this object stays alive while the accept
// is outstanding.
208 auto self = shared_from_this();
209 m_acceptor.async_accept( [
this, self]
210 (
const std::error_code& err, asio::ip::tcp::socket sock) {
// Both an accept error and a shutdown in progress take this branch.
211 if (err || m_shutting_down ) {
// The new tcp_io takes ownership of the socket and is given notify_me(), bound
// to a shared_ptr of this acceptor, as its entity notifier callback.
// NOTE(review): this acceptor holds the tcp_io in m_io_handlers while the
// bound callback holds a shared_ptr to this acceptor; how long tcp_io retains
// the callback is not visible here — verify there is no lasting ownership cycle.
214 tcp_io_shared_ptr iop = std::make_shared<tcp_io>(std::move(sock),
215 tcp_io::entity_notifier_cb(std::bind(&tcp_acceptor::notify_me, shared_from_this(), _1, _2)));
216 m_io_handlers.push_back(iop);
// The state change callback is posted rather than called inline; it receives
// the new handler, the handler count at the time it runs, and `true`.
220 asio::post(m_ioc, [
this, self, iop] () {
221 m_entity_common.call_io_state_chg_cb(iop, m_io_handlers.size(),
true);
// Callback bound into each tcp_io in start_accept(). Removes `iop` from
// m_io_handlers, reports `err` through the error callback, then invokes the
// IO state change callback with the remaining handler count and `false`.
// NOTE(review): the closing brace of this function is not present in this
// listing.
231 void notify_me(std::error_code err, tcp_io_shared_ptr iop) {
// NOTE(review): the predicate takes each element as `auto sp`, copying a
// shared_ptr per comparison; `const auto&` would avoid those copies.
232 std::erase_if (m_io_handlers, [iop] (
auto sp) {
return iop == sp; } );
233 m_entity_common.call_error_cb(iop, err);
234 m_entity_common.call_io_state_chg_cb(iop, m_io_handlers.size(),
false);