summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorRoland Reichwein <mail@reichwein.it>2023-01-09 21:17:26 +0100
committerRoland Reichwein <mail@reichwein.it>2023-01-09 21:17:26 +0100
commit1191f07767583a9b19280a4f29cb1b0bd6799785 (patch)
tree55563e05902173f9b809fcc81fce5a979253522a
parentdc2e2b3e293a8374a2627982b521cc6865129c49 (diff)
Websocket proxy
-rw-r--r--https.cpp12
-rw-r--r--tests/test-webserver.cpp59
-rw-r--r--websocket.h299
3 files changed, 246 insertions, 124 deletions
diff --git a/https.cpp b/https.cpp
index 3a68b00..ce3a6fd 100644
--- a/https.cpp
+++ b/https.cpp
@@ -51,6 +51,7 @@ namespace {
// Handles an HTTP server connection
class session : public std::enable_shared_from_this<session>
{
+ boost::asio::io_context& ioc_;
beast::ssl_stream<beast::tcp_stream> stream_;
beast::flat_buffer buffer_;
Server& m_server;
@@ -78,11 +79,13 @@ public:
// Take ownership of the socket
explicit
session(
+ boost::asio::io_context& ioc,
tcp::socket&& socket,
ssl::context& ctx,
- Server& server)
- : stream_(std::move(socket), ctx)
- , m_server(server)
+ Server& server):
+ ioc_(ioc),
+ stream_(std::move(socket), ctx),
+ m_server(server)
{
}
@@ -169,7 +172,7 @@ public:
if (websocket::is_upgrade(req_))
{
beast::get_lowest_layer(stream_).expires_never();
- std::make_shared<websocket_session>(std::move(stream_))->do_accept(parser_->release());
+ std::make_shared<websocket_session>(ioc_, std::move(stream_))->do_accept_in(parser_->release());
return;
}
@@ -313,6 +316,7 @@ private:
{
// Create the session and run it
std::make_shared<session>(
+ ioc_,
std::move(socket),
ctx_,
m_server)->run();
diff --git a/tests/test-webserver.cpp b/tests/test-webserver.cpp
index 6bbf302..1c1e6cc 100644
--- a/tests/test-webserver.cpp
+++ b/tests/test-webserver.cpp
@@ -503,38 +503,38 @@ public:
throw std::runtime_error("Fork unsuccessful.");
if (m_pid == 0) { // child process branch
- while (true) {
- try
- {
- auto const address = boost::asio::ip::make_address("localhost");
- auto const port = static_cast<unsigned short>(9876);
+ try
+ {
+ auto const address = boost::asio::ip::make_address("::1");
+ auto const port = static_cast<unsigned short>(9876);
- // The io_context is required for all I/O
- boost::asio::io_context ioc{1};
+ // The io_context is required for all I/O
+ boost::asio::io_context ioc{1};
- // The acceptor receives incoming connections
- boost::asio::ip::tcp::acceptor acceptor{ioc, {address, port}};
- for(;;)
- {
- // This will receive the new connection
- boost::asio::ip::tcp::socket socket{ioc};
+ // The acceptor receives incoming connections
+ boost::asio::ip::tcp::acceptor acceptor{ioc, {address, port}};
+ for(;;)
+ {
+ // This will receive the new connection
+ boost::asio::ip::tcp::socket socket{ioc};
- // Block until we get a connection
- acceptor.accept(socket);
+ // Block until we get a connection
+ acceptor.accept(socket);
- // Launch the session, transferring ownership of the socket
- std::thread(
- &WebsocketServerProcess::do_session, this,
- std::move(socket)).detach();
- }
- }
- catch (const std::exception& e)
- {
- std::cerr << "Error: " << e.what() << std::endl;
+ // Launch the session, transferring ownership of the socket
+ std::thread(
+ &WebsocketServerProcess::do_session, this,
+ std::move(socket)).detach();
}
}
+ catch (const std::exception& e)
+ {
+ std::cerr << "Error: " << e.what() << std::endl;
+ }
exit(0);
}
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
void stop()
@@ -542,7 +542,7 @@ public:
if (!is_running())
throw std::runtime_error("Process not running, so it can't be stopped");
- if (kill(m_pid, SIGKILL) != 0)
+ if (kill(m_pid, SIGTERM) != 0)
throw std::runtime_error("Unable to kill process");
if (int result = waitpid(m_pid, NULL, 0); result != m_pid)
@@ -632,9 +632,18 @@ BOOST_FIXTURE_TEST_CASE(websocket, Fixture)
data = std::string(boost::asio::buffers_begin(buffer.data()), boost::asio::buffers_end(buffer.data()));
BOOST_CHECK_EQUAL(data, "request1: 1");
+
+ buffer.consume(buffer.size());
+
+ ws.write(boost::asio::buffer(std::string(text)));
+ ws.read(buffer);
+ data = std::string(boost::asio::buffers_begin(buffer.data()), boost::asio::buffers_end(buffer.data()));
+ BOOST_CHECK_EQUAL(data, "request1: 2");
+
// Close the WebSocket connection
ws.close(boost::beast::websocket::close_code::normal);
BOOST_REQUIRE(serverProcess.is_running());
+ BOOST_REQUIRE(websocketProcess.is_running());
}
diff --git a/websocket.h b/websocket.h
index d8d0262..1611c45 100644
--- a/websocket.h
+++ b/websocket.h
@@ -9,6 +9,7 @@
#include <boost/beast/websocket/ssl.hpp>
#include <boost/beast/ssl/ssl_stream.hpp>
#include <boost/asio/buffers_iterator.hpp>
+#include <boost/asio/connect.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/beast/ssl.hpp>
@@ -33,109 +34,217 @@ namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
namespace websocket = beast::websocket;
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
+using namespace std::placeholders;
+// Server session, asynchronous, proxying
class websocket_session: public std::enable_shared_from_this<websocket_session>
{
- websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
- beast::flat_buffer buffer_;
+ boost::asio::io_context& ioc_;
+ boost::asio::ip::tcp::resolver resolver_;
+ boost::beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_in_;
+ boost::beast::flat_buffer buffer_in_;
+ boost::beast::websocket::stream<beast::tcp_stream> ws_app_;
+ boost::beast::flat_buffer buffer_out_;
+ std::string host_;
+ std::string port_;
public:
- explicit websocket_session(beast::ssl_stream<beast::tcp_stream>&& stream) :
- ws_(std::move(stream))
+ explicit websocket_session(boost::asio::io_context& ioc, beast::ssl_stream<beast::tcp_stream>&& stream):
+ ioc_(ioc),
+ resolver_(boost::asio::make_strand(ioc_)),
+ ws_in_(std::move(stream)),
+ ws_app_(boost::asio::make_strand(ioc_)),
+ host_{"::1"},
+ port_{"9876"}
{
}
- // Start the asynchronous accept operation
- template<class Body, class Allocator>
- void
- do_accept(http::request<Body, http::basic_fields<Allocator>> req)
- {
- // Set suggested timeout settings for the websocket
- ws_.set_option(
- websocket::stream_base::timeout::suggested(
- beast::role_type::server));
-
- // Set a decorator to change the Server of the handshake
- ws_.set_option(websocket::stream_base::decorator(
- [](websocket::response_type& res)
- {
- res.set(http::field::server,
- std::string{"Reichwein.IT Webserver"});
- }));
-
- // Accept the websocket handshake
- ws_.async_accept(
- req,
- beast::bind_front_handler(
- &websocket_session::on_accept,
- shared_from_this()));
- }
+ //
+ // The initial setup path
+ //
+
+ // Start the asynchronous accept operation
+ template<class Body, class Allocator>
+ void do_accept_in(http::request<Body, http::basic_fields<Allocator>> req)
+ {
+ // Set suggested timeout settings for the websocket
+ ws_in_.set_option(
+ websocket::stream_base::timeout::suggested(
+ beast::role_type::server));
+
+ // Set a decorator to change the Server of the handshake
+ ws_in_.set_option(websocket::stream_base::decorator(
+ [](websocket::response_type& res)
+ {
+ res.set(http::field::server,
+ std::string{"Reichwein.IT Webserver"});
+ }));
+
+ // Accept the websocket handshake
+ ws_in_.async_accept(
+ req,
+ beast::bind_front_handler(
+ &websocket_session::on_accept_in,
+ shared_from_this()));
+ }
private:
- void
- on_accept(beast::error_code ec)
- {
- if(ec)
- return fail(ec, "accept");
-
- // Read a message
- do_read();
- }
-
- void
- do_read()
- {
- // Read a message into our buffer
- ws_.async_read(
- buffer_,
- beast::bind_front_handler(
- &websocket_session::on_read,
- shared_from_this()));
- }
-
- void
- on_read(
- beast::error_code ec,
- std::size_t bytes_transferred)
- {
- boost::ignore_unused(bytes_transferred);
-
- // This indicates that the websocket_session was closed
- if(ec == websocket::error::closed)
- return;
-
- if(ec)
- fail(ec, "read");
-
- // Echo the message
- ws_.text(ws_.got_text());
- std::string data(boost::asio::buffers_begin(buffer_.data()), boost::asio::buffers_end(buffer_.data()));
- static int count{};
- data += ": " + std::to_string(count++);
- buffer_.consume(buffer_.size());
- boost::beast::ostream(buffer_) << data;
- ws_.async_write(
- buffer_.data(),
- beast::bind_front_handler(
- &websocket_session::on_write,
- shared_from_this()));
- }
-
- void
- on_write(
- beast::error_code ec,
- std::size_t bytes_transferred)
- {
- boost::ignore_unused(bytes_transferred);
-
- if(ec)
- return fail(ec, "write");
-
- // Clear the buffer
- buffer_.consume(buffer_.size());
-
- // Do another read
- do_read();
- }
-};
+ void on_accept_in(beast::error_code ec)
+ {
+ if (ec)
+ return fail(ec, "accept in");
+
+ resolver_.async_resolve(host_, port_,
+ beast::bind_front_handler(&websocket_session::on_resolve_app, shared_from_this()));
+ }
+
+ void on_resolve_app(beast::error_code ec, tcp::resolver::results_type results)
+ {
+ if (ec)
+ return fail(ec, "resolve app");
+
+ beast::get_lowest_layer(ws_app_).async_connect(results,
+ beast::bind_front_handler(&websocket_session::on_connect_app, shared_from_this()));
+ }
+
+ void on_connect_app(beast::error_code ec, tcp::resolver::results_type::endpoint_type endpoint)
+ {
+ if (ec)
+ return fail(ec, "connect app");
+
+ beast::get_lowest_layer(ws_app_).expires_never();
+
+ host_ += ':' + std::to_string(endpoint.port());
+
+ // Set suggested timeout settings for the websocket
+ ws_app_.set_option(
+ websocket::stream_base::timeout::suggested(
+ beast::role_type::client));
+
+ ws_app_.set_option(boost::beast::websocket::stream_base::decorator(
+ [](boost::beast::websocket::request_type& req)
+ {
+ req.set(boost::beast::http::field::user_agent, "Reichwein.IT Webserver Websocket client");
+ }));
+
+ ws_app_.async_handshake(host_, "/",
+ beast::bind_front_handler(&websocket_session::on_handshake_app, shared_from_this()));
+ }
+
+ void on_handshake_app(beast::error_code ec)
+ {
+ if (ec)
+ return fail(ec, "handshake app");
+
+ // Start reading messages from both sides, asynchronously
+ do_read_in();
+ do_read_app();
+ }
+
+ //
+ // The input path (client,ws_in_ -> app,ws_app_) via
+ //
+
+ void
+ do_read_in()
+ {
+ // Read a message into our buffer
+ ws_in_.async_read(
+ buffer_in_,
+ beast::bind_front_handler(
+ &websocket_session::on_read_in,
+ shared_from_this()));
+ }
+
+ void
+ on_read_in(
+ beast::error_code ec,
+ std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+
+ // This indicates that the websocket_session was closed
+ if (ec == websocket::error::closed)
+ return;
+
+ if (ec)
+ fail(ec, "read in");
+
+ ws_app_.text(ws_in_.got_text());
+
+ do_write_app();
+ }
+
+ void do_write_app()
+ {
+ ws_app_.async_write(buffer_in_.data(),
+ beast::bind_front_handler(
+ &websocket_session::on_write_app,
+ shared_from_this()));
+ }
+
+ void on_write_app(beast::error_code ec, std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+
+ if (ec)
+ fail(ec, "write app");
+
+ buffer_in_.consume(buffer_in_.size());
+
+ // Do another read
+ do_read_in();
+ }
+
+ //
+ // The output path (app,ws_app_ -> client,ws_in_)
+ //
+
+ void do_read_app()
+ {
+ // Read a message into our buffer
+ ws_app_.async_read(
+ buffer_out_,
+ beast::bind_front_handler(
+ &websocket_session::on_read_app,
+ shared_from_this()));
+ }
+
+ void on_read_app(beast::error_code ec, std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+
+ if (ec == websocket::error::closed)
+ return;
+
+ if (ec)
+ fail(ec, "read app");
+
+ do_write_out();
+ }
+
+ void do_write_out()
+ {
+ ws_in_.async_write(buffer_out_.data(),
+ beast::bind_front_handler(
+ &websocket_session::on_write_out,
+ shared_from_this()));
+ }
+
+ void on_write_out(
+ beast::error_code ec,
+ std::size_t bytes_transferred)
+ {
+ boost::ignore_unused(bytes_transferred);
+
+ if(ec)
+ return fail(ec, "write out");
+
+ // Clear the buffer
+ buffer_out_.consume(buffer_out_.size());
+
+ // Do another read
+ do_read_app();
+ }
+}; // class