From ca81dcf08d9a3bf49b3d540e3b3b792bfc3b3016 Mon Sep 17 00:00:00 2001 From: Roland Reichwein Date: Sun, 5 Feb 2023 10:09:33 +0100 Subject: Remove websocket mutex in favour of boost strands --- whiteboard.cpp | 41 +++++++++++++++++++++++++---------------- whiteboard.h | 1 - 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/whiteboard.cpp b/whiteboard.cpp index ea56cc3..922bd37 100644 --- a/whiteboard.cpp +++ b/whiteboard.cpp @@ -95,11 +95,10 @@ class session: public std::enable_shared_from_this public: using connection = std::shared_ptr>; - session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, std::mutex& websocket_mutex, boost::asio::ip::tcp::socket socket): + session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, boost::asio::ip::tcp::socket socket): m_registry(registry), m_storage(storage), m_storage_mutex(storage_mutex), - m_websocket_mutex(websocket_mutex), m_ws(std::make_shared(std::move(socket))), m_connection_guard(m_registry, m_ws) { @@ -115,7 +114,8 @@ public: std::string("Reichwein.IT Whiteboard")); })); - boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser, boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this())); + boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser, + boost::asio::bind_executor(m_ws->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this()))); } void on_read_handshake(boost::beast::error_code ec, std::size_t bytes_transferred) @@ -173,8 +173,9 @@ public: } if (data.size() > 0) { boost::beast::ostream(m_buffer) << data; - std::lock_guard lock(m_websocket_mutex); - m_ws->async_write(m_buffer.data(), boost::beast::bind_front_handler(&session::on_write, shared_from_this())); + m_ws->async_write(m_buffer.data(), + boost::asio::bind_executor(m_ws->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write, shared_from_this()))); } else { do_read(); } @@ -196,22 +197,31 @@ public: do_read_handshake(); } + void on_write_notify(std::shared_ptr data, std::shared_ptr buffer, boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) { + std::cerr << "Error on session write notify: " << ec.message() << std::endl; + } + } + void notify_other_connections_diff(const std::string& id, const Diff& diff) { std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci) { if (m_ws != ci) { - boost::beast::flat_buffer buffer; pt::ptree ptree {make_ptree({ {"type", "getdiff"}, {"revision", std::to_string(m_storage.getRevision(id))}, {"pos", std::to_string(m_storage.getCursorPos(id)) } })}; ptree.put_child("serverinfo.diff", diff.get_structure().get_child("diff")); - boost::beast::ostream(buffer) << Reichwein::XML::plain_xml(ptree); - std::lock_guard lock(m_websocket_mutex); + auto data{std::make_shared(Reichwein::XML::plain_xml(ptree))}; + auto buffer{std::make_shared(data->data(), data->size())}; try { - ci->write(buffer.data()); + ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer))); } catch (const std::exception& ex) { std::cerr << "Warning: Notify getdiff write for " << ci << " not possible, id " << id << std::endl; m_registry.dump(); @@ -225,14 +235,14 @@ public: std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci) { if (m_ws != ci) { - boost::beast::flat_buffer buffer; - boost::beast::ostream(buffer) << make_xml({ + auto data{std::make_shared(make_xml({ {"type", "getpos"}, {"pos", std::to_string(m_storage.getCursorPos(id)) } - }); - std::lock_guard lock(m_websocket_mutex); + }))}; + auto buffer{std::make_shared(data->data(), data->size())}; try { - ci->write(buffer.data()); + ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer))); } catch (const std::exception& ex) { std::cerr << "Warning: Notify getpos write for " << ci << " not possible, id " << id << std::endl; m_registry.dump(); @@ -348,7 +358,6 @@ private: ConnectionRegistry& m_registry; Storage& m_storage; std::mutex& m_storage_mutex; - std::mutex& m_websocket_mutex; connection m_ws; ConnectionRegistry::RegistryGuard m_connection_guard; @@ -370,7 +379,7 @@ void Whiteboard::on_accept(boost::system::error_code ec, boost::asio::ip::tcp::s if (ec) { std::cerr << "Error on accept: " << ec.message() << std::endl; } else { - std::make_shared(m_registry, *m_storage, m_storage_mutex, m_websocket_mutex, std::move(socket))->run(); + std::make_shared(m_registry, *m_storage, m_storage_mutex, std::move(socket))->run(); } do_accept(); diff --git a/whiteboard.h b/whiteboard.h index 15d764a..7648bd4 100644 --- a/whiteboard.h +++ b/whiteboard.h @@ -22,7 +22,6 @@ private: std::unique_ptr m_config; std::unique_ptr m_storage; std::mutex m_storage_mutex; - std::mutex m_websocket_mutex; ConnectionRegistry m_registry; -- cgit v1.2.3