From 96476044387e98ee1ee7a6eb992b521bd447813c Mon Sep 17 00:00:00 2001 From: Roland Reichwein Date: Fri, 3 Mar 2023 16:55:33 +0100 Subject: Renamed whiteboard to webchat --- webchat.cpp | 571 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 571 insertions(+) create mode 100644 webchat.cpp (limited to 'webchat.cpp') diff --git a/webchat.cpp b/webchat.cpp new file mode 100644 index 0000000..c752ae0 --- /dev/null +++ b/webchat.cpp @@ -0,0 +1,571 @@ +#include "webchat.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "libreichwein/base64.h" +#include "libreichwein/file.h" +#include "libreichwein/tempfile.h" +#include "libreichwein/xml.h" + +#include "config.h" +#include "diff.h" +#include "qrcode.h" +#include "storage.h" + +namespace pt = boost::property_tree; +using namespace std::string_literals; +using namespace std::placeholders; +namespace fs = std::filesystem; +namespace bp = boost::process; + +namespace { + + void usage() { + std::cout << + "Usage: \n" + " whiteboard [options]\n" + "\n" + "Options:\n" + " -c : specify configuration file including path\n" + " -C : clean up database according to timeout rules (config: maxage)\n" + " -h : this help\n" + "\n" + "Without options, whiteboard will be started as websocket application" + << std::endl; + } + +} // namespace + +Whiteboard::Whiteboard() +{ +} + +namespace { + +pt::ptree make_ptree(const std::initializer_list>& key_values) +{ + pt::ptree ptree; + for (const auto& i: key_values) { + ptree.put(fmt::format("serverinfo.{}", i.first), i.second); + } + + return ptree; +} + +std::string make_xml(const std::initializer_list>& key_values) +{ + pt::ptree ptree{make_ptree(key_values)}; + return Reichwein::XML::plain_xml(ptree); +} + +// throws on invalid id +void validate_id(const std::string& id) { + if (!id.size()) + throw std::runtime_error("Invalid id (empty)"); + + if (id.size() > 16) + throw std::runtime_error("Invalid id (size > 16)"); + + for (const auto c: id) { + if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'z'))) + throw std::runtime_error("Invalid id char: "s + c); + } +} + +} // namespace + +class session: public std::enable_shared_from_this +{ +public: + using connection = std::shared_ptr>; + + session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, boost::asio::ip::tcp::socket socket): + m_registry(registry), + m_storage(storage), + m_storage_mutex(storage_mutex), + m_ws(std::make_shared(std::move(socket))), + m_connection_guard(m_registry, m_ws) + { + } + + ~session() + { + if (m_stats_timer) + m_stats_timer->cancel(); + m_stats_timer = nullptr; + } + + void do_read_handshake() + { + // Set a decorator to change the Server of the handshake + m_ws->set_option(boost::beast::websocket::stream_base::decorator( + [](boost::beast::websocket::response_type& res) + { + res.set(boost::beast::http::field::server, + std::string("Reichwein.IT Whiteboard")); + })); + + boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser, + boost::asio::bind_executor(m_ws->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this()))); + } + + void on_read_handshake(boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + if (ec) { + std::cerr << "Error on session handshake read: " << ec.message() << std::endl; + } else { + do_accept_handshake(); + } + } + + void do_accept_handshake() + { + m_req = m_parser.get(); + + m_ws->async_accept(m_req, boost::beast::bind_front_handler(&session::on_accept_handshake, shared_from_this())); + } + + void on_accept_handshake(boost::beast::error_code ec) + { + if (ec) { + std::cerr << "Error on session handshake accept: " << ec.message() << std::endl; + } else { + do_read(); + } + } + + void do_read() + { + if (m_buffer.size() > 0) { + m_buffer.consume(m_buffer.size()); + } + + m_ws->async_read(m_buffer, boost::beast::bind_front_handler(&session::on_read, shared_from_this())); + } + + void on_read(boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + if (ec) { + if (ec != boost::beast::websocket::error::closed) + std::cerr << "Error on session read: " << ec.message() << std::endl; + } else { + do_write(); + } + } + + void do_write() + { + m_ws->text(m_ws->got_text()); + std::string data(boost::asio::buffers_begin(m_buffer.data()), boost::asio::buffers_end(m_buffer.data())); + data = handle_request(data); + if (m_buffer.size() > 0) { + m_buffer.consume(m_buffer.size()); + } + if (data.size() > 0) { + boost::beast::ostream(m_buffer) << data; + m_ws->async_write(m_buffer.data(), + boost::asio::bind_executor(m_ws->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write, shared_from_this()))); + } else { + do_read(); + } + } + + void on_write(boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) { + std::cerr << "Error on session write: " << ec.message() << std::endl; + } else { + do_read(); + } + } + + void run() + { + do_read_handshake(); + } + + void on_write_notify(std::shared_ptr data, std::shared_ptr buffer, boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) { + std::cerr << "Error on session write notify: " << ec.message() << std::endl; + } + } + + void notify_other_connections_diff(const std::string& id, const Diff& diff) + { + std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci) + { + if (m_ws != ci) { + pt::ptree ptree {make_ptree({ + {"type", "getdiff"}, + {"revision", std::to_string(m_storage.getRevision(id))}, + {"pos", std::to_string(m_storage.getCursorPos(id)) } + })}; + ptree.put_child("serverinfo.diff", diff.get_structure().get_child("diff")); + auto data{std::make_shared(Reichwein::XML::plain_xml(ptree))}; + auto buffer{std::make_shared(data->data(), data->size())}; + try { + ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer))); + } catch (const std::exception& ex) { + std::cerr << "Warning: Notify getdiff write for " << ci << " not possible, id " << id << std::endl; + m_registry.dump(); + } + } + }); + } + + void notify_other_connections_pos(const std::string& id) + { + std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci) + { + if (m_ws != ci) { + auto data{std::make_shared(make_xml({ + {"type", "getpos"}, + {"pos", std::to_string(m_storage.getCursorPos(id)) } + }))}; + auto buffer{std::make_shared(data->data(), data->size())}; + try { + ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer))); + } catch (const std::exception& ex) { + std::cerr << "Warning: Notify getpos write for " << ci << " not possible, id " << id << std::endl; + m_registry.dump(); + } + } + }); + } + + std::string stats_xml() + { + return make_xml({ + {"type", "stats" }, + {"dbsizegross", std::to_string(m_storage.dbsize_gross()) }, + {"dbsizenet", std::to_string(m_storage.dbsize_net()) }, + {"numberofdocuments", std::to_string(m_storage.getNumberOfDocuments()) }, + {"numberofconnections", std::to_string(m_registry.number_of_connections()) }, + }); + } + + void on_write_stats(std::shared_ptr data, std::shared_ptr buffer, boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) { + std::cerr << "Error on session write stats: " << ec.message() << std::endl; + } + } + + void stats_callback(const boost::system::error_code&) + { + if (m_stats_timer) { + auto data{std::make_shared(stats_xml())}; + auto buffer{std::make_shared(data->data(), data->size())}; + m_ws->async_write(*buffer, boost::asio::bind_executor(m_ws->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write_stats, shared_from_this(), data, buffer))); + + m_stats_timer->expires_at(m_stats_timer->expires_at() + boost::asio::chrono::seconds(5)); + m_stats_timer->async_wait(boost::beast::bind_front_handler(&session::stats_callback, this)); + } + } + + void setup_stats_timer() + { + if (!m_stats_timer) { + m_stats_timer = std::make_shared(m_ws->next_layer().get_executor(), boost::asio::chrono::seconds(5)); + m_stats_timer->async_wait(boost::beast::bind_front_handler(&session::stats_callback, this)); + } + } + + std::string handle_request(const std::string& request) + { + try { + std::lock_guard lock(m_storage_mutex); + + pt::ptree xml; + std::istringstream ss{request}; + pt::xml_parser::read_xml(ss, xml); + + std::string command {xml.get("request.command")}; + + if (command == "modify") { + std::string id {xml.get("request.id")}; + validate_id(id); + + int baserev {xml.get("request.baserev")}; + if (baserev != m_storage.getRevision(id)) + return make_xml({{"type", "error"}, {"message", "Bad base revision ("s + std::to_string(baserev) + "). Current: "s + std::to_string(m_storage.getRevision(id)) }}); + + pt::ptree ptree; + ptree.put_child("diff", xml.get_child("request.diff")); + Diff d{ptree}; + if (!d.empty()) { + std::string data {m_storage.getDocument(id)}; + data = d.apply(data); + + m_storage.setDocument(id, data); + m_registry.setId(m_ws, id); + notify_other_connections_diff(id, d); + } + + int pos {xml.get("request.pos")}; + if (m_storage.getCursorPos(id) != pos) { + m_storage.setCursorPos(id, pos); + notify_other_connections_pos(id); + } + return make_xml({{"type", "modify"}, {"revision", std::to_string(m_storage.getRevision(id)) }}); + } else if (command == "cursorpos") { + std::string id {xml.get("request.id")}; + validate_id(id); + int pos {xml.get("request.pos")}; + if (m_storage.getCursorPos(id) != pos) { + m_storage.setCursorPos(id, pos); + notify_other_connections_pos(id); + } + return {}; + } else if (command == "getfile") { + std::string id {xml.get("request.id")}; + validate_id(id); + + std::string filedata; + try { + filedata = m_storage.getDocument(id); + } catch (const std::runtime_error&) { + m_storage.setDocument(id, filedata); + } + + if (filedata.size() > 30000000) + throw std::runtime_error("File too big"); + m_registry.setId(m_ws, id); + + return make_xml({ + {"type", "getfile"}, + {"data", filedata}, + {"revision", std::to_string(m_storage.getRevision(id)) }, + {"pos", std::to_string(m_storage.getCursorPos(id)) } + }); + } else if (command == "getpos") { + std::string id {xml.get("request.id")}; + validate_id(id); + + return make_xml({ + {"type", "getpos"}, + {"pos", std::to_string(m_storage.getCursorPos(id)) } + }); + } else if (command == "newid") { + return make_xml({{"type", "newid"}, {"id", m_storage.generate_id()}}); + } else if (command == "qrcode") { + std::string url{xml.get("request.url")}; + + if (url.size() > 1000) + throw std::runtime_error("URL too big"); + + std::string pngdata {QRCode::getQRCode(url)}; + + return make_xml({{"type", "qrcode"}, {"png", Reichwein::Base64::encode64(pngdata)}}); + } else if (command == "getversion") { + return make_xml({ + {"type", "version"}, + {"version", WHITEBOARD_VERSION } + }); + } else if (command == "getstats") { + setup_stats_timer(); + return stats_xml(); + } else if (command == "pdf") { + std::string id {xml.get("request.id")}; + validate_id(id); + Reichwein::Tempfile mdFilePath{".md"}; + Reichwein::File::setFile(mdFilePath.getPath(), m_storage.getDocument(id)); + Reichwein::Tempfile pdfFilePath{".pdf"}; + int system_result{bp::system("/usr/bin/pandoc", mdFilePath.getPath().generic_string(), "-o", pdfFilePath.getPath().generic_string())}; + if (system_result) + throw std::runtime_error("pandoc returned: "s + std::to_string(system_result)); + std::string pdfdata{Reichwein::File::getFile(pdfFilePath.getPath())}; + return make_xml({{"type", "pdf"}, {"pdf", Reichwein::Base64::encode64(pdfdata)}}); + } else { + throw std::runtime_error("Bad command: "s + command); + } + + } catch (const std::exception& ex) { + return make_xml({{"type", "error"}, {"message", "Message handling error: "s + ex.what()}}); + } + } +private: + ConnectionRegistry& m_registry; + Storage& m_storage; + std::mutex& m_storage_mutex; + connection m_ws; + ConnectionRegistry::RegistryGuard m_connection_guard; + + boost::beast::http::request_parser m_parser; + boost::beast::http::request m_req; + boost::beast::flat_buffer m_buffer; + + std::shared_ptr m_stats_timer{}; +}; + +void Whiteboard::do_accept() +{ + // The new connection gets its own strand + m_acceptor->async_accept(boost::asio::make_strand(*m_ioc), + std::bind(&Whiteboard::on_accept, this, _1, _2)); +} + +void Whiteboard::on_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket socket) +{ + if (ec) { + std::cerr << "Error on accept: " << ec.message() << std::endl; + } else { + if (m_registry.number_of_connections() >= m_config->getMaxConnections()) { + // limit reached + socket.close(); + } else { + std::make_shared(m_registry, *m_storage, m_storage_mutex, std::move(socket))->run(); + } + } + + do_accept(); +} + +// for long running connections, don't timeout them but touch associated ids +// regularly, at cleanup time +void Whiteboard::touch_all_connections() +{ + std::for_each(m_registry.begin(), m_registry.end(), [&](const std::pair& i) + { + m_storage->touchDocument(i.second); + }); +} + +// the actual main() for testability +int Whiteboard::run(int argc, char* argv[]) +{ + try { + bool flag_cleanup{}; + fs::path configFile; + + if (argc == 2) { + if (argv[1] == "-h"s || argv[1] == "-?"s) { + usage(); + exit(0); + } else if (argv[1] == "-C"s) { + flag_cleanup = true; + } + } else if (argc == 3) { + if (argv[1] == "-c"s) { + configFile = argv[2]; + } + } + + if (configFile.empty()) + m_config = std::make_unique(); + else + m_config = std::make_unique(configFile); + + m_storage = std::make_unique(*m_config); + + if (flag_cleanup) { + m_storage->cleanup(); + exit(0); + } + + QRCode::init(); + + auto const address = boost::asio::ip::make_address(m_config->getListenAddress()); + auto const port = static_cast(m_config->getListenPort()); + + // The io_context is required for all I/O + m_ioc = std::make_unique(m_config->getThreads()); + + // for now, just terminate on SIGINT, SIGHUP and SIGTERM + boost::asio::signal_set signals(*m_ioc, SIGINT, SIGTERM, SIGHUP); + signals.async_wait([&](const boost::system::error_code& error, int signal_number){ + std::cout << "Terminating via signal " << signal_number << std::endl; + m_ioc->stop(); + }); + + // Storage cleanup once a day + boost::asio::steady_timer storage_cleanup_timer(*m_ioc, boost::asio::chrono::hours(24)); + std::function storage_cleanup_callback = + [&](const boost::system::error_code& error){ + std::lock_guard lock(m_storage_mutex); + if (!m_storage) + throw std::runtime_error("Storage not initialized"); + touch_all_connections(); + m_storage->cleanup(); + storage_cleanup_timer.expires_at(storage_cleanup_timer.expires_at() + boost::asio::chrono::hours(24)); + storage_cleanup_timer.async_wait(storage_cleanup_callback); + }; + storage_cleanup_timer.async_wait(storage_cleanup_callback); + + // The acceptor receives incoming connections + m_acceptor = std::make_unique(*m_ioc, boost::asio::ip::tcp::endpoint{address, port}); + + do_accept(); + + // Run the I/O service on the requested number of threads + std::vector v; + v.reserve(m_config->getThreads() - 1); + for (auto i = m_config->getThreads() - 1; i > 0; --i) { + v.emplace_back( + [&] + { + m_ioc->run(); + }); + } + m_ioc->run(); + + for (auto& t: v) { + t.join(); + } + + } catch (const std::exception& ex) { + std::cerr << "Error: " << ex.what() << std::endl; + } + + return 0; +} + -- cgit v1.2.3