summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorRoland Reichwein <mail@reichwein.it>2023-02-05 10:09:33 +0100
committerRoland Reichwein <mail@reichwein.it>2023-02-05 10:09:33 +0100
commitca81dcf08d9a3bf49b3d540e3b3b792bfc3b3016 (patch)
tree161fb4b98aae491541d82080b1dd2fd75d120c23
parent2d48c70f2f76d90e2e3e6c1badec7a4f0e6c623b (diff)
Remove websocket mutex in favour of boost strands
-rw-r--r--whiteboard.cpp41
-rw-r--r--whiteboard.h1
2 files changed, 25 insertions, 17 deletions
diff --git a/whiteboard.cpp b/whiteboard.cpp
index ea56cc3..922bd37 100644
--- a/whiteboard.cpp
+++ b/whiteboard.cpp
@@ -95,11 +95,10 @@ class session: public std::enable_shared_from_this<session>
public:
using connection = std::shared_ptr<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>>;
- session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, std::mutex& websocket_mutex, boost::asio::ip::tcp::socket socket):
+ session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, boost::asio::ip::tcp::socket socket):
m_registry(registry),
m_storage(storage),
m_storage_mutex(storage_mutex),
- m_websocket_mutex(websocket_mutex),
m_ws(std::make_shared<connection::element_type>(std::move(socket))),
m_connection_guard(m_registry, m_ws)
{
@@ -115,7 +114,8 @@ public:
std::string("Reichwein.IT Whiteboard"));
}));
- boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser, boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this()));
+ boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser,
+ boost::asio::bind_executor(m_ws->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this())));
}
void on_read_handshake(boost::beast::error_code ec, std::size_t bytes_transferred)
@@ -173,8 +173,9 @@ public:
}
if (data.size() > 0) {
boost::beast::ostream(m_buffer) << data;
- std::lock_guard<std::mutex> lock(m_websocket_mutex);
- m_ws->async_write(m_buffer.data(), boost::beast::bind_front_handler(&session::on_write, shared_from_this()));
+ m_ws->async_write(m_buffer.data(),
+ boost::asio::bind_executor(m_ws->next_layer().get_executor(),
+ boost::beast::bind_front_handler(&session::on_write, shared_from_this())));
} else {
do_read();
}
@@ -196,22 +197,31 @@ public:
do_read_handshake();
}
+ void on_write_notify(std::shared_ptr<std::string> data, std::shared_ptr<boost::asio::const_buffer> buffer, boost::beast::error_code ec, std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+
+ if (ec) {
+ std::cerr << "Error on session write notify: " << ec.message() << std::endl;
+ }
+ }
+
void notify_other_connections_diff(const std::string& id, const Diff& diff)
{
std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci)
{
if (m_ws != ci) {
- boost::beast::flat_buffer buffer;
pt::ptree ptree {make_ptree({
{"type", "getdiff"},
{"revision", std::to_string(m_storage.getRevision(id))},
{"pos", std::to_string(m_storage.getCursorPos(id)) }
})};
ptree.put_child("serverinfo.diff", diff.get_structure().get_child("diff"));
- boost::beast::ostream(buffer) << Reichwein::XML::plain_xml(ptree);
- std::lock_guard<std::mutex> lock(m_websocket_mutex);
+ auto data{std::make_shared<std::string>(Reichwein::XML::plain_xml(ptree))};
+ auto buffer{std::make_shared<boost::asio::const_buffer>(data->data(), data->size())};
try {
- ci->write(buffer.data());
+ ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(),
+ boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer)));
} catch (const std::exception& ex) {
std::cerr << "Warning: Notify getdiff write for " << ci << " not possible, id " << id << std::endl;
m_registry.dump();
@@ -225,14 +235,14 @@ public:
std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci)
{
if (m_ws != ci) {
- boost::beast::flat_buffer buffer;
- boost::beast::ostream(buffer) << make_xml({
+ auto data{std::make_shared<std::string>(make_xml({
{"type", "getpos"},
{"pos", std::to_string(m_storage.getCursorPos(id)) }
- });
- std::lock_guard<std::mutex> lock(m_websocket_mutex);
+ }))};
+ auto buffer{std::make_shared<boost::asio::const_buffer>(data->data(), data->size())};
try {
- ci->write(buffer.data());
+ ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(),
+ boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer)));
} catch (const std::exception& ex) {
std::cerr << "Warning: Notify getpos write for " << ci << " not possible, id " << id << std::endl;
m_registry.dump();
@@ -348,7 +358,6 @@ private:
ConnectionRegistry& m_registry;
Storage& m_storage;
std::mutex& m_storage_mutex;
- std::mutex& m_websocket_mutex;
connection m_ws;
ConnectionRegistry::RegistryGuard m_connection_guard;
@@ -370,7 +379,7 @@ void Whiteboard::on_accept(boost::system::error_code ec, boost::asio::ip::tcp::s
if (ec) {
std::cerr << "Error on accept: " << ec.message() << std::endl;
} else {
- std::make_shared<session>(m_registry, *m_storage, m_storage_mutex, m_websocket_mutex, std::move(socket))->run();
+ std::make_shared<session>(m_registry, *m_storage, m_storage_mutex, std::move(socket))->run();
}
do_accept();
diff --git a/whiteboard.h b/whiteboard.h
index 15d764a..7648bd4 100644
--- a/whiteboard.h
+++ b/whiteboard.h
@@ -22,7 +22,6 @@ private:
std::unique_ptr<Config> m_config;
std::unique_ptr<Storage> m_storage;
std::mutex m_storage_mutex;
- std::mutex m_websocket_mutex;
ConnectionRegistry m_registry;