#include "webchat.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "libreichwein/base64.h" #include "libreichwein/file.h" #include "libreichwein/tempfile.h" #include "libreichwein/xml.h" #include "config.h" #include "diff.h" #include "qrcode.h" #include "storage.h" namespace pt = boost::property_tree; using namespace std::string_literals; using namespace std::placeholders; namespace fs = std::filesystem; namespace bp = boost::process; namespace { void usage() { std::cout << "Usage: \n" " whiteboard [options]\n" "\n" "Options:\n" " -c : specify configuration file including path\n" " -C : clean up database according to timeout rules (config: maxage)\n" " -h : this help\n" "\n" "Without options, whiteboard will be started as websocket application" << std::endl; } } // namespace Whiteboard::Whiteboard() { } namespace { pt::ptree make_ptree(const std::initializer_list>& key_values) { pt::ptree ptree; for (const auto& i: key_values) { ptree.put(fmt::format("serverinfo.{}", i.first), i.second); } return ptree; } std::string make_xml(const std::initializer_list>& key_values) { pt::ptree ptree{make_ptree(key_values)}; return Reichwein::XML::plain_xml(ptree); } // throws on invalid id void validate_id(const std::string& id) { if (!id.size()) throw std::runtime_error("Invalid id (empty)"); if (id.size() > 16) throw std::runtime_error("Invalid id (size > 16)"); for (const auto c: id) { if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'z'))) throw std::runtime_error("Invalid id char: "s + c); } } } // namespace class session: public std::enable_shared_from_this { public: using connection = std::shared_ptr>; session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, boost::asio::ip::tcp::socket socket): m_registry(registry), m_storage(storage), m_storage_mutex(storage_mutex), m_ws(std::make_shared(std::move(socket))), m_connection_guard(m_registry, m_ws) { } ~session() { if (m_stats_timer) m_stats_timer->cancel(); m_stats_timer = nullptr; } void do_read_handshake() { // Set a decorator to change the Server of the handshake m_ws->set_option(boost::beast::websocket::stream_base::decorator( [](boost::beast::websocket::response_type& res) { res.set(boost::beast::http::field::server, std::string("Reichwein.IT Whiteboard")); })); boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser, boost::asio::bind_executor(m_ws->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this()))); } void on_read_handshake(boost::beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); if (ec) { std::cerr << "Error on session handshake read: " << ec.message() << std::endl; } else { do_accept_handshake(); } } void do_accept_handshake() { m_req = m_parser.get(); m_ws->async_accept(m_req, boost::beast::bind_front_handler(&session::on_accept_handshake, shared_from_this())); } void on_accept_handshake(boost::beast::error_code ec) { if (ec) { std::cerr << "Error on session handshake accept: " << ec.message() << std::endl; } else { do_read(); } } void do_read() { if (m_buffer.size() > 0) { m_buffer.consume(m_buffer.size()); } m_ws->async_read(m_buffer, boost::beast::bind_front_handler(&session::on_read, shared_from_this())); } void on_read(boost::beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); if (ec) { if (ec != boost::beast::websocket::error::closed) std::cerr << "Error on session read: " << ec.message() << std::endl; } else { do_write(); } } void do_write() { m_ws->text(m_ws->got_text()); std::string data(boost::asio::buffers_begin(m_buffer.data()), boost::asio::buffers_end(m_buffer.data())); data = handle_request(data); if (m_buffer.size() > 0) { m_buffer.consume(m_buffer.size()); } if (data.size() > 0) { boost::beast::ostream(m_buffer) << data; m_ws->async_write(m_buffer.data(), boost::asio::bind_executor(m_ws->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_write, shared_from_this()))); } else { do_read(); } } void on_write(boost::beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); if (ec) { std::cerr << "Error on session write: " << ec.message() << std::endl; } else { do_read(); } } void run() { do_read_handshake(); } void on_write_notify(std::shared_ptr data, std::shared_ptr buffer, boost::beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); if (ec) { std::cerr << "Error on session write notify: " << ec.message() << std::endl; } } void notify_other_connections_diff(const std::string& id, const Diff& diff) { std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci) { if (m_ws != ci) { pt::ptree ptree {make_ptree({ {"type", "getdiff"}, {"revision", std::to_string(m_storage.getRevision(id))}, {"pos", std::to_string(m_storage.getCursorPos(id)) } })}; ptree.put_child("serverinfo.diff", diff.get_structure().get_child("diff")); auto data{std::make_shared(Reichwein::XML::plain_xml(ptree))}; auto buffer{std::make_shared(data->data(), data->size())}; try { ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer))); } catch (const std::exception& ex) { std::cerr << "Warning: Notify getdiff write for " << ci << " not possible, id " << id << std::endl; m_registry.dump(); } } }); } void notify_other_connections_pos(const std::string& id) { std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci) { if (m_ws != ci) { auto data{std::make_shared(make_xml({ {"type", "getpos"}, {"pos", std::to_string(m_storage.getCursorPos(id)) } }))}; auto buffer{std::make_shared(data->data(), data->size())}; try { ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer))); } catch (const std::exception& ex) { std::cerr << "Warning: Notify getpos write for " << ci << " not possible, id " << id << std::endl; m_registry.dump(); } } }); } std::string stats_xml() { return make_xml({ {"type", "stats" }, {"dbsizegross", std::to_string(m_storage.dbsize_gross()) }, {"dbsizenet", std::to_string(m_storage.dbsize_net()) }, {"numberofdocuments", std::to_string(m_storage.getNumberOfDocuments()) }, {"numberofconnections", std::to_string(m_registry.number_of_connections()) }, }); } void on_write_stats(std::shared_ptr data, std::shared_ptr buffer, boost::beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); if (ec) { std::cerr << "Error on session write stats: " << ec.message() << std::endl; } } void stats_callback(const boost::system::error_code&) { if (m_stats_timer) { auto data{std::make_shared(stats_xml())}; auto buffer{std::make_shared(data->data(), data->size())}; m_ws->async_write(*buffer, boost::asio::bind_executor(m_ws->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_write_stats, shared_from_this(), data, buffer))); m_stats_timer->expires_at(m_stats_timer->expires_at() + boost::asio::chrono::seconds(5)); m_stats_timer->async_wait(boost::beast::bind_front_handler(&session::stats_callback, this)); } } void setup_stats_timer() { if (!m_stats_timer) { m_stats_timer = std::make_shared(m_ws->next_layer().get_executor(), boost::asio::chrono::seconds(5)); m_stats_timer->async_wait(boost::beast::bind_front_handler(&session::stats_callback, this)); } } std::string handle_request(const std::string& request) { try { std::lock_guard lock(m_storage_mutex); pt::ptree xml; std::istringstream ss{request}; pt::xml_parser::read_xml(ss, xml); std::string command {xml.get("request.command")}; if (command == "modify") { std::string id {xml.get("request.id")}; validate_id(id); int baserev {xml.get("request.baserev")}; if (baserev != m_storage.getRevision(id)) return make_xml({{"type", "error"}, {"message", "Bad base revision ("s + std::to_string(baserev) + "). Current: "s + std::to_string(m_storage.getRevision(id)) }}); pt::ptree ptree; ptree.put_child("diff", xml.get_child("request.diff")); Diff d{ptree}; if (!d.empty()) { std::string data {m_storage.getDocument(id)}; data = d.apply(data); m_storage.setDocument(id, data); m_registry.setId(m_ws, id); notify_other_connections_diff(id, d); } int pos {xml.get("request.pos")}; if (m_storage.getCursorPos(id) != pos) { m_storage.setCursorPos(id, pos); notify_other_connections_pos(id); } return make_xml({{"type", "modify"}, {"revision", std::to_string(m_storage.getRevision(id)) }}); } else if (command == "cursorpos") { std::string id {xml.get("request.id")}; validate_id(id); int pos {xml.get("request.pos")}; if (m_storage.getCursorPos(id) != pos) { m_storage.setCursorPos(id, pos); notify_other_connections_pos(id); } return {}; } else if (command == "getfile") { std::string id {xml.get("request.id")}; validate_id(id); std::string filedata; try { filedata = m_storage.getDocument(id); } catch (const std::runtime_error&) { m_storage.setDocument(id, filedata); } if (filedata.size() > 30000000) throw std::runtime_error("File too big"); m_registry.setId(m_ws, id); return make_xml({ {"type", "getfile"}, {"data", filedata}, {"revision", std::to_string(m_storage.getRevision(id)) }, {"pos", std::to_string(m_storage.getCursorPos(id)) } }); } else if (command == "getpos") { std::string id {xml.get("request.id")}; validate_id(id); return make_xml({ {"type", "getpos"}, {"pos", std::to_string(m_storage.getCursorPos(id)) } }); } else if (command == "newid") { return make_xml({{"type", "newid"}, {"id", m_storage.generate_id()}}); } else if (command == "qrcode") { std::string url{xml.get("request.url")}; if (url.size() > 1000) throw std::runtime_error("URL too big"); std::string pngdata {QRCode::getQRCode(url)}; return make_xml({{"type", "qrcode"}, {"png", Reichwein::Base64::encode64(pngdata)}}); } else if (command == "getversion") { return make_xml({ {"type", "version"}, {"version", WHITEBOARD_VERSION } }); } else if (command == "getstats") { setup_stats_timer(); return stats_xml(); } else if (command == "pdf") { std::string id {xml.get("request.id")}; validate_id(id); Reichwein::Tempfile mdFilePath{".md"}; Reichwein::File::setFile(mdFilePath.getPath(), m_storage.getDocument(id)); Reichwein::Tempfile pdfFilePath{".pdf"}; int system_result{bp::system("/usr/bin/pandoc", mdFilePath.getPath().generic_string(), "-o", pdfFilePath.getPath().generic_string())}; if (system_result) throw std::runtime_error("pandoc returned: "s + std::to_string(system_result)); std::string pdfdata{Reichwein::File::getFile(pdfFilePath.getPath())}; return make_xml({{"type", "pdf"}, {"pdf", Reichwein::Base64::encode64(pdfdata)}}); } else { throw std::runtime_error("Bad command: "s + command); } } catch (const std::exception& ex) { return make_xml({{"type", "error"}, {"message", "Message handling error: "s + ex.what()}}); } } private: ConnectionRegistry& m_registry; Storage& m_storage; std::mutex& m_storage_mutex; connection m_ws; ConnectionRegistry::RegistryGuard m_connection_guard; boost::beast::http::request_parser m_parser; boost::beast::http::request m_req; boost::beast::flat_buffer m_buffer; std::shared_ptr m_stats_timer{}; }; void Whiteboard::do_accept() { // The new connection gets its own strand m_acceptor->async_accept(boost::asio::make_strand(*m_ioc), std::bind(&Whiteboard::on_accept, this, _1, _2)); } void Whiteboard::on_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket socket) { if (ec) { std::cerr << "Error on accept: " << ec.message() << std::endl; } else { if (m_registry.number_of_connections() >= m_config->getMaxConnections()) { // limit reached socket.close(); } else { std::make_shared(m_registry, *m_storage, m_storage_mutex, std::move(socket))->run(); } } do_accept(); } // for long running connections, don't timeout them but touch associated ids // regularly, at cleanup time void Whiteboard::touch_all_connections() { std::for_each(m_registry.begin(), m_registry.end(), [&](const std::pair& i) { m_storage->touchDocument(i.second); }); } // the actual main() for testability int Whiteboard::run(int argc, char* argv[]) { try { bool flag_cleanup{}; fs::path configFile; if (argc == 2) { if (argv[1] == "-h"s || argv[1] == "-?"s) { usage(); exit(0); } else if (argv[1] == "-C"s) { flag_cleanup = true; } } else if (argc == 3) { if (argv[1] == "-c"s) { configFile = argv[2]; } } if (configFile.empty()) m_config = std::make_unique(); else m_config = std::make_unique(configFile); m_storage = std::make_unique(*m_config); if (flag_cleanup) { m_storage->cleanup(); exit(0); } QRCode::init(); auto const address = boost::asio::ip::make_address(m_config->getListenAddress()); auto const port = static_cast(m_config->getListenPort()); // The io_context is required for all I/O m_ioc = std::make_unique(m_config->getThreads()); // for now, just terminate on SIGINT, SIGHUP and SIGTERM boost::asio::signal_set signals(*m_ioc, SIGINT, SIGTERM, SIGHUP); signals.async_wait([&](const boost::system::error_code& error, int signal_number){ std::cout << "Terminating via signal " << signal_number << std::endl; m_ioc->stop(); }); // Storage cleanup once a day boost::asio::steady_timer storage_cleanup_timer(*m_ioc, boost::asio::chrono::hours(24)); std::function storage_cleanup_callback = [&](const boost::system::error_code& error){ std::lock_guard lock(m_storage_mutex); if (!m_storage) throw std::runtime_error("Storage not initialized"); touch_all_connections(); m_storage->cleanup(); storage_cleanup_timer.expires_at(storage_cleanup_timer.expires_at() + boost::asio::chrono::hours(24)); storage_cleanup_timer.async_wait(storage_cleanup_callback); }; storage_cleanup_timer.async_wait(storage_cleanup_callback); // The acceptor receives incoming connections m_acceptor = std::make_unique(*m_ioc, boost::asio::ip::tcp::endpoint{address, port}); do_accept(); // Run the I/O service on the requested number of threads std::vector v; v.reserve(m_config->getThreads() - 1); for (auto i = m_config->getThreads() - 1; i > 0; --i) { v.emplace_back( [&] { m_ioc->run(); }); } m_ioc->run(); for (auto& t: v) { t.join(); } } catch (const std::exception& ex) { std::cerr << "Error: " << ex.what() << std::endl; } return 0; }