#include "websocketserverprocess.h"

// NOTE(review): the names of the system and third-party headers were lost in
// the text this file was recovered from. The list below is reconstructed from
// the identifiers this file actually uses; verify it against version control.
#include <boost/asio/buffers_iterator.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/test/unit_test.hpp>

#include <libreichwein/process.h>

#include <cerrno>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <filesystem>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>

#include <signal.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include "webserver.h"
#include "response.h"
#include "helper.h"

using namespace std::string_literals;
namespace fs = std::filesystem;
namespace pt = boost::property_tree;
using namespace boost::unit_test;
using namespace Reichwein;

namespace {

// TCP port the forked server listens on (bound to the IPv6 loopback "::1").
constexpr unsigned short websocket_test_port{8765};

// Copies src into the fixed-size character buffer dst of `size` bytes,
// truncating if necessary. Unlike a bare strncpy(dst, src, size), the result
// is always NUL-terminated, so it can safely be read back as a C string.
void copy_c_string(char* dst, std::size_t size, const std::string& src)
{
  if (size == 0)
    return;
  std::strncpy(dst, src.c_str(), size - 1);
  dst[size - 1] = '\0';
}

} // namespace

/// Maps the memory block shared between parent and child, then forks the
/// websocket server child via start().
///
/// @throws std::runtime_error if the shared mapping cannot be created, or
///         whatever start() throws.
WebsocketServerProcess::WebsocketServerProcess()
{
  // MAP_SHARED | MAP_ANONYMOUS: the mapping survives fork() and is visible to
  // both processes. mmap() reports failure as MAP_FAILED, not as nullptr.
  void* const memory = mmap(nullptr, sizeof(shared_data_t), PROT_READ | PROT_WRITE,
                            MAP_SHARED | MAP_ANONYMOUS, -1, 0);
  if (memory == MAP_FAILED)
    throw std::runtime_error("Unable to map shared memory");

  // No shared_data_t constructor is run on this memory; the code relies on
  // the anonymous mapping being zero-filled.
  // NOTE(review): m_shared->mutex is locked from both the parent and the
  // child process. Whether its type supports cross-process use cannot be seen
  // from this file - confirm against the declaration of shared_data_t.
  //
  // The deleter releases exactly the pointer it is handed, so it stays correct
  // on reset()/move and never touches m_shared while it is being destroyed.
  m_shared = decltype(m_shared)(
    static_cast<shared_data_t*>(memory),
    [](shared_data_t* p) { munmap(p, sizeof(shared_data_t)); });

  start();
}

/// Stops the child process if it is still running.
///
/// Never throws: stop() reports errors via exceptions, which must not escape
/// a destructor, so they are logged to std::cerr instead.
WebsocketServerProcess::~WebsocketServerProcess()
{
  try {
    if (is_running())
      stop();
  } catch (const std::exception& e) {
    std::cerr << "Error: " << e.what() << std::endl;
  }
}

/// Serves one websocket connection (runs on its own thread in the child).
///
/// Reads the HTTP upgrade request by hand, publishes its
/// Sec-WebSocket-Protocol header and request target through the shared
/// memory block, accepts the websocket handshake and then echoes every
/// message back with ": <counter>" appended.
///
/// @param socket connected TCP socket; ownership is taken over.
void WebsocketServerProcess::do_session(boost::asio::ip::tcp::socket socket)
{
  namespace beast = boost::beast;
  namespace websocket = beast::websocket;

  try {
    // Construct the stream by moving in the socket
    websocket::stream<boost::asio::ip::tcp::socket> ws{std::move(socket)};

    // Set a decorator to change the Server field of the handshake response
    ws.set_option(websocket::stream_base::decorator(
      [](websocket::response_type& res)
      {
        res.set(beast::http::field::server,
                std::string("Reichwein.IT Test Websocket Server"));
      }));

    // Read the upgrade request explicitly (instead of letting accept() do it)
    // so that its fields can be inspected before the handshake is answered.
    beast::http::request_parser<request_type::body_type> parser;
    beast::flat_buffer handshake_buffer;
    beast::http::read(ws.next_layer(), handshake_buffer, parser);
    const request_type req = parser.get();

    // Build the strings outside of the critical section.
    const std::string requested_subprotocol{req[beast::http::field::sec_websocket_protocol]};
    const std::string requested_target{req.target()};
    {
      std::lock_guard lock{m_shared->mutex};
      copy_c_string(m_shared->subprotocol, sizeof(m_shared->subprotocol), requested_subprotocol);
      copy_c_string(m_shared->target, sizeof(m_shared->target), requested_target);
    }

    ws.accept(req);

    for (;;) {
      beast::flat_buffer buffer;

      ws.read(buffer);

      // Reply with "<received message>: <counter>", using the same
      // text/binary mode as the received message.
      ws.text(ws.got_text());
      std::string data(boost::asio::buffers_begin(buffer.data()),
                       boost::asio::buffers_end(buffer.data()));
      // NOTE(review): m_count is modified here from every session thread;
      // confirm that its declared type makes this safe.
      data += ": " + std::to_string(m_count++);
      buffer.consume(buffer.size());
      beast::ostream(buffer) << data;
      ws.write(buffer.data());
    }
  } catch (boost::beast::system_error const& se) {
    // websocket::error::closed indicates that the session was closed regularly
    if (se.code() != boost::beast::websocket::error::closed)
      std::cerr << "Error: " << se.code().message() << std::endl;
  } catch (std::exception const& e) {
    std::cerr << "Error: " << e.what() << std::endl;
  }
}

