// UDP entity / IO handler class. It owns an asio UDP socket together with the
// local endpoint and default destination endpoint used by the start, start_io
// and send member functions. It derives from std::enable_shared_from_this
// because member functions copy shared_from_this() into locals and lambda
// captures used by asynchronous handlers.
// NOTE(review): this listing has the original line numbers fused into the text
// and omits lines (for example the declarations of m_io_common,
// m_entity_common, m_shutting_down and m_byte_vec, which the constructors
// initialise) -- confirm details against the complete file.
66class udp_entity_io :
public std::enable_shared_from_this<udp_entity_io> {
// Endpoint type used for the local, default destination and sender endpoints.
68 using endpoint_type = asio::ip::udp::endpoint;
// Byte container alias taken from chops::mutable_shared_buffer.
71 using byte_vec = chops::mutable_shared_buffer::byte_vec;
// io_context supplied at construction; also used by do_start to build the
// endpoints resolver.
77 asio::io_context& m_ioc;
// The UDP socket; constructed on the io_context, opened and bound in do_start.
78 asio::ip::udp::socket m_socket;
// Local endpoint: either given to the constructor or assigned in do_start from
// the resolved port/interface. A default-constructed value is treated as
// "not set" by the comparisons in start_io and do_start.
79 endpoint_type m_local_endp;
// Destination used by send(buf) when no endpoint is passed explicitly; assigned
// by the start_io overloads that take an endpoint.
80 endpoint_type m_default_dest_endp;
// Local port (or service name) and local interface to resolve in do_start; both
// strings are cleared and shrunk once the endpoint has been resolved.
81 std::string m_local_port_or_service;
82 std::string m_local_intf;
// NOTE(review): presumably filled in with the sender's endpoint by
// async_receive_from -- the line passing it is not visible here; confirm.
90 endpoint_type m_sender_endp;
// Constructor taking an already-formed local endpoint (the start of the
// signature, including the ioc parameter, is on a line not shown in this
// listing). It binds m_ioc to ioc, constructs the socket on ioc, copies
// local_endp into m_local_endp, and default-constructs the default destination
// endpoint, the port/service and interface strings, the byte buffer and the
// sender endpoint. The shutting-down flag starts as false.
95 const endpoint_type& local_endp) noexcept :
96 m_io_common(), m_entity_common(), m_ioc(ioc),
97 m_socket(ioc), m_local_endp(local_endp), m_default_dest_endp(),
98 m_local_port_or_service(), m_local_intf(),
99 m_shutting_down(
false),
100 m_byte_vec(), m_sender_endp()
// Constructor taking a local port (or service name) and a local interface as
// string views (the start of the signature is on a line not shown in this
// listing). The views are copied into the std::string members
// m_local_port_or_service and m_local_intf; m_local_endp is left
// default-constructed and is assigned later in do_start from the resolved
// endpoints. The shutting-down flag starts as false.
// NOTE(review): the constructor is declared noexcept, yet constructing the
// std::string members from the views can allocate and therefore throw, which
// under noexcept ends in std::terminate -- confirm this is the intended policy.
104 std::string_view local_port_or_service, std::string_view local_intf) noexcept :
105 m_io_common(), m_entity_common(), m_ioc(ioc),
106 m_socket(ioc), m_local_endp(), m_default_dest_endp(),
107 m_local_port_or_service(local_port_or_service), m_local_intf(local_intf),
108 m_shutting_down(
false),
109 m_byte_vec(), m_sender_endp()
// Reports whether the entity has been started.
// Returns the value of m_entity_common.is_started().
124 bool is_started()
const noexcept {
return m_entity_common.is_started(); }
// Reports whether IO processing has been started.
// Returns the value of m_io_common.is_io_started().
126 bool is_io_started()
const noexcept {
return m_io_common.is_io_started(); }
128 template <
typename F>
129 void visit_socket(F&& f) {
// Invokes func on the socket's executor, passing it a
// basic_io_output<udp_entity_io> built from weak_from_this(), but only when IO
// is currently started. A std::promise<std::size_t> / future pair is created
// here and the promise is moved into the posted handler; the function returns
// std::size_t.
//   func - callable taking a basic_io_output<udp_entity_io>.
// NOTE(review): func is captured by reference in the posted handler. That is
// only safe if this function waits on the future before returning; the tail of
// the function (setting the promise, the else branch and the return) is not
// visible in this listing -- confirm.
133 template <
typename F>
134 std::size_t visit_io_output(F&& func) {
// self is a shared_ptr copy held by the posted handler.
135 auto self = shared_from_this();
136 std::promise<std::size_t> prom;
137 auto fut = prom.get_future();
139 asio::post(m_socket.get_executor(), [
this, self, &func, p = std::move(prom)] ()
mutable {
140 if (m_io_common.is_io_started()) {
141 func(basic_io_output<udp_entity_io>(weak_from_this()));
153 return m_io_common.get_output_queue_stats();
// Starts the entity by delegating to m_entity_common.start.
//   io_state_chg - IO state change callback, forwarded unchanged.
//   err_cb       - error callback, forwarded unchanged.
// Also passed along are the socket's executor and a callable that runs
// do_start(); that callable holds a shared_ptr copy (self) of this object.
// Returns the std::error_code produced by m_entity_common.start.
156 template <
typename F1,
typename F2>
157 std::error_code start(F1&& io_state_chg, F2&& err_cb) {
158 auto self = shared_from_this();
159 return m_entity_common.start(std::forward<F1>(io_state_chg), std::forward<F2>(err_cb),
160 m_socket.get_executor(),
161 [
this, self] () { return do_start(); } );
// Starts IO with reading enabled and no default destination endpoint.
//   max_size    - size to which the receive buffer m_byte_vec is resized.
//   msg_handler - message handler, forwarded to start_read.
// Visible steps: try to mark IO as started, test whether the local endpoint is
// still default-constructed, size the receive buffer, then begin reading.
// NOTE(review): the bodies of both if statements and the return statements are
// on lines missing from this listing -- confirm what each branch does.
164 template <
typename MH>
165 bool start_io(std::size_t max_size, MH&& msg_handler) {
166 if (!m_io_common.set_io_started()) {
169 if (m_local_endp == endpoint_type()) {
172 m_byte_vec.resize(max_size);
175 start_read(std::forward<MH>(msg_handler));
// Starts IO with reading enabled and a default destination endpoint.
//   endp        - stored in m_default_dest_endp, which send(buf) uses as the
//                 destination.
//   max_size    - size to which the receive buffer m_byte_vec is resized.
//   msg_handler - message handler, forwarded to start_read.
// Visible steps: try to mark IO as started, test whether the local endpoint is
// still default-constructed, record the default destination, size the receive
// buffer, then begin reading.
// NOTE(review): the bodies of both if statements and the return statements are
// on lines missing from this listing -- confirm what each branch does.
179 template <
typename MH>
180 bool start_io(
const endpoint_type& endp, std::size_t max_size, MH&& msg_handler) {
181 if (!m_io_common.set_io_started()) {
184 if (m_local_endp == endpoint_type()) {
187 m_default_dest_endp = endp;
188 m_byte_vec.resize(max_size);
191 start_read(std::forward<MH>(msg_handler));
196 if (!m_io_common.set_io_started()) {
// Starts IO with a default destination endpoint and no message handler
// parameter.
//   endp - stored in m_default_dest_endp, which send(buf) uses as the
//          destination.
// Visible steps: try to mark IO as started, then record the default
// destination. No buffer resize or start_read call appears in the visible
// lines.
// NOTE(review): the body of the if statement and the return statements are on
// lines missing from this listing -- confirm.
204 bool start_io(
const endpoint_type& endp) {
205 if (!m_io_common.set_io_started()) {
208 m_default_dest_endp = endp;
216 bool ret = !m_io_common.is_io_started();
217 close(std::make_error_code(net_ip_errc::udp_io_handler_stopped));
// Stops the entity by delegating to m_entity_common.stop, passing the socket's
// executor and a callable. The visible statements of that callable call close()
// with net_ip_errc::udp_entity_stopped and then return a default-constructed
// std::error_code.
// Returns the std::error_code produced by m_entity_common.stop.
// NOTE(review): the line that opens the callable (its capture list) is missing
// from this listing; presumably it captures this and self -- confirm.
221 std::error_code stop() {
222 auto self = shared_from_this();
223 return m_entity_common.stop(m_socket.get_executor(),
225 close(std::make_error_code(net_ip_errc::udp_entity_stopped));
226 return std::error_code();
// Sends buf to the default destination endpoint (m_default_dest_endp) by
// delegating to the two-argument send overload.
//   buf - buffer to send.
// Returns whatever the two-argument overload returns.
232 bool send(
const chops::const_shared_buffer& buf) {
233 return send(buf, m_default_dest_endp);
// Sends buf to the given destination endpoint.
//   buf  - buffer to send.
//   endp - destination endpoint; a default-constructed endpoint is handled by
//          a special-case branch whose body is not visible in this listing.
// The buffer and endpoint are wrapped in a udp_queue_element and handed to
// m_io_common.start_write along with a callable taking the element (its body is
// also not visible here).
// Returns true unless start_write reported write_status::io_stopped.
236 bool send(
const chops::const_shared_buffer& buf,
const endpoint_type& endp) {
237 if (endp == endpoint_type()) {
240 auto ret = m_io_common.start_write(udp_queue_element(buf, endp),
241 [
this] (
const udp_queue_element& e) {
245 return ret != io_common<udp_queue_element>::write_status::io_stopped;
// Issues an asynchronous receive on the socket into the whole of m_byte_vec.
// The completion handler receives the error code and byte count and passes
// them, with the message handler moved along, to handle_read.
// NOTE(review): the function header and the lines carrying the sender endpoint
// argument and the handler's capture list are missing from this listing. This
// is presumably start_read(MH&& msg_hdlr), as called from start_io -- confirm.
250 template <
typename MH>
252 auto self { shared_from_this() };
253 m_socket.async_receive_from(
254 asio::mutable_buffer(m_byte_vec.data(), m_byte_vec.size()),
257 (
const std::error_code& err, std::size_t nb)
mutable {
258 handle_read(err, nb, std::move(msg_hdlr));
// Declaration only; the definition is not in this listing. It is called from
// the asynchronous receive completion handler with the error code, the byte
// count and the message handler.
263 template <
typename MH>
264 void handle_read(
const std::error_code&, std::size_t, MH&&);
// Declaration only; the definition is not in this listing. Takes a
// udp_queue_element, the same type send() builds from a buffer and an endpoint.
// NOTE(review): presumably begins the asynchronous send of one queued
// element -- confirm against the definition.
266 void start_write(
const udp_queue_element&);
// Declaration only; the definition is not in this listing. Takes an error code
// and a std::size_t.
// NOTE(review): presumably the completion handler for an asynchronous send,
// with the size being the byte count -- confirm against the definition.
268 void handle_write(
const std::error_code&, std::size_t);
// Start-up work run through m_entity_common.start (see start()).
// Visible steps:
//   1. If a local port/service string was supplied, resolve it together with
//      the local interface via endpoints_resolver, take the first resulting
//      endpoint as m_local_endp, then clear and shrink both strings.
//   2. Open the socket with the local endpoint's protocol.
//   3. If the local endpoint is not default-constructed, bind the socket to it.
//   4. Invoke the IO state change callback with shared_from_this(), 1 and true.
// Returns a std::error_code.
// NOTE(review): the error checks after make_endpoints, open and bind, the
// declaration of ec and the return statements are on lines missing from this
// listing -- confirm the failure paths.
272 std::error_code do_start() {
273 if (!m_local_port_or_service.empty()) {
274 endpoints_resolver<asio::ip::udp> resolver(m_ioc);
// NOTE(review): meaning of the leading `true` argument is not visible here --
// confirm against endpoints_resolver::make_endpoints.
275 auto ret = resolver.make_endpoints(
true, m_local_intf, m_local_port_or_service);
// Only the first resolved endpoint is used.
280 m_local_endp = ret->cbegin()->endpoint();
// The strings are no longer needed after resolution, so their storage is released.
281 m_local_port_or_service.clear();
282 m_local_port_or_service.shrink_to_fit();
283 m_local_intf.clear();
284 m_local_intf.shrink_to_fit();
287 m_socket.open(m_local_endp.protocol(), ec);
// A default-constructed local endpoint means no bind is requested.
292 if (m_local_endp != endpoint_type()) {
293 m_socket.bind(m_local_endp, ec);
// NOTE(review): presumably 1 is the IO handler count and true means "starting" -- confirm.
299 m_entity_common.call_io_state_chg_cb(shared_from_this(), 1,
true);
// Shuts the entity down and reports the reason.
//   err - error code describing why the close is happening; it is always passed
//         to the error callback first.
// Visible steps: report err through the error callback; test m_shutting_down
// (the branch body is not visible); set m_shutting_down; mark IO stopped and
// the entity stopped; report net_ip_errc::udp_entity_closed through the error
// callback; invoke the IO state change callback with self, 0 and false.
// NOTE(review): the body of the m_shutting_down check (presumably an early
// return so the shutdown runs once) and the lines between set_stopped() and the
// final callbacks (presumably closing the socket) are missing from this
// listing -- confirm.
303 void close(
const std::error_code& err) {
304 auto self { shared_from_this() };
305 m_entity_common.call_error_cb(self, err);
306 if (m_shutting_down) {
309 m_shutting_down =
true;
310 m_io_common.set_io_stopped();
311 m_entity_common.set_stopped();
315 m_entity_common.call_error_cb(self, std::make_error_code(net_ip_errc::udp_entity_closed));
// NOTE(review): presumably 0 is the remaining IO handler count and false means "stopping" -- confirm.
316 m_entity_common.call_io_state_chg_cb(self, 0,
false);