58class tcp_io :
public std::enable_shared_from_this<tcp_io> {
// Endpoint type used for the TCP remote address.
60 using endpoint_type = asio::ip::tcp::endpoint;
// Callable receiving an error code and a shared pointer to a tcp_io object.
61 using entity_notifier_cb = std::function<void (std::error_code, std::shared_ptr<tcp_io>)>;
// Byte container type, taken from chops::mutable_shared_buffer.
64 using byte_vec = chops::mutable_shared_buffer::byte_vec;
// Socket that the asynchronous reads in this class operate on.
68 asio::ip::tcp::socket m_socket;
// Callback invoked from close() with the error code and a shared pointer to this object.
70 entity_notifier_cb m_notifier_cb;
// Remote endpoint; assigned from m_socket.remote_endpoint() in start_io_setup().
71 endpoint_type m_remote_endp;
// Construct from an existing socket and a notifier callback.
// The socket is moved into m_socket; the callback is copied into m_notifier_cb;
// m_io_common and m_remote_endp are value-initialized.
// NOTE(review): `cb` is a by-value parameter that is copied, not moved, and this
// constructor is declared noexcept -- consider std::move(cb) to avoid the extra
// copy (a throwing copy here would terminate the program).
80 tcp_io(asio::ip::tcp::socket sock, entity_notifier_cb cb) noexcept :
81 m_socket(std::move(sock)), m_io_common(),
82 m_notifier_cb(cb), m_remote_endp(),
97 void visit_socket(F&& f) {
102 return m_io_common.get_output_queue_stats();
105 bool is_io_started()
const noexcept {
return m_io_common.is_io_started(); }
// Start IO using a header of header_size bytes plus a message-frame callable.
// start_io_setup() is checked first (a separate branch is taken when it returns
// false). m_byte_vec is then resized to header_size and start_read() is called
// over that storage, with msg_handler and msg_frame perfectly forwarded.
107 template <
typename MH,
typename MF>
108 bool start_io(std::size_t header_size, MH&& msg_handler, MF&& msg_frame) {
109 if (!start_io_setup()) {
112 m_byte_vec.resize(header_size);
113 start_read(asio::mutable_buffer(m_byte_vec.data(), m_byte_vec.size()), header_size,
114 std::forward<MH>(msg_handler), std::forward<MF>(msg_frame));
// Start IO overload taking a header decoder function (`func`).
// Delegates to another start_io overload, passing header_size and the
// perfectly forwarded msg_handler as the leading arguments.
118 template <
typename MH>
119 bool start_io(std::size_t header_size, MH&& msg_handler, hdr_decoder_func func) {
120 return start_io(header_size, std::forward<MH>(msg_handler),
// Start IO overload for delimiter-terminated messages.
// start_io_setup() is checked first. The delimiter view is copied into an owning
// std::string before being handed to start_read_until(), so the caller's
// string_view storage is not retained; msg_handler is perfectly forwarded.
124 template <
typename MH>
125 bool start_io(std::string_view delimiter, MH&& msg_handler) {
126 if (!start_io_setup()) {
130 start_read_until(std::string(delimiter), std::forward<MH>(msg_handler));
// Start IO overload taking only a read size and a message handler.
// Delegates to the three-argument start_io overload, passing read_size as the
// header size and null_msg_frame as the message-frame argument; returns that
// overload's result.
134 template <
typename MH>
135 bool start_io(std::size_t read_size, MH&& msg_handler) {
136 return start_io(read_size, std::forward<MH>(msg_handler), null_msg_frame);
140 return start_io(1, null_msg_hdlr<tcp_io>, null_msg_frame);
146 if (!m_io_common.is_io_started()) {
150 m_io_common.set_io_started();
152 close(std::make_error_code(net_ip_errc::tcp_io_handler_stopped));
// Send a buffer through this handler.
// Hands buf to m_io_common.start_write() together with a lambda that captures
// `this` and receives a const_shared_buffer; the call's result is kept in `ret`.
157 bool send(
const chops::const_shared_buffer& buf) {
158 auto ret = m_io_common.start_write(buf,
159 [
this] (
const chops::const_shared_buffer& b) {
// Send overload that also accepts an endpoint.
// The endpoint parameter is unnamed, so its value is not used by this overload.
166 bool send(
const chops::const_shared_buffer& buf,
const endpoint_type&) {
// Close down IO for this handler, reporting `err` to the notifier.
// m_io_common.set_io_stopped() is checked first. The receive side of the socket
// is shut down (any failure is reported through `ec` rather than thrown), and
// m_notifier_cb is invoked with err and a shared pointer to this object.
171 void close(
const std::error_code& err) {
172 if (!m_io_common.set_io_stopped()) {
177 m_socket.shutdown(asio::ip::tcp::socket::shutdown_receive, ec);
180 m_notifier_cb(err, shared_from_this());
// Shared setup step used by the start_io overloads.
// Checks m_io_common.set_io_started(), and stores the socket's remote endpoint
// in m_remote_endp (the remote_endpoint() call is given `ec` to receive any error).
185 bool start_io_setup() {
186 if (!m_io_common.set_io_started()) {
190 m_remote_endp = m_socket.remote_endpoint(ec);
198 template <
typename MH,
typename MF>
199 void start_read(asio::mutable_buffer mbuf, std::size_t hdr_size, MH&&
msg_hdlr, MF&& msg_frame) {
200 auto self { shared_from_this() };
201 asio::async_read(m_socket, mbuf,
202 [
this, self, hdr_size, mbuf,
msg_hdlr = std::move(
msg_hdlr), msg_frame = std::move(msg_frame)]
203 (
const std::error_code& err, std::size_t nb)
mutable {
204 handle_read(mbuf, hdr_size, err, nb, std::move(
msg_hdlr), std::move(msg_frame));
// Declaration only; the definition is not part of this class body.
// Called from the completion handler in start_read() with, in order: the read
// buffer, the header size, the error code, the byte count, the message handler
// and the message frame callable.
209 template <
typename MH,
typename MF>
210 void handle_read(asio::mutable_buffer, std::size_t,
211 const std::error_code&, std::size_t, MH&&, MF&&);
// Issue an asynchronous read-until on m_socket, using a dynamic buffer over
// m_byte_vec and `delim` as the delimiter.
// `self` is a shared_ptr to this object, obtained before the read is started.
// The completion handler calls handle_read_until() with the delimiter, the
// error code, the byte count and the message handler.
213 template <
typename MH>
214 void start_read_until(std::string delim, MH&&
msg_hdlr) {
215 auto self { shared_from_this() };
216 asio::async_read_until(m_socket, asio::dynamic_buffer(m_byte_vec), delim,
218 (
const std::error_code& err, std::size_t nb)
mutable {
219 handle_read_until(delim, err, nb, std::move(
msg_hdlr));
// Declaration only; the definition is not part of this class body.
// Called from the completion handler in start_read_until() with, in order: the
// delimiter string, the error code, the byte count and the message handler.
224 template <
typename MH>
225 void handle_read_until(std::string,
const std::error_code&, std::size_t, MH&&);
// Declaration only; the definition is not part of this class body.
// Takes the shared buffer by const reference.
// NOTE(review): presumably starts the write of that buffer on the socket -- confirm against the definition.
227 void start_write(
const chops::const_shared_buffer&);
// Declaration only; the definition is not part of this class body.
// Takes an error code and a size value.
// NOTE(review): presumably the completion callback for an asynchronous write -- confirm against the definition.
229 void handle_write(
const std::error_code&, std::size_t);