/// @return true if a child process has been started and is still running.
bool WebsocketServerProcess::is_running()
{
  if (m_pid == 0)
    return false;

  return Reichwein::Process::is_running(m_pid);
}

/// Forks the websocket server child process and waits until it is listening.
///
/// The child accepts connections on [::1]:8765 forever, handling each one in
/// a detached thread running do_session(); it never returns from this
/// function. The parent records the child's pid and blocks in
/// Process::wait_for_pid_listening_on().
///
/// @throws std::runtime_error if a child is already recorded or fork() fails.
void WebsocketServerProcess::start()
{
  if (m_pid != 0)
    throw std::runtime_error("Process already running, so it can't be started");

  // Keep the fork() result local: m_pid must only ever hold 0 or the pid of a
  // real child, never the -1 error value.
  const pid_t pid = fork();
  if (pid < 0)
    throw std::runtime_error("Fork unsuccessful.");

  if (pid == 0) { // child process branch
    try {
      auto const address = boost::asio::ip::make_address("::1");

      // The io_context is required for all I/O
      boost::asio::io_context ioc{1};

      // The acceptor receives incoming connections
      boost::asio::ip::tcp::acceptor acceptor{ioc, {address, websocket_test_port}};
      for (;;) {
        // This will receive the new connection
        boost::asio::ip::tcp::socket socket{ioc};

        // Block until we get a connection
        acceptor.accept(socket);

        // Launch the session, transferring ownership of the socket
        std::thread(
            &WebsocketServerProcess::do_session, this,
            std::move(socket)).detach();
      }
    } catch (const std::exception& e) {
      std::cerr << "Error: " << e.what() << std::endl;
    }
    exit(0);
  }

  m_pid = pid;
  Process::wait_for_pid_listening_on(m_pid, websocket_test_port);
}

/// Terminates the child process with SIGTERM and reaps it.
///
/// @throws std::runtime_error if no child is running, if the signal cannot be
///         delivered, or if waitpid() does not return the child's pid.
void WebsocketServerProcess::stop()
{
  if (!is_running())
    throw std::runtime_error("Process not running, so it can't be stopped");

  if (kill(m_pid, SIGTERM) != 0)
    throw std::runtime_error("Unable to kill process");

  // waitpid() may be interrupted by an unrelated signal; that is not a
  // failure, so retry in that case.
  int result{};
  do {
    result = waitpid(m_pid, nullptr, 0);
  } while (result == -1 && errno == EINTR);

  if (result != m_pid)
    throw std::runtime_error("waitpid returned "s + std::to_string(result));

  m_pid = 0;
}

/// @return the Sec-WebSocket-Protocol value most recently stored by a session
///         (possibly truncated to the size of the shared buffer).
std::string WebsocketServerProcess::subprotocol()
{
  std::lock_guard lock{m_shared->mutex};
  return m_shared->subprotocol;
}

/// @return the request target most recently stored by a session (possibly
///         truncated to the size of the shared buffer).
std::string WebsocketServerProcess::target()
{
  std::lock_guard lock{m_shared->mutex};
  return m_shared->target;
}