From adc9a7f36b6bef4e333aae2ad354a2a27eeca426 Mon Sep 17 00:00:00 2001 From: BodgeMaster <> Date: Mon, 25 Jul 2022 15:48:57 +0200 Subject: [PATCH] tools/hexnet: prepare for multithreading Multithreading will be needed to simultaneously receive and send data. The preparations include: - move all the settings of the program into global scope - add mutexes - move the code that reads from the TCP socket into a dedicated function --- src/tools/hexnet.cpp | 93 +++++++++++++++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 23 deletions(-) diff --git a/src/tools/hexnet.cpp b/src/tools/hexnet.cpp index 80e4230..aaaa356 100644 --- a/src/tools/hexnet.cpp +++ b/src/tools/hexnet.cpp @@ -19,6 +19,9 @@ #include #include #include +#include +#include +#include #include "../lib/error.h++" #include "../lib/cli.h++" @@ -28,17 +31,65 @@ #define EXIT_USAGE 2 #define EXIT_UNIMPLEMENTED 3 +bool ipv4 = true; +bool ipv6 = true; +bool tcp = true; +bool udp = true; +bool listenMode = false; +int64_t mtu = 1500; +std::string host; +in_port_t port; +sockpp::tcp_socket* tcpSocket; +sockpp::tcp_acceptor tcpAcceptor; +std::mutex tcpSocketMutex; +std::mutex consoleMutex; +// used for coordinated graceful exit across threads +bool exitProgram = false; + +void signalHandler(int signal) { + exitProgram = true; + // if still waiting for incoming connection, stop waiting + tcpAcceptor.shutdown(); + // tell sockpp to close TCP socket if open because it blocks when trying + // to read and there is no data + if (*tcpSocket) { + // Intentionally not using the mutex here + tcpSocket->shutdown(SHUT_RD); + } + //TODO: figure out if - and how - this applies to UDP + + // Priority is to finish up all unfinished business that can be finished up. + // If something has the console mutex locked, that should not prevent + // other threads from winding down. This is why logging happens last. + consoleMutex.lock(); + std::cerr << "Received signal " << signal << ", shutting down." << std::endl; + consoleMutex.unlock(); +} + +void readFromTCPSocket(sockpp::tcp_socket* socket, int64_t mtu) { + ssize_t numBytes; + uint8_t buffer[mtu]; + tcpSocketMutex.lock(); + while (!exitProgram && (numBytes = socket->read(buffer, sizeof(buffer))) > 0) { + tcpSocketMutex.unlock(); + consoleMutex.lock(); + for (ssize_t i=0; i flags; flags.push_back(CLI::Flag('4', "ipv4", "use IPv4, defaults to both when -4 and -6 are omitted, otherwise uses what is specified")); @@ -68,7 +119,7 @@ int main(int argc, char* argv[]){ udp = cliParser.getFlag('u').value; } if (cliParser.getUnpositionalArgument('c').errorCode == ErrorCodes::NOT_PRESENT) { - listen = true; + listenMode = true; } if (cliParser.getUnpositionalArgument('m').errorCode == ErrorCodes::SUCCESS) { mtu = std::stol(cliParser.getUnpositionalArgument('m').value); @@ -78,7 +129,7 @@ int main(int argc, char* argv[]){ // ensure that the given value is a valid port port = (in_port_t) std::stoi(cliParser.getPositionalArgument(0).value); - if (listen) { + if (listenMode) { if (ipv6) { std::cerr << "IPv6 support is not implented yet." << std::endl; return EXIT_UNIMPLEMENTED; @@ -90,7 +141,7 @@ int main(int argc, char* argv[]){ std::cerr << "Listening on port " << port << "." << std::endl; sockpp::socket_initializer socketInitializer; - sockpp::tcp_acceptor tcpAcceptor(port); + tcpAcceptor = sockpp::tcp_acceptor(port); if (!tcpAcceptor) { std::cerr << "Error while creating TCP acceptor: " << tcpAcceptor.last_error_str() << std::endl; @@ -98,24 +149,20 @@ int main(int argc, char* argv[]){ } sockpp::inet_address peer; - sockpp::tcp_socket tcpSocket = tcpAcceptor.accept(&peer); + tcpSocket = new sockpp::tcp_socket(); + *tcpSocket = tcpAcceptor.accept(&peer); + std::cerr << "Incoming connection from " << peer << std::endl; - if (!tcpSocket) { + if (!(*tcpSocket)) { std::cerr << "Error on incoming connection: " << tcpAcceptor.last_error_str() << std::endl; + delete tcpSocket; return EXIT_RUNTIME; } - ssize_t numBytes; - uint8_t buffer[mtu]; - while ((numBytes = tcpSocket.read(buffer, sizeof(buffer))) > 0) { - for (ssize_t i=0; i