diff options
| -rw-r--r-- | https.cpp | 12 | ||||
| -rw-r--r-- | tests/test-webserver.cpp | 59 | ||||
| -rw-r--r-- | websocket.h | 299 | 
3 files changed, 246 insertions, 124 deletions
@@ -51,6 +51,7 @@ namespace {  // Handles an HTTP server connection  class session : public std::enable_shared_from_this<session>  { + boost::asio::io_context& ioc_;   beast::ssl_stream<beast::tcp_stream> stream_;   beast::flat_buffer buffer_;   Server& m_server; @@ -78,11 +79,13 @@ public:      // Take ownership of the socket      explicit      session( +        boost::asio::io_context& ioc,          tcp::socket&& socket,          ssl::context& ctx, -        Server& server) -        : stream_(std::move(socket), ctx) -        , m_server(server) +        Server& server): +         ioc_(ioc), +         stream_(std::move(socket), ctx), +         m_server(server)      {      } @@ -169,7 +172,7 @@ public:          if (websocket::is_upgrade(req_))          {           beast::get_lowest_layer(stream_).expires_never(); -         std::make_shared<websocket_session>(std::move(stream_))->do_accept(parser_->release()); +         std::make_shared<websocket_session>(ioc_, std::move(stream_))->do_accept_in(parser_->release());           return;          } @@ -313,6 +316,7 @@ private:          {              // Create the session and run it              std::make_shared<session>( +                ioc_,                  std::move(socket),                  ctx_,                  m_server)->run(); diff --git a/tests/test-webserver.cpp b/tests/test-webserver.cpp index 6bbf302..1c1e6cc 100644 --- a/tests/test-webserver.cpp +++ b/tests/test-webserver.cpp @@ -503,38 +503,38 @@ public:     throw std::runtime_error("Fork unsuccessful.");    if (m_pid == 0) { // child process branch -   while (true) { -    try -    { -     auto const address = boost::asio::ip::make_address("localhost"); -     auto const port = static_cast<unsigned short>(9876); +   try +   { +    auto const address = boost::asio::ip::make_address("::1"); +    auto const port = static_cast<unsigned short>(9876); -     // The io_context is required for all I/O -     boost::asio::io_context ioc{1}; +    // The io_context is required for all I/O +    boost::asio::io_context ioc{1}; -     // The acceptor receives incoming connections -     boost::asio::ip::tcp::acceptor acceptor{ioc, {address, port}}; -     for(;;) -     { -      // This will receive the new connection -      boost::asio::ip::tcp::socket socket{ioc}; +    // The acceptor receives incoming connections +    boost::asio::ip::tcp::acceptor acceptor{ioc, {address, port}}; +    for(;;) +    { +     // This will receive the new connection +     boost::asio::ip::tcp::socket socket{ioc}; -      // Block until we get a connection -      acceptor.accept(socket); +     // Block until we get a connection +     acceptor.accept(socket); -      // Launch the session, transferring ownership of the socket -      std::thread( -          &WebsocketServerProcess::do_session, this, -          std::move(socket)).detach(); -     } -    } -    catch (const std::exception& e) -    { -     std::cerr << "Error: " << e.what() << std::endl; +     // Launch the session, transferring ownership of the socket +     std::thread( +         &WebsocketServerProcess::do_session, this, +         std::move(socket)).detach();      }     } +   catch (const std::exception& e) +   { +    std::cerr << "Error: " << e.what() << std::endl; +   }     exit(0);    } + +  std::this_thread::sleep_for(std::chrono::milliseconds(100));   }   void stop() @@ -542,7 +542,7 @@ public:    if (!is_running())     throw std::runtime_error("Process not running, so it can't be stopped"); -  if (kill(m_pid, SIGKILL) != 0) +  if (kill(m_pid, SIGTERM) != 0)     throw std::runtime_error("Unable to kill process");    if (int result = waitpid(m_pid, NULL, 0); result != m_pid) @@ -632,9 +632,18 @@ BOOST_FIXTURE_TEST_CASE(websocket, Fixture)   data = std::string(boost::asio::buffers_begin(buffer.data()), boost::asio::buffers_end(buffer.data()));   BOOST_CHECK_EQUAL(data, "request1: 1"); + + buffer.consume(buffer.size()); + + ws.write(boost::asio::buffer(std::string(text))); + ws.read(buffer); + data = std::string(boost::asio::buffers_begin(buffer.data()), boost::asio::buffers_end(buffer.data())); + BOOST_CHECK_EQUAL(data, "request1: 2"); +   // Close the WebSocket connection   ws.close(boost::beast::websocket::close_code::normal);   BOOST_REQUIRE(serverProcess.is_running()); + BOOST_REQUIRE(websocketProcess.is_running());  } diff --git a/websocket.h b/websocket.h index d8d0262..1611c45 100644 --- a/websocket.h +++ b/websocket.h @@ -9,6 +9,7 @@  #include <boost/beast/websocket/ssl.hpp>  #include <boost/beast/ssl/ssl_stream.hpp>  #include <boost/asio/buffers_iterator.hpp> +#include <boost/asio/connect.hpp>  #include <boost/asio/dispatch.hpp>  #include <boost/asio/ssl/context.hpp>  #include <boost/beast/ssl.hpp> @@ -33,109 +34,217 @@ namespace net = boost::asio;            // from <boost/asio.hpp>  namespace ssl = boost::asio::ssl;       // from <boost/asio/ssl.hpp>  namespace websocket = beast::websocket;  using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp> +using namespace std::placeholders; +// Server session, asynchronous, proxying  class websocket_session: public std::enable_shared_from_this<websocket_session>  { - websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_; - beast::flat_buffer buffer_; + boost::asio::io_context& ioc_; + boost::asio::ip::tcp::resolver resolver_; + boost::beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_in_; + boost::beast::flat_buffer buffer_in_; + boost::beast::websocket::stream<beast::tcp_stream> ws_app_; + boost::beast::flat_buffer buffer_out_; + std::string host_; + std::string port_;  public: - explicit websocket_session(beast::ssl_stream<beast::tcp_stream>&& stream) : -  ws_(std::move(stream)) + explicit websocket_session(boost::asio::io_context& ioc, beast::ssl_stream<beast::tcp_stream>&& stream): +  ioc_(ioc), +  resolver_(boost::asio::make_strand(ioc_)), +  ws_in_(std::move(stream)), +  ws_app_(boost::asio::make_strand(ioc_)), +  host_{"::1"}, +  port_{"9876"}   {   } -    // Start the asynchronous accept operation -    template<class Body, class Allocator> -    void -    do_accept(http::request<Body, http::basic_fields<Allocator>> req) -    { -        // Set suggested timeout settings for the websocket -        ws_.set_option( -            websocket::stream_base::timeout::suggested( -                beast::role_type::server)); - -        // Set a decorator to change the Server of the handshake -        ws_.set_option(websocket::stream_base::decorator( -            [](websocket::response_type& res) -            { -                res.set(http::field::server, -                    std::string{"Reichwein.IT Webserver"}); -            })); - -        // Accept the websocket handshake -        ws_.async_accept( -            req, -            beast::bind_front_handler( -                &websocket_session::on_accept, -                shared_from_this())); -    } + // + // The initial setup path + // +  + // Start the asynchronous accept operation + template<class Body, class Allocator> + void do_accept_in(http::request<Body, http::basic_fields<Allocator>> req) + { +  // Set suggested timeout settings for the websocket +  ws_in_.set_option( +      websocket::stream_base::timeout::suggested( +          beast::role_type::server)); + +  // Set a decorator to change the Server of the handshake +  ws_in_.set_option(websocket::stream_base::decorator( +      [](websocket::response_type& res) +      { +          res.set(http::field::server, +              std::string{"Reichwein.IT Webserver"}); +      })); + +  // Accept the websocket handshake +  ws_in_.async_accept( +      req, +      beast::bind_front_handler( +          &websocket_session::on_accept_in, +          shared_from_this())); + }  private: -    void -    on_accept(beast::error_code ec) -    { -        if(ec) -            return fail(ec, "accept"); - -        // Read a message -        do_read(); -    } - -    void -    do_read() -    { -        // Read a message into our buffer -        ws_.async_read( -            buffer_, -            beast::bind_front_handler( -                &websocket_session::on_read, -                shared_from_this())); -    } - -    void -    on_read( -        beast::error_code ec, -        std::size_t bytes_transferred) -    { -        boost::ignore_unused(bytes_transferred); - -        // This indicates that the websocket_session was closed -        if(ec == websocket::error::closed) -            return; - -        if(ec) -            fail(ec, "read"); - -        // Echo the message -        ws_.text(ws_.got_text()); -        std::string data(boost::asio::buffers_begin(buffer_.data()), boost::asio::buffers_end(buffer_.data())); -        static int count{}; -        data += ": " + std::to_string(count++); -        buffer_.consume(buffer_.size()); -        boost::beast::ostream(buffer_) << data; -        ws_.async_write( -            buffer_.data(), -            beast::bind_front_handler( -                &websocket_session::on_write, -                shared_from_this())); -    } - -    void -    on_write( -        beast::error_code ec, -        std::size_t bytes_transferred) -    { -        boost::ignore_unused(bytes_transferred); - -        if(ec) -            return fail(ec, "write"); - -        // Clear the buffer -        buffer_.consume(buffer_.size()); - -        // Do another read -        do_read(); -    } -}; + void on_accept_in(beast::error_code ec) + { +  if (ec) +   return fail(ec, "accept in"); + +  resolver_.async_resolve(host_, port_, +                          beast::bind_front_handler(&websocket_session::on_resolve_app, shared_from_this())); + } +  + void on_resolve_app(beast::error_code ec, tcp::resolver::results_type results) + { +  if (ec) +   return fail(ec, "resolve app"); + +  beast::get_lowest_layer(ws_app_).async_connect(results, +                          beast::bind_front_handler(&websocket_session::on_connect_app, shared_from_this())); + } + + void on_connect_app(beast::error_code ec, tcp::resolver::results_type::endpoint_type endpoint) + { +  if (ec) +   return fail(ec, "connect app"); + +  beast::get_lowest_layer(ws_app_).expires_never(); + +  host_ += ':' + std::to_string(endpoint.port()); + +  // Set suggested timeout settings for the websocket +  ws_app_.set_option( +      websocket::stream_base::timeout::suggested( +	  beast::role_type::client)); + +  ws_app_.set_option(boost::beast::websocket::stream_base::decorator( +   [](boost::beast::websocket::request_type& req) +   { +    req.set(boost::beast::http::field::user_agent, "Reichwein.IT Webserver Websocket client"); +   })); + +  ws_app_.async_handshake(host_, "/", +                          beast::bind_front_handler(&websocket_session::on_handshake_app, shared_from_this())); + } + + void on_handshake_app(beast::error_code ec) + { +  if (ec) +   return fail(ec, "handshake app"); + +  // Start reading messages from both sides, asynchronously +  do_read_in(); +  do_read_app(); + } + + // + // The input path (client,ws_in_ -> app,ws_app_) via + // +  + void + do_read_in() + { +  // Read a message into our buffer +  ws_in_.async_read( +      buffer_in_, +      beast::bind_front_handler( +          &websocket_session::on_read_in, +          shared_from_this())); + } + + void + on_read_in( +     beast::error_code ec, +     std::size_t bytes_transferred) + { +  boost::ignore_unused(bytes_transferred); + +  // This indicates that the websocket_session was closed +  if (ec == websocket::error::closed) +   return; + +  if (ec) +   fail(ec, "read in"); + +  ws_app_.text(ws_in_.got_text()); +   +  do_write_app(); + } +  + void do_write_app() + { +  ws_app_.async_write(buffer_in_.data(), +      beast::bind_front_handler( +          &websocket_session::on_write_app, +          shared_from_this())); + } + + void on_write_app(beast::error_code ec, std::size_t bytes_transferred) + { +  boost::ignore_unused(bytes_transferred); + +  if (ec) +   fail(ec, "write app"); + +  buffer_in_.consume(buffer_in_.size()); + +  // Do another read +  do_read_in(); + } + + // + // The output path (app,ws_app_ -> client,ws_in_) + // +  + void do_read_app() + { +  // Read a message into our buffer +  ws_app_.async_read( +      buffer_out_, +      beast::bind_front_handler( +          &websocket_session::on_read_app, +          shared_from_this())); + } + + void on_read_app(beast::error_code ec, std::size_t bytes_transferred) + { +  boost::ignore_unused(bytes_transferred); + +  if (ec == websocket::error::closed) +   return; + +  if (ec) +   fail(ec, "read app"); + +  do_write_out(); + } + + void do_write_out() + { +  ws_in_.async_write(buffer_out_.data(), +      beast::bind_front_handler( +          &websocket_session::on_write_out, +          shared_from_this())); + } + + void on_write_out( +     beast::error_code ec, +     std::size_t bytes_transferred) + { +  boost::ignore_unused(bytes_transferred); + +  if(ec) +   return fail(ec, "write out"); + +  // Clear the buffer +  buffer_out_.consume(buffer_out_.size()); + +  // Do another read +  do_read_app(); + } +}; // class  | 
