summaryrefslogtreecommitdiffhomepage
path: root/webchat.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'webchat.cpp')
-rw-r--r--webchat.cpp571
1 files changed, 571 insertions, 0 deletions
diff --git a/webchat.cpp b/webchat.cpp
new file mode 100644
index 0000000..c752ae0
--- /dev/null
+++ b/webchat.cpp
@@ -0,0 +1,571 @@
+#include "webchat.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <dirent.h>
+#include <sys/types.h>
+
+#include <chrono>
+#include <initializer_list>
+#include <iostream>
+#include <functional>
+#include <filesystem>
+#include <memory>
+#include <mutex>
+#include <regex>
+#include <sstream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/algorithm/string/trim.hpp>
+#include <boost/property_tree/xml_parser.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/beast/core.hpp>
+#include <boost/beast/http.hpp>
+#include <boost/beast/websocket.hpp>
+#include <boost/beast/version.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/buffers_iterator.hpp>
+#include <boost/asio/connect.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/signal_set.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/process.hpp>
+
+#include <fmt/core.h>
+
+#include "libreichwein/base64.h"
+#include "libreichwein/file.h"
+#include "libreichwein/tempfile.h"
+#include "libreichwein/xml.h"
+
+#include "config.h"
+#include "diff.h"
+#include "qrcode.h"
+#include "storage.h"
+
+namespace pt = boost::property_tree;
+using namespace std::string_literals;
+using namespace std::placeholders;
+namespace fs = std::filesystem;
+namespace bp = boost::process;
+
+namespace {
+
+ void usage() {
+ std::cout <<
+ "Usage: \n"
+ " whiteboard [options]\n"
+ "\n"
+ "Options:\n"
+ " -c <path> : specify configuration file including path\n"
+ " -C : clean up database according to timeout rules (config: maxage)\n"
+ " -h : this help\n"
+ "\n"
+ "Without options, whiteboard will be started as websocket application"
+ << std::endl;
+ }
+
+} // namespace
+
+Whiteboard::Whiteboard()
+{
+}
+
+namespace {
+
+pt::ptree make_ptree(const std::initializer_list<std::pair<std::string, std::string>>& key_values)
+{
+ pt::ptree ptree;
+ for (const auto& i: key_values) {
+ ptree.put(fmt::format("serverinfo.{}", i.first), i.second);
+ }
+
+ return ptree;
+}
+
+std::string make_xml(const std::initializer_list<std::pair<std::string, std::string>>& key_values)
+{
+ pt::ptree ptree{make_ptree(key_values)};
+ return Reichwein::XML::plain_xml(ptree);
+}
+
+// throws on invalid id
+void validate_id(const std::string& id) {
+ if (!id.size())
+ throw std::runtime_error("Invalid id (empty)");
+
+ if (id.size() > 16)
+ throw std::runtime_error("Invalid id (size > 16)");
+
+ for (const auto c: id) {
+ if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'z')))
+ throw std::runtime_error("Invalid id char: "s + c);
+ }
+}
+
+} // namespace
+
+class session: public std::enable_shared_from_this<session>
+{
+public:
+ using connection = std::shared_ptr<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>>;
+
+ session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, boost::asio::ip::tcp::socket socket):
+ m_registry(registry),
+ m_storage(storage),
+ m_storage_mutex(storage_mutex),
+ m_ws(std::make_shared<connection::element_type>(std::move(socket))),
+ m_connection_guard(m_registry, m_ws)
+ {
+ }
+
+ ~session()
+ {
+ if (m_stats_timer)
+ m_stats_timer->cancel();
+ m_stats_timer = nullptr;
+ }
+
+ void do_read_handshake()
+ {
+ // Set a decorator to change the Server of the handshake
+ m_ws->set_option(boost::beast::websocket::stream_base::decorator(
+ [](boost::beast::websocket::response_type& res)
+ {
+ res.set(boost::beast::http::field::server,
+ std::string("Reichwein.IT Whiteboard"));
+ }));
+
+ boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser,
+ boost::asio::bind_executor(m_ws->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this())));
+ }
+
+ void on_read_handshake(boost::beast::error_code ec, std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+ if (ec) {
+ std::cerr << "Error on session handshake read: " << ec.message() << std::endl;
+ } else {
+ do_accept_handshake();
+ }
+ }
+
+ void do_accept_handshake()
+ {
+ m_req = m_parser.get();
+
+ m_ws->async_accept(m_req, boost::beast::bind_front_handler(&session::on_accept_handshake, shared_from_this()));
+ }
+
+ void on_accept_handshake(boost::beast::error_code ec)
+ {
+ if (ec) {
+ std::cerr << "Error on session handshake accept: " << ec.message() << std::endl;
+ } else {
+ do_read();
+ }
+ }
+
+ void do_read()
+ {
+ if (m_buffer.size() > 0) {
+ m_buffer.consume(m_buffer.size());
+ }
+
+ m_ws->async_read(m_buffer, boost::beast::bind_front_handler(&session::on_read, shared_from_this()));
+ }
+
+ void on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+ if (ec) {
+ if (ec != boost::beast::websocket::error::closed)
+ std::cerr << "Error on session read: " << ec.message() << std::endl;
+ } else {
+ do_write();
+ }
+ }
+
+ void do_write()
+ {
+ m_ws->text(m_ws->got_text());
+ std::string data(boost::asio::buffers_begin(m_buffer.data()), boost::asio::buffers_end(m_buffer.data()));
+ data = handle_request(data);
+ if (m_buffer.size() > 0) {
+ m_buffer.consume(m_buffer.size());
+ }
+ if (data.size() > 0) {
+ boost::beast::ostream(m_buffer) << data;
+ m_ws->async_write(m_buffer.data(),
+ boost::asio::bind_executor(m_ws->next_layer().get_executor(),
+ boost::beast::bind_front_handler(&session::on_write, shared_from_this())));
+ } else {
+ do_read();
+ }
+ }
+
+ void on_write(boost::beast::error_code ec, std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+
+ if (ec) {
+ std::cerr << "Error on session write: " << ec.message() << std::endl;
+ } else {
+ do_read();
+ }
+ }
+
+ void run()
+ {
+ do_read_handshake();
+ }
+
+ void on_write_notify(std::shared_ptr<std::string> data, std::shared_ptr<boost::asio::const_buffer> buffer, boost::beast::error_code ec, std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+
+ if (ec) {
+ std::cerr << "Error on session write notify: " << ec.message() << std::endl;
+ }
+ }
+
+ void notify_other_connections_diff(const std::string& id, const Diff& diff)
+ {
+ std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci)
+ {
+ if (m_ws != ci) {
+ pt::ptree ptree {make_ptree({
+ {"type", "getdiff"},
+ {"revision", std::to_string(m_storage.getRevision(id))},
+ {"pos", std::to_string(m_storage.getCursorPos(id)) }
+ })};
+ ptree.put_child("serverinfo.diff", diff.get_structure().get_child("diff"));
+ auto data{std::make_shared<std::string>(Reichwein::XML::plain_xml(ptree))};
+ auto buffer{std::make_shared<boost::asio::const_buffer>(data->data(), data->size())};
+ try {
+ ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(),
+ boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer)));
+ } catch (const std::exception& ex) {
+ std::cerr << "Warning: Notify getdiff write for " << ci << " not possible, id " << id << std::endl;
+ m_registry.dump();
+ }
+ }
+ });
+ }
+
+ void notify_other_connections_pos(const std::string& id)
+ {
+ std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci)
+ {
+ if (m_ws != ci) {
+ auto data{std::make_shared<std::string>(make_xml({
+ {"type", "getpos"},
+ {"pos", std::to_string(m_storage.getCursorPos(id)) }
+ }))};
+ auto buffer{std::make_shared<boost::asio::const_buffer>(data->data(), data->size())};
+ try {
+ ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(),
+ boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer)));
+ } catch (const std::exception& ex) {
+ std::cerr << "Warning: Notify getpos write for " << ci << " not possible, id " << id << std::endl;
+ m_registry.dump();
+ }
+ }
+ });
+ }
+
+ std::string stats_xml()
+ {
+ return make_xml({
+ {"type", "stats" },
+ {"dbsizegross", std::to_string(m_storage.dbsize_gross()) },
+ {"dbsizenet", std::to_string(m_storage.dbsize_net()) },
+ {"numberofdocuments", std::to_string(m_storage.getNumberOfDocuments()) },
+ {"numberofconnections", std::to_string(m_registry.number_of_connections()) },
+ });
+ }
+
+ void on_write_stats(std::shared_ptr<std::string> data, std::shared_ptr<boost::asio::const_buffer> buffer, boost::beast::error_code ec, std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+
+ if (ec) {
+ std::cerr << "Error on session write stats: " << ec.message() << std::endl;
+ }
+ }
+
+ void stats_callback(const boost::system::error_code&)
+ {
+ if (m_stats_timer) {
+ auto data{std::make_shared<std::string>(stats_xml())};
+ auto buffer{std::make_shared<boost::asio::const_buffer>(data->data(), data->size())};
+ m_ws->async_write(*buffer, boost::asio::bind_executor(m_ws->next_layer().get_executor(),
+ boost::beast::bind_front_handler(&session::on_write_stats, shared_from_this(), data, buffer)));
+
+ m_stats_timer->expires_at(m_stats_timer->expires_at() + boost::asio::chrono::seconds(5));
+ m_stats_timer->async_wait(boost::beast::bind_front_handler(&session::stats_callback, this));
+ }
+ }
+
+ void setup_stats_timer()
+ {
+ if (!m_stats_timer) {
+ m_stats_timer = std::make_shared<boost::asio::steady_timer>(m_ws->next_layer().get_executor(), boost::asio::chrono::seconds(5));
+ m_stats_timer->async_wait(boost::beast::bind_front_handler(&session::stats_callback, this));
+ }
+ }
+
+ std::string handle_request(const std::string& request)
+ {
+ try {
+ std::lock_guard<std::mutex> lock(m_storage_mutex);
+
+ pt::ptree xml;
+ std::istringstream ss{request};
+ pt::xml_parser::read_xml(ss, xml);
+
+ std::string command {xml.get<std::string>("request.command")};
+
+ if (command == "modify") {
+ std::string id {xml.get<std::string>("request.id")};
+ validate_id(id);
+
+ int baserev {xml.get<int>("request.baserev")};
+ if (baserev != m_storage.getRevision(id))
+ return make_xml({{"type", "error"}, {"message", "Bad base revision ("s + std::to_string(baserev) + "). Current: "s + std::to_string(m_storage.getRevision(id)) }});
+
+ pt::ptree ptree;
+ ptree.put_child("diff", xml.get_child("request.diff"));
+ Diff d{ptree};
+ if (!d.empty()) {
+ std::string data {m_storage.getDocument(id)};
+ data = d.apply(data);
+
+ m_storage.setDocument(id, data);
+ m_registry.setId(m_ws, id);
+ notify_other_connections_diff(id, d);
+ }
+
+ int pos {xml.get<int>("request.pos")};
+ if (m_storage.getCursorPos(id) != pos) {
+ m_storage.setCursorPos(id, pos);
+ notify_other_connections_pos(id);
+ }
+ return make_xml({{"type", "modify"}, {"revision", std::to_string(m_storage.getRevision(id)) }});
+ } else if (command == "cursorpos") {
+ std::string id {xml.get<std::string>("request.id")};
+ validate_id(id);
+ int pos {xml.get<int>("request.pos")};
+ if (m_storage.getCursorPos(id) != pos) {
+ m_storage.setCursorPos(id, pos);
+ notify_other_connections_pos(id);
+ }
+ return {};
+ } else if (command == "getfile") {
+ std::string id {xml.get<std::string>("request.id")};
+ validate_id(id);
+
+ std::string filedata;
+ try {
+ filedata = m_storage.getDocument(id);
+ } catch (const std::runtime_error&) {
+ m_storage.setDocument(id, filedata);
+ }
+
+ if (filedata.size() > 30000000)
+ throw std::runtime_error("File too big");
+ m_registry.setId(m_ws, id);
+
+ return make_xml({
+ {"type", "getfile"},
+ {"data", filedata},
+ {"revision", std::to_string(m_storage.getRevision(id)) },
+ {"pos", std::to_string(m_storage.getCursorPos(id)) }
+ });
+ } else if (command == "getpos") {
+ std::string id {xml.get<std::string>("request.id")};
+ validate_id(id);
+
+ return make_xml({
+ {"type", "getpos"},
+ {"pos", std::to_string(m_storage.getCursorPos(id)) }
+ });
+ } else if (command == "newid") {
+ return make_xml({{"type", "newid"}, {"id", m_storage.generate_id()}});
+ } else if (command == "qrcode") {
+ std::string url{xml.get<std::string>("request.url")};
+
+ if (url.size() > 1000)
+ throw std::runtime_error("URL too big");
+
+ std::string pngdata {QRCode::getQRCode(url)};
+
+ return make_xml({{"type", "qrcode"}, {"png", Reichwein::Base64::encode64(pngdata)}});
+ } else if (command == "getversion") {
+ return make_xml({
+ {"type", "version"},
+ {"version", WHITEBOARD_VERSION }
+ });
+ } else if (command == "getstats") {
+ setup_stats_timer();
+ return stats_xml();
+ } else if (command == "pdf") {
+ std::string id {xml.get<std::string>("request.id")};
+ validate_id(id);
+ Reichwein::Tempfile mdFilePath{".md"};
+ Reichwein::File::setFile(mdFilePath.getPath(), m_storage.getDocument(id));
+ Reichwein::Tempfile pdfFilePath{".pdf"};
+ int system_result{bp::system("/usr/bin/pandoc", mdFilePath.getPath().generic_string(), "-o", pdfFilePath.getPath().generic_string())};
+ if (system_result)
+ throw std::runtime_error("pandoc returned: "s + std::to_string(system_result));
+ std::string pdfdata{Reichwein::File::getFile(pdfFilePath.getPath())};
+ return make_xml({{"type", "pdf"}, {"pdf", Reichwein::Base64::encode64(pdfdata)}});
+ } else {
+ throw std::runtime_error("Bad command: "s + command);
+ }
+
+ } catch (const std::exception& ex) {
+ return make_xml({{"type", "error"}, {"message", "Message handling error: "s + ex.what()}});
+ }
+ }
+private:
+ ConnectionRegistry& m_registry;
+ Storage& m_storage;
+ std::mutex& m_storage_mutex;
+ connection m_ws;
+ ConnectionRegistry::RegistryGuard m_connection_guard;
+
+ boost::beast::http::request_parser<boost::beast::http::string_body> m_parser;
+ boost::beast::http::request<boost::beast::http::string_body> m_req;
+ boost::beast::flat_buffer m_buffer;
+
+ std::shared_ptr<boost::asio::steady_timer> m_stats_timer{};
+};
+
+void Whiteboard::do_accept()
+{
+ // The new connection gets its own strand
+ m_acceptor->async_accept(boost::asio::make_strand(*m_ioc),
+ std::bind(&Whiteboard::on_accept, this, _1, _2));
+}
+
+void Whiteboard::on_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
+{
+ if (ec) {
+ std::cerr << "Error on accept: " << ec.message() << std::endl;
+ } else {
+ if (m_registry.number_of_connections() >= m_config->getMaxConnections()) {
+ // limit reached
+ socket.close();
+ } else {
+ std::make_shared<session>(m_registry, *m_storage, m_storage_mutex, std::move(socket))->run();
+ }
+ }
+
+ do_accept();
+}
+
+// for long running connections, don't timeout them but touch associated ids
+// regularly, at cleanup time
+void Whiteboard::touch_all_connections()
+{
+ std::for_each(m_registry.begin(), m_registry.end(), [&](const std::pair<ConnectionRegistry::connection, std::string>& i)
+ {
+ m_storage->touchDocument(i.second);
+ });
+}
+
+// the actual main() for testability
+int Whiteboard::run(int argc, char* argv[])
+{
+ try {
+ bool flag_cleanup{};
+ fs::path configFile;
+
+ if (argc == 2) {
+ if (argv[1] == "-h"s || argv[1] == "-?"s) {
+ usage();
+ exit(0);
+ } else if (argv[1] == "-C"s) {
+ flag_cleanup = true;
+ }
+ } else if (argc == 3) {
+ if (argv[1] == "-c"s) {
+ configFile = argv[2];
+ }
+ }
+
+ if (configFile.empty())
+ m_config = std::make_unique<Config>();
+ else
+ m_config = std::make_unique<Config>(configFile);
+
+ m_storage = std::make_unique<Storage>(*m_config);
+
+ if (flag_cleanup) {
+ m_storage->cleanup();
+ exit(0);
+ }
+
+ QRCode::init();
+
+ auto const address = boost::asio::ip::make_address(m_config->getListenAddress());
+ auto const port = static_cast<unsigned short>(m_config->getListenPort());
+
+ // The io_context is required for all I/O
+ m_ioc = std::make_unique<boost::asio::io_context>(m_config->getThreads());
+
+ // for now, just terminate on SIGINT, SIGHUP and SIGTERM
+ boost::asio::signal_set signals(*m_ioc, SIGINT, SIGTERM, SIGHUP);
+ signals.async_wait([&](const boost::system::error_code& error, int signal_number){
+ std::cout << "Terminating via signal " << signal_number << std::endl;
+ m_ioc->stop();
+ });
+
+ // Storage cleanup once a day
+ boost::asio::steady_timer storage_cleanup_timer(*m_ioc, boost::asio::chrono::hours(24));
+ std::function<void(const boost::system::error_code&)> storage_cleanup_callback =
+ [&](const boost::system::error_code& error){
+ std::lock_guard<std::mutex> lock(m_storage_mutex);
+ if (!m_storage)
+ throw std::runtime_error("Storage not initialized");
+ touch_all_connections();
+ m_storage->cleanup();
+ storage_cleanup_timer.expires_at(storage_cleanup_timer.expires_at() + boost::asio::chrono::hours(24));
+ storage_cleanup_timer.async_wait(storage_cleanup_callback);
+ };
+ storage_cleanup_timer.async_wait(storage_cleanup_callback);
+
+ // The acceptor receives incoming connections
+ m_acceptor = std::make_unique<boost::asio::ip::tcp::acceptor>(*m_ioc, boost::asio::ip::tcp::endpoint{address, port});
+
+ do_accept();
+
+ // Run the I/O service on the requested number of threads
+ std::vector<std::thread> v;
+ v.reserve(m_config->getThreads() - 1);
+ for (auto i = m_config->getThreads() - 1; i > 0; --i) {
+ v.emplace_back(
+ [&]
+ {
+ m_ioc->run();
+ });
+ }
+ m_ioc->run();
+
+ for (auto& t: v) {
+ t.join();
+ }
+
+ } catch (const std::exception& ex) {
+ std::cerr << "Error: " << ex.what() << std::endl;
+ }
+
+ return 0;
+}
+