diff options
| author | Roland Reichwein <mail@reichwein.it> | 2020-05-07 19:24:45 +0200 | 
|---|---|---|
| committer | Roland Reichwein <mail@reichwein.it> | 2020-05-07 19:24:45 +0200 | 
| commit | 9e635d9b19e72eefef082dd8071d3e4c9d6cfab1 (patch) | |
| tree | 4e205947bc1f6b62f743faf3312f3e1e7e77f647 /plugins | |
| parent | 1cc484b25547e349177cf652f62021b802f48655 (diff) | |
Separated out TCP socket class
Diffstat (limited to 'plugins')
| -rw-r--r-- | plugins/fcgi/Makefile | 4 | ||||
| -rw-r--r-- | plugins/fcgi/fcgi.cpp | 207 | ||||
| -rw-r--r-- | plugins/fcgi/fcgi.h | 35 | ||||
| -rw-r--r-- | plugins/fcgi/fcgiid.cpp | 19 | ||||
| -rw-r--r-- | plugins/fcgi/fcgiid.h | 18 | ||||
| -rw-r--r-- | plugins/fcgi/socket.cpp | 121 | ||||
| -rw-r--r-- | plugins/fcgi/socket.h | 70 | 
7 files changed, 323 insertions, 151 deletions
diff --git a/plugins/fcgi/Makefile b/plugins/fcgi/Makefile index bb54d7c..c2789d8 100644 --- a/plugins/fcgi/Makefile +++ b/plugins/fcgi/Makefile @@ -57,7 +57,9 @@ LIBS+= \  endif  PROGSRC=\ -    fcgi.cpp +    fcgi.cpp \ +    fcgiid.cpp \ +    socket.cpp  TESTSRC=\      test-webserver.cpp \ diff --git a/plugins/fcgi/fcgi.cpp b/plugins/fcgi/fcgi.cpp index 464ba75..4ff8253 100644 --- a/plugins/fcgi/fcgi.cpp +++ b/plugins/fcgi/fcgi.cpp @@ -1,6 +1,7 @@  #include "fcgi.h"  #include "fastcgi.h" +#include "socket.h"  #include <boost/algorithm/string/predicate.hpp>  #include <boost/array.hpp> @@ -343,149 +344,119 @@ std::string fcgi_plugin::fcgiQuery(FCGIContext& context)   std::unordered_map<std::string, std::string> app_values; // will be read by FCGI_GET_VALUES - size_t pos { app_addr.find(':') }; - if (pos != app_addr.npos) { // tcp socket: host:port -  auto endpoints{m_resolver.resolve(app_addr.substr(0, pos), app_addr.substr(pos + 1))}; -  bool opening{false}; -   -  std::lock_guard<std::mutex> socket_lock{m_socket_mutex}; + auto it {m_sockets.find(app_addr)}; -  auto it {m_sockets.find(app_addr)}; + std::shared_ptr<Socket> socket; -  std::pair<std::unordered_map<std::string, boost::asio::ip::tcp::socket>::iterator, bool> it2{m_sockets.end(), false}; -  if (it == m_sockets.end()) -   it2 = m_sockets.emplace(app_addr, m_io_context); // add new element if necessary + if (it == m_sockets.end()) { // add new element +  socket = m_socket_factory.create(app_addr); -  boost::asio::ip::tcp::socket& socket { it2.second ? it2.first->second : it->second }; // use just added element or previously found one +  if (!socket) { +   std::cerr << "FCGI Error: Invalid app_addr." << std::endl; +   return HttpStatus("500", "FCGI configuration", context.SetResponseHeader); +  } -  socket.close(); // TODO: Bug workaround: Keeping TCP socket open doesn't work for now +  m_sockets[app_addr] = socket; -  if (!socket.is_open()) { -   std::cout << "FCGI: Opening new socket" << std::endl; + } else { // use already existing element +  socket = it->second; + } -   boost::asio::connect(socket, endpoints); + bool opening{false}; +   + std::lock_guard<std::mutex> socket_lock{socket->getMutex()}; -   boost::asio::socket_base::keep_alive keepAlive(true); -   socket.set_option(keepAlive); + socket->close(); // TODO: Bug workaround: Keeping TCP socket open doesn't work for now -   struct timeval tv; -   tv.tv_sec  = 0; // infinite -   tv.tv_usec = 0; -   if (setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) -    std::cerr << "FCGI Error: SO_RCVTIMEO" << std::endl; + if (!socket->is_open()) { +  std::cout << "FCGI: Opening new socket" << std::endl; -   if (setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))) -    std::cerr << "FCGI Error: SO_SNDTIMEO" << std::endl; +  socket->open(); +  opening = true; + } -   int val{1}; -   if (setsockopt(socket.native_handle(), SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val))) -    std::cerr << "FCGI Error: SO_KEEPALIVE" << std::endl; + if (!socket->is_open()) { +  return HttpStatus("500", "FCGI connection", context.SetResponseHeader); + } -   opening = true; -  } + FCGI_ID_Guard id_guard(socket->fcgi_id()); + uint16_t id {id_guard.getID()}; -  if (!socket.is_open()) { -   return HttpStatus("500", "FCGI connection", context.SetResponseHeader); + try { +  if (opening) { +   FCGI_Record get_values{FCGI_GET_VALUES, 0, system_config_bytes}; +   if (socket->write(get_values.getBuffer()) != get_values.getBuffer().size()) +    std::cerr << "Warning: Not all bytes written 1" << std::endl;    } -  FCGI_ID_Guard id_guard(m_fcgi_id); -  uint16_t id {id_guard.getID()}; - -  try { -   if (opening) { -    FCGI_Record get_values{FCGI_GET_VALUES, 0, system_config_bytes}; -    if (socket.write_some(boost::asio::buffer(get_values.getBuffer())) != get_values.getBuffer().size()) -     std::cerr << "Warning: Not all bytes written 1" << std::endl; -   } - -   FCGI_Record begin_request{FCGI_BEGIN_REQUEST, id, FCGI_RESPONDER, FCGI_KEEP_CONN}; -   if (socket.write_some(boost::asio::buffer(begin_request.getBuffer())) != begin_request.getBuffer().size()) -    std::cerr << "Warning: Not all bytes written 3" << std::endl; +  FCGI_Record begin_request{FCGI_BEGIN_REQUEST, id, FCGI_RESPONDER, FCGI_KEEP_CONN}; +  if (socket->write(begin_request.getBuffer()) != begin_request.getBuffer().size()) +   std::cerr << "Warning: Not all bytes written 3" << std::endl; -   FCGI_Record params{FCGI_PARAMS, id, env_bytes}; -   if (socket.write_some(boost::asio::buffer(params.getBuffer())) != params.getBuffer().size()) -    std::cerr << "Warning: Not all bytes written 4" << std::endl; +  FCGI_Record params{FCGI_PARAMS, id, env_bytes}; +  if (socket->write(params.getBuffer()) != params.getBuffer().size()) +   std::cerr << "Warning: Not all bytes written 4" << std::endl; -   if (env_bytes.size()) {  -    FCGI_Record params_end{FCGI_PARAMS, id, std::string{}}; -    if (socket.write_some(boost::asio::buffer(params_end.getBuffer())) != params_end.getBuffer().size()) -     std::cerr << "Warning: Not all bytes written 5" << std::endl; -   } +  if (env_bytes.size()) {  +   FCGI_Record params_end{FCGI_PARAMS, id, std::string{}}; +   if (socket->write(params_end.getBuffer()) != params_end.getBuffer().size()) +    std::cerr << "Warning: Not all bytes written 5" << std::endl; +  } -   std::string body {context.GetRequestParam("body")}; -   FCGI_Record stdin_{FCGI_STDIN, id, body}; -   if (socket.write_some(boost::asio::buffer(stdin_.getBuffer())) != stdin_.getBuffer().size()) -    std::cerr << "Warning: Not all bytes written 6" << std::endl; -   -   if (body.size()) { -    FCGI_Record stdin_end{FCGI_STDIN, id, std::string{}}; -    if (socket.write_some(boost::asio::buffer(stdin_end.getBuffer())) != stdin_end.getBuffer().size()) -     std::cerr << "Warning: Not all bytes written 7" << std::endl; -   } -  } catch (const boost::system::system_error& ex) { -   if (ex.code() == boost::asio::error::eof) { -     std::cerr << "FCGI Error: EOF on write" << std::endl; // seems to be ok here -     return HttpStatus("500", "FCGI connection: EOF on write", context.SetResponseHeader); -   } +  std::string body {context.GetRequestParam("body")}; +  FCGI_Record stdin_{FCGI_STDIN, id, body}; +  if (socket->write(stdin_.getBuffer()) != stdin_.getBuffer().size()) +   std::cerr << "Warning: Not all bytes written 6" << std::endl; +  +  if (body.size()) { +   FCGI_Record stdin_end{FCGI_STDIN, id, std::string{}}; +   if (socket->write(stdin_end.getBuffer()) != stdin_end.getBuffer().size()) +    std::cerr << "Warning: Not all bytes written 7" << std::endl;    } + } catch (const fcgi_eof_error&) { +   std::cerr << "FCGI Error: EOF on write" << std::endl; // seems to be ok here +   return HttpStatus("500", "FCGI connection: EOF on write", context.SetResponseHeader); + }  #if 0 -  FCGI_Record data{FCGI_DATA, id, std::string{}}; -  if (socket.write_some(boost::asio::buffer(data.getBuffer())) != data.getBuffer().size()) -   std::cerr << "Warning: Not all bytes written 8" << std::endl; + FCGI_Record data{FCGI_DATA, id, std::string{}}; + if (socket->write(data.getBuffer()) != data.getBuffer().size()) +  std::cerr << "Warning: Not all bytes written 8" << std::endl;  #endif -  bool ended{false}; -  std::vector<char> inbuf; -  std::vector<char> inbuf_part(1024); -  while (!ended) { + bool ended{false}; + std::vector<char> inbuf; + std::vector<char> inbuf_part(1024); + while (!ended) { +  try { +   size_t got {socket->read(inbuf_part)}; +   inbuf.insert(inbuf.end(), inbuf_part.begin(), inbuf_part.begin() + got); +  } catch (const fcgi_eof_error&) { +   //std::cerr << "FCGI Warning: Early EOF" << std::endl; // seems to be ok here +   ended = true; +   //return HttpStatus("500", "FCGI connection: EOF on read", context.SetResponseHeader); +  } + +  while (inbuf.size() > 0) { +      try { -    size_t got {socket.read_some(boost::asio::buffer(inbuf_part))}; -    inbuf.insert(inbuf.end(), inbuf_part.begin(), inbuf_part.begin() + got); -   } catch (const boost::system::system_error& ex) { -    if (ex.code() == boost::asio::error::eof) { -     //std::cerr << "FCGI Warning: Early EOF" << std::endl; // seems to be ok here -     ended = true; -     //return HttpStatus("500", "FCGI connection: EOF on read", context.SetResponseHeader); -    } else { -     std::cerr << "FCGI Warning: Expected EOF, got " << ex.code() << ", " << ex.what() << std::endl; +    FCGI_Record r{inbuf}; +    if (r.getType() == FCGI_END_REQUEST) {       ended = true; -    } -   } - -   while (inbuf.size() > 0) { -   -    try { -     FCGI_Record r{inbuf}; -     if (r.getType() == FCGI_END_REQUEST) { -      ended = true; -     } else if (r.getType() == FCGI_STDOUT) { -      output_data += r.getContent(); -     } else if (r.getType() == FCGI_STDERR) { -      std::cerr << "FCGI STDERR: " << r.getContent() << std::endl; -     } else if (r.getType() == FCGI_GET_VALUES_RESULT) { -      FCGI_DecodeEnv(r.getContent(), app_values); -      DumpAppValues(app_values); -     } else -      throw std::runtime_error("Unhandled FCGI type: "s + std::to_string(r.getType())); -    } catch (const std::length_error& ex) { -     // ignore if not enough data available yet -     break; -    } +    } else if (r.getType() == FCGI_STDOUT) { +     output_data += r.getContent(); +    } else if (r.getType() == FCGI_STDERR) { +     std::cerr << "FCGI STDERR: " << r.getContent() << std::endl; +    } else if (r.getType() == FCGI_GET_VALUES_RESULT) { +     FCGI_DecodeEnv(r.getContent(), app_values); +     DumpAppValues(app_values); +    } else +     throw std::runtime_error("Unhandled FCGI type: "s + std::to_string(r.getType())); +   } catch (const std::length_error& ex) { +    // ignore if not enough data available yet +    break;     }    } - - } else if (fs::is_socket(fs::path{app_addr})) { // Unix domain socket -  // TODO -  std::cerr << "FCGI Error: Unix domain sockets not yet implemented." << std::endl; -  return HttpStatus("500", "FCGI configuration", context.SetResponseHeader); - } else if (fs::is_regular_file(fs::path{app_addr})) { // Executable to start -  // TODO -  std::cerr << "FCGI Error: Executable FCGI not yet implemented." << std::endl; -  return HttpStatus("500", "FCGI configuration", context.SetResponseHeader); - } else { -  std::cerr << "FCGI Error: Invalid app_addr type." << std::endl; -  return HttpStatus("500", "FCGI configuration", context.SetResponseHeader);   }   std::istringstream is_out{output_data}; @@ -532,8 +503,6 @@ std::string fcgi_plugin::name()  }  fcgi_plugin::fcgi_plugin() - : m_io_context() - , m_resolver(m_io_context)  {   //std::cout << "Plugin constructor" << std::endl;  } diff --git a/plugins/fcgi/fcgi.h b/plugins/fcgi/fcgi.h index 4f77719..289c4d6 100644 --- a/plugins/fcgi/fcgi.h +++ b/plugins/fcgi/fcgi.h @@ -2,38 +2,14 @@  #include "../../plugin_interface.h" +#include "socket.h" +  #include <boost/asio.hpp>  #include <cstdint>  #include <mutex>  #include <set> -// TODO: multithreading -class FCGI_ID -{ - std::set<uint16_t >m_unused; - uint16_t m_current_max{}; - -public: - FCGI_ID(){} - - // starting at 1 - uint16_t getID(){ -  if (m_unused.empty()) { -   m_current_max++; -   return m_current_max; -  } else { -   uint16_t result{*m_unused.begin()}; -   m_unused.erase(m_unused.begin()); -   return result; -  } - } - - void putID(uint16_t id){ -  m_unused.insert(id); - } -}; -  // automatically reserves ID, and releases it via RAII  class FCGI_ID_Guard  { @@ -57,12 +33,9 @@ struct FCGIContext;  class fcgi_plugin: public webserver_plugin_interface   { - FCGI_ID m_fcgi_id; - boost::asio::io_context m_io_context; - boost::asio::ip::tcp::resolver m_resolver; + SocketFactory m_socket_factory; - std::mutex m_socket_mutex; // guard m_socket use in different threads - std::unordered_map<std::string, boost::asio::ip::tcp::socket> m_sockets; + std::unordered_map<std::string, std::shared_ptr<Socket>> m_sockets;  public:   fcgi_plugin(); diff --git a/plugins/fcgi/fcgiid.cpp b/plugins/fcgi/fcgiid.cpp new file mode 100644 index 0000000..778cbc9 --- /dev/null +++ b/plugins/fcgi/fcgiid.cpp @@ -0,0 +1,19 @@ +#include "fcgiid.h" +  + // starting at 1 +uint16_t FCGI_ID::getID() +{ + if (m_unused.empty()) { +  m_current_max++; +  return m_current_max; + } else { +  uint16_t result{*m_unused.begin()}; +  m_unused.erase(m_unused.begin()); +  return result; + } +} + +void FCGI_ID::putID(uint16_t id) +{ + m_unused.insert(id); +} diff --git a/plugins/fcgi/fcgiid.h b/plugins/fcgi/fcgiid.h new file mode 100644 index 0000000..e3649d7 --- /dev/null +++ b/plugins/fcgi/fcgiid.h @@ -0,0 +1,18 @@ +#pragma once + +#include <cstdint> +#include <set> + +class FCGI_ID +{ + std::set<uint16_t >m_unused; + uint16_t m_current_max{}; + +public: + FCGI_ID(){} + + uint16_t getID(); + void putID(uint16_t id); +}; + + diff --git a/plugins/fcgi/socket.cpp b/plugins/fcgi/socket.cpp new file mode 100644 index 0000000..0a2a381 --- /dev/null +++ b/plugins/fcgi/socket.cpp @@ -0,0 +1,121 @@ +#include "socket.h" + +#include <filesystem> +#include <iostream> + +namespace fs = std::filesystem; +using namespace std::string_literals; + +std::mutex& Socket::getMutex() +{ + return m_mutex; +} + +FCGI_ID& Socket::fcgi_id() +{ + return m_fcgi_id; +} + +SocketFactory::SocketFactory() + : m_io_context() +{ +} + +std::shared_ptr<Socket> SocketFactory::create(const std::string& app_addr) +{ + size_t pos { app_addr.find(':') }; + if (pos != app_addr.npos) { // tcp socket: host:port + +  return std::make_shared<TCPSocket>(app_addr.substr(0, pos), app_addr.substr(pos + 1), m_io_context); +  + } else if (fs::is_socket(fs::path{app_addr})) { // Unix domain socket +  // TODO +  std::cerr << "FCGI Error: Unix domain sockets not yet implemented." << std::endl; + } else if (fs::is_regular_file(fs::path{app_addr})) { // Executable to start +  // TODO +  std::cerr << "FCGI Error: Executable FCGI not yet implemented." << std::endl; + } else { +  std::cerr << "FCGI Error: Invalid app_addr type." << std::endl; + } + + return {};  +} + +TCPSocket::TCPSocket(const std::string& host, const std::string& port, boost::asio::io_context& io_context) + : m_io_context(io_context) + , m_host(host) + , m_port(port) + , m_socket(io_context) +{ +} + +TCPSocket::~TCPSocket() +{ +} + +void TCPSocket::open() +{ + boost::asio::ip::tcp::resolver resolver(m_io_context); + auto endpoints{resolver.resolve(m_host, m_port)}; + try { +  boost::asio::connect(m_socket, endpoints); + } catch(const std::exception& ex) { +  std::cerr << "FCGI Error: Error on connecting to " << m_host << ":" << m_port << ": " << ex.what() << std::endl; +  return; + } + + boost::asio::socket_base::keep_alive keepAlive(true); + m_socket.set_option(keepAlive); + + struct timeval tv; + tv.tv_sec  = 0; // infinite + tv.tv_usec = 0; + if (setsockopt(m_socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) +  std::cerr << "FCGI Error: SO_RCVTIMEO" << std::endl; + + if (setsockopt(m_socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))) +  std::cerr << "FCGI Error: SO_SNDTIMEO" << std::endl; + + int val{1}; + if (setsockopt(m_socket.native_handle(), SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val))) +  std::cerr << "FCGI Error: SO_KEEPALIVE" << std::endl; +} + +bool TCPSocket::is_open() +{ + return m_socket.is_open(); +} + +void TCPSocket::close() +{ + m_socket.close(); +} + +size_t TCPSocket::write(const std::vector<char>& data) +{ + try { +  return m_socket.write_some(boost::asio::buffer(data)); + } catch (const boost::system::system_error& ex) { +  if (ex.code() == boost::asio::error::eof) { +   throw fcgi_eof_error("EOF on write"); +  } else +   throw std::runtime_error("FCGI Error: Unknown boost asio exception on write: "s + ex.what()); + } catch (const std::exception& ex) { +   throw std::runtime_error("FCGI Error: Unknown exception on write: "s + ex.what()); + } +} + +size_t TCPSocket::read(std::vector<char>& data) +{ + try { +  return m_socket.read_some(boost::asio::buffer(data)); + } catch (const boost::system::system_error& ex) { +  if (ex.code() == boost::asio::error::eof) { +   throw fcgi_eof_error("EOF on read"); +  } else +   throw std::runtime_error("FCGI Error: Unknown boost asio exception on read: "s + ex.what()); + } catch (const std::exception& ex) { +   throw std::runtime_error("FCGI Error: Unknown exception on read: "s + ex.what()); + } +} + diff --git a/plugins/fcgi/socket.h b/plugins/fcgi/socket.h new file mode 100644 index 0000000..b4ec54b --- /dev/null +++ b/plugins/fcgi/socket.h @@ -0,0 +1,70 @@ +#pragma once + +#include "fcgiid.h" + +#include <boost/asio.hpp> + +#include <cstdint> +#include <mutex> +#include <set> +#include <string> + +class fcgi_eof_error: public std::runtime_error +{ +public: + fcgi_eof_error(const std::string& what_arg): std::runtime_error(what_arg) {} +}; + +class Socket +{ + std::mutex m_mutex; // guard socket use in different threads + FCGI_ID m_fcgi_id; + +public: + virtual ~Socket() {} + + std::mutex& getMutex(); + + FCGI_ID& fcgi_id(); + + virtual void open() = 0; + virtual void close() = 0; + virtual bool is_open() = 0; + virtual size_t write(const std::vector<char>& data) = 0; + virtual size_t read(std::vector<char>& data) = 0; +}; + +class SocketFactory +{ + boost::asio::io_context m_io_context; + +public: + SocketFactory(); + std::shared_ptr<Socket> create(const std::string& name); +}; + +class TCPSocket: public Socket +{ + boost::asio::io_context& m_io_context; + std::string m_host; + std::string m_port; + boost::asio::ip::tcp::socket m_socket; + +public: + TCPSocket(const std::string& host, const std::string& port, boost::asio::io_context& io_context); + ~TCPSocket() override; +  + void open() override; + void close() override; + bool is_open() override; + size_t write(const std::vector<char>& data) override; + size_t read(std::vector<char>& data) override; +}; + +#if 0 +class FileSocket: public Socket +{ + ~FileSocket() override; +}; +#endif +  | 
