From 1191f07767583a9b19280a4f29cb1b0bd6799785 Mon Sep 17 00:00:00 2001 From: Roland Reichwein Date: Mon, 9 Jan 2023 21:17:26 +0100 Subject: Websocket proxy --- https.cpp | 12 +- tests/test-webserver.cpp | 59 ++++++---- websocket.h | 299 ++++++++++++++++++++++++++++++++--------------- 3 files changed, 246 insertions(+), 124 deletions(-) diff --git a/https.cpp b/https.cpp index 3a68b00..ce3a6fd 100644 --- a/https.cpp +++ b/https.cpp @@ -51,6 +51,7 @@ namespace { // Handles an HTTP server connection class session : public std::enable_shared_from_this { + boost::asio::io_context& ioc_; beast::ssl_stream stream_; beast::flat_buffer buffer_; Server& m_server; @@ -78,11 +79,13 @@ public: // Take ownership of the socket explicit session( + boost::asio::io_context& ioc, tcp::socket&& socket, ssl::context& ctx, - Server& server) - : stream_(std::move(socket), ctx) - , m_server(server) + Server& server): + ioc_(ioc), + stream_(std::move(socket), ctx), + m_server(server) { } @@ -169,7 +172,7 @@ public: if (websocket::is_upgrade(req_)) { beast::get_lowest_layer(stream_).expires_never(); - std::make_shared(std::move(stream_))->do_accept(parser_->release()); + std::make_shared(ioc_, std::move(stream_))->do_accept_in(parser_->release()); return; } @@ -313,6 +316,7 @@ private: { // Create the session and run it std::make_shared( + ioc_, std::move(socket), ctx_, m_server)->run(); diff --git a/tests/test-webserver.cpp b/tests/test-webserver.cpp index 6bbf302..1c1e6cc 100644 --- a/tests/test-webserver.cpp +++ b/tests/test-webserver.cpp @@ -503,38 +503,38 @@ public: throw std::runtime_error("Fork unsuccessful."); if (m_pid == 0) { // child process branch - while (true) { - try - { - auto const address = boost::asio::ip::make_address("localhost"); - auto const port = static_cast(9876); + try + { + auto const address = boost::asio::ip::make_address("::1"); + auto const port = static_cast(9876); - // The io_context is required for all I/O - boost::asio::io_context ioc{1}; + // The io_context is required for all I/O + boost::asio::io_context ioc{1}; - // The acceptor receives incoming connections - boost::asio::ip::tcp::acceptor acceptor{ioc, {address, port}}; - for(;;) - { - // This will receive the new connection - boost::asio::ip::tcp::socket socket{ioc}; + // The acceptor receives incoming connections + boost::asio::ip::tcp::acceptor acceptor{ioc, {address, port}}; + for(;;) + { + // This will receive the new connection + boost::asio::ip::tcp::socket socket{ioc}; - // Block until we get a connection - acceptor.accept(socket); + // Block until we get a connection + acceptor.accept(socket); - // Launch the session, transferring ownership of the socket - std::thread( - &WebsocketServerProcess::do_session, this, - std::move(socket)).detach(); - } - } - catch (const std::exception& e) - { - std::cerr << "Error: " << e.what() << std::endl; + // Launch the session, transferring ownership of the socket + std::thread( + &WebsocketServerProcess::do_session, this, + std::move(socket)).detach(); } } + catch (const std::exception& e) + { + std::cerr << "Error: " << e.what() << std::endl; + } exit(0); } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } void stop() @@ -542,7 +542,7 @@ public: if (!is_running()) throw std::runtime_error("Process not running, so it can't be stopped"); - if (kill(m_pid, SIGKILL) != 0) + if (kill(m_pid, SIGTERM) != 0) throw std::runtime_error("Unable to kill process"); if (int result = waitpid(m_pid, NULL, 0); result != m_pid) @@ -632,9 +632,18 @@ BOOST_FIXTURE_TEST_CASE(websocket, Fixture) data = std::string(boost::asio::buffers_begin(buffer.data()), boost::asio::buffers_end(buffer.data())); BOOST_CHECK_EQUAL(data, "request1: 1"); + + buffer.consume(buffer.size()); + + ws.write(boost::asio::buffer(std::string(text))); + ws.read(buffer); + data = std::string(boost::asio::buffers_begin(buffer.data()), boost::asio::buffers_end(buffer.data())); + BOOST_CHECK_EQUAL(data, "request1: 2"); + // Close the WebSocket connection ws.close(boost::beast::websocket::close_code::normal); BOOST_REQUIRE(serverProcess.is_running()); + BOOST_REQUIRE(websocketProcess.is_running()); } diff --git a/websocket.h b/websocket.h index d8d0262..1611c45 100644 --- a/websocket.h +++ b/websocket.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -33,109 +34,217 @@ namespace net = boost::asio; // from namespace ssl = boost::asio::ssl; // from namespace websocket = beast::websocket; using tcp = boost::asio::ip::tcp; // from +using namespace std::placeholders; +// Server session, asynchronous, proxying class websocket_session: public std::enable_shared_from_this { - websocket::stream> ws_; - beast::flat_buffer buffer_; + boost::asio::io_context& ioc_; + boost::asio::ip::tcp::resolver resolver_; + boost::beast::websocket::stream> ws_in_; + boost::beast::flat_buffer buffer_in_; + boost::beast::websocket::stream ws_app_; + boost::beast::flat_buffer buffer_out_; + std::string host_; + std::string port_; public: - explicit websocket_session(beast::ssl_stream&& stream) : - ws_(std::move(stream)) + explicit websocket_session(boost::asio::io_context& ioc, beast::ssl_stream&& stream): + ioc_(ioc), + resolver_(boost::asio::make_strand(ioc_)), + ws_in_(std::move(stream)), + ws_app_(boost::asio::make_strand(ioc_)), + host_{"::1"}, + port_{"9876"} { } - // Start the asynchronous accept operation - template - void - do_accept(http::request> req) - { - // Set suggested timeout settings for the websocket - ws_.set_option( - websocket::stream_base::timeout::suggested( - beast::role_type::server)); - - // Set a decorator to change the Server of the handshake - ws_.set_option(websocket::stream_base::decorator( - [](websocket::response_type& res) - { - res.set(http::field::server, - std::string{"Reichwein.IT Webserver"}); - })); - - // Accept the websocket handshake - ws_.async_accept( - req, - beast::bind_front_handler( - &websocket_session::on_accept, - shared_from_this())); - } + // + // The initial setup path + // + + // Start the asynchronous accept operation + template + void do_accept_in(http::request> req) + { + // Set suggested timeout settings for the websocket + ws_in_.set_option( + websocket::stream_base::timeout::suggested( + beast::role_type::server)); + + // Set a decorator to change the Server of the handshake + ws_in_.set_option(websocket::stream_base::decorator( + [](websocket::response_type& res) + { + res.set(http::field::server, + std::string{"Reichwein.IT Webserver"}); + })); + + // Accept the websocket handshake + ws_in_.async_accept( + req, + beast::bind_front_handler( + &websocket_session::on_accept_in, + shared_from_this())); + } private: - void - on_accept(beast::error_code ec) - { - if(ec) - return fail(ec, "accept"); - - // Read a message - do_read(); - } - - void - do_read() - { - // Read a message into our buffer - ws_.async_read( - buffer_, - beast::bind_front_handler( - &websocket_session::on_read, - shared_from_this())); - } - - void - on_read( - beast::error_code ec, - std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - // This indicates that the websocket_session was closed - if(ec == websocket::error::closed) - return; - - if(ec) - fail(ec, "read"); - - // Echo the message - ws_.text(ws_.got_text()); - std::string data(boost::asio::buffers_begin(buffer_.data()), boost::asio::buffers_end(buffer_.data())); - static int count{}; - data += ": " + std::to_string(count++); - buffer_.consume(buffer_.size()); - boost::beast::ostream(buffer_) << data; - ws_.async_write( - buffer_.data(), - beast::bind_front_handler( - &websocket_session::on_write, - shared_from_this())); - } - - void - on_write( - beast::error_code ec, - std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - if(ec) - return fail(ec, "write"); - - // Clear the buffer - buffer_.consume(buffer_.size()); - - // Do another read - do_read(); - } -}; + void on_accept_in(beast::error_code ec) + { + if (ec) + return fail(ec, "accept in"); + + resolver_.async_resolve(host_, port_, + beast::bind_front_handler(&websocket_session::on_resolve_app, shared_from_this())); + } + + void on_resolve_app(beast::error_code ec, tcp::resolver::results_type results) + { + if (ec) + return fail(ec, "resolve app"); + + beast::get_lowest_layer(ws_app_).async_connect(results, + beast::bind_front_handler(&websocket_session::on_connect_app, shared_from_this())); + } + + void on_connect_app(beast::error_code ec, tcp::resolver::results_type::endpoint_type endpoint) + { + if (ec) + return fail(ec, "connect app"); + + beast::get_lowest_layer(ws_app_).expires_never(); + + host_ += ':' + std::to_string(endpoint.port()); + + // Set suggested timeout settings for the websocket + ws_app_.set_option( + websocket::stream_base::timeout::suggested( + beast::role_type::client)); + + ws_app_.set_option(boost::beast::websocket::stream_base::decorator( + [](boost::beast::websocket::request_type& req) + { + req.set(boost::beast::http::field::user_agent, "Reichwein.IT Webserver Websocket client"); + })); + + ws_app_.async_handshake(host_, "/", + beast::bind_front_handler(&websocket_session::on_handshake_app, shared_from_this())); + } + + void on_handshake_app(beast::error_code ec) + { + if (ec) + return fail(ec, "handshake app"); + + // Start reading messages from both sides, asynchronously + do_read_in(); + do_read_app(); + } + + // + // The input path (client,ws_in_ -> app,ws_app_) via + // + + void + do_read_in() + { + // Read a message into our buffer + ws_in_.async_read( + buffer_in_, + beast::bind_front_handler( + &websocket_session::on_read_in, + shared_from_this())); + } + + void + on_read_in( + beast::error_code ec, + std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + // This indicates that the websocket_session was closed + if (ec == websocket::error::closed) + return; + + if (ec) + fail(ec, "read in"); + + ws_app_.text(ws_in_.got_text()); + + do_write_app(); + } + + void do_write_app() + { + ws_app_.async_write(buffer_in_.data(), + beast::bind_front_handler( + &websocket_session::on_write_app, + shared_from_this())); + } + + void on_write_app(beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) + fail(ec, "write app"); + + buffer_in_.consume(buffer_in_.size()); + + // Do another read + do_read_in(); + } + + // + // The output path (app,ws_app_ -> client,ws_in_) + // + + void do_read_app() + { + // Read a message into our buffer + ws_app_.async_read( + buffer_out_, + beast::bind_front_handler( + &websocket_session::on_read_app, + shared_from_this())); + } + + void on_read_app(beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec == websocket::error::closed) + return; + + if (ec) + fail(ec, "read app"); + + do_write_out(); + } + + void do_write_out() + { + ws_in_.async_write(buffer_out_.data(), + beast::bind_front_handler( + &websocket_session::on_write_out, + shared_from_this())); + } + + void on_write_out( + beast::error_code ec, + std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if(ec) + return fail(ec, "write out"); + + // Clear the buffer + buffer_out_.consume(buffer_out_.size()); + + // Do another read + do_read_app(); + } +}; // class -- cgit v1.2.3