tools/hexnet: prepare for multithreading

Multithreading will be needed to simultaneously receive and send data.

The preparations include:
- move all the settings of the program into global scope
- add mutexes
- move the code that reads from the TCP socket into a dedicated function
BodgeMaster-unfinished
BodgeMaster 2022-07-25 15:48:57 +02:00
parent b044503951
commit adc9a7f36b
1 changed files with 70 additions and 23 deletions

View File

@ -19,6 +19,9 @@
#include <cstdint> #include <cstdint>
#include <cctype> #include <cctype>
#include <sockpp/tcp_acceptor.h> #include <sockpp/tcp_acceptor.h>
#include <thread>
#include <mutex>
#include <csignal>
#include "../lib/error.h++" #include "../lib/error.h++"
#include "../lib/cli.h++" #include "../lib/cli.h++"
@ -28,17 +31,65 @@
#define EXIT_USAGE 2 #define EXIT_USAGE 2
#define EXIT_UNIMPLEMENTED 3 #define EXIT_UNIMPLEMENTED 3
int main(int argc, char* argv[]){
// Argument parsing ################################################
bool ipv4 = true; bool ipv4 = true;
bool ipv6 = true; bool ipv6 = true;
bool tcp = true; bool tcp = true;
bool udp = true; bool udp = true;
bool listen = false; bool listenMode = false;
int64_t mtu = 1500; int64_t mtu = 1500;
std::string host; std::string host;
in_port_t port; in_port_t port;
sockpp::tcp_socket* tcpSocket;
sockpp::tcp_acceptor tcpAcceptor;
std::mutex tcpSocketMutex;
std::mutex consoleMutex;
// used for coordinated graceful exit across threads
bool exitProgram = false;
void signalHandler(int signal) {
exitProgram = true;
// if still waiting for incoming connection, stop waiting
tcpAcceptor.shutdown();
// tell sockpp to close TCP socket if open because it blocks when trying
// to read and there is no data
if (*tcpSocket) {
// Intentionally not using the mutex here
tcpSocket->shutdown(SHUT_RD);
}
//TODO: figure out if - and how - this applies to UDP
// Priority is to finish up all unfinished business that can be finished up.
// If something has the console mutex locked, that should not prevent
// other threads from winding down. This is why logging happens last.
consoleMutex.lock();
std::cerr << "Received signal " << signal << ", shutting down." << std::endl;
consoleMutex.unlock();
}
void readFromTCPSocket(sockpp::tcp_socket* socket, int64_t mtu) {
ssize_t numBytes;
uint8_t buffer[mtu];
tcpSocketMutex.lock();
while (!exitProgram && (numBytes = socket->read(buffer, sizeof(buffer))) > 0) {
tcpSocketMutex.unlock();
consoleMutex.lock();
for (ssize_t i=0; i<numBytes; i++) {
std::cout << std::hex << std::setfill('0') << std::setw(2) << (short) buffer[i];
}
std::cout.flush();
consoleMutex.unlock();
tcpSocketMutex.lock();
}
tcpSocketMutex.unlock();
consoleMutex.lock();
std::cerr << std::endl << "Connection closed." << std::endl;
consoleMutex.unlock();
}
int main(int argc, char* argv[]){
signal(SIGINT, signalHandler);
signal(SIGTERM, signalHandler);
std::vector<CLI::Flag> flags; std::vector<CLI::Flag> flags;
flags.push_back(CLI::Flag('4', "ipv4", "use IPv4, defaults to both when -4 and -6 are omitted, otherwise uses what is specified")); flags.push_back(CLI::Flag('4', "ipv4", "use IPv4, defaults to both when -4 and -6 are omitted, otherwise uses what is specified"));
@ -68,7 +119,7 @@ int main(int argc, char* argv[]){
udp = cliParser.getFlag('u').value; udp = cliParser.getFlag('u').value;
} }
if (cliParser.getUnpositionalArgument('c').errorCode == ErrorCodes::NOT_PRESENT) { if (cliParser.getUnpositionalArgument('c').errorCode == ErrorCodes::NOT_PRESENT) {
listen = true; listenMode = true;
} }
if (cliParser.getUnpositionalArgument('m').errorCode == ErrorCodes::SUCCESS) { if (cliParser.getUnpositionalArgument('m').errorCode == ErrorCodes::SUCCESS) {
mtu = std::stol(cliParser.getUnpositionalArgument('m').value); mtu = std::stol(cliParser.getUnpositionalArgument('m').value);
@ -78,7 +129,7 @@ int main(int argc, char* argv[]){
// ensure that the given value is a valid port // ensure that the given value is a valid port
port = (in_port_t) std::stoi(cliParser.getPositionalArgument(0).value); port = (in_port_t) std::stoi(cliParser.getPositionalArgument(0).value);
if (listen) { if (listenMode) {
if (ipv6) { if (ipv6) {
std::cerr << "IPv6 support is not implented yet." << std::endl; std::cerr << "IPv6 support is not implented yet." << std::endl;
return EXIT_UNIMPLEMENTED; return EXIT_UNIMPLEMENTED;
@ -90,7 +141,7 @@ int main(int argc, char* argv[]){
std::cerr << "Listening on port " << port << "." << std::endl; std::cerr << "Listening on port " << port << "." << std::endl;
sockpp::socket_initializer socketInitializer; sockpp::socket_initializer socketInitializer;
sockpp::tcp_acceptor tcpAcceptor(port); tcpAcceptor = sockpp::tcp_acceptor(port);
if (!tcpAcceptor) { if (!tcpAcceptor) {
std::cerr << "Error while creating TCP acceptor: " << tcpAcceptor.last_error_str() << std::endl; std::cerr << "Error while creating TCP acceptor: " << tcpAcceptor.last_error_str() << std::endl;
@ -98,24 +149,20 @@ int main(int argc, char* argv[]){
} }
sockpp::inet_address peer; sockpp::inet_address peer;
sockpp::tcp_socket tcpSocket = tcpAcceptor.accept(&peer); tcpSocket = new sockpp::tcp_socket();
*tcpSocket = tcpAcceptor.accept(&peer);
std::cerr << "Incoming connection from " << peer << std::endl; std::cerr << "Incoming connection from " << peer << std::endl;
if (!tcpSocket) { if (!(*tcpSocket)) {
std::cerr << "Error on incoming connection: " << tcpAcceptor.last_error_str() << std::endl; std::cerr << "Error on incoming connection: " << tcpAcceptor.last_error_str() << std::endl;
delete tcpSocket;
return EXIT_RUNTIME; return EXIT_RUNTIME;
} }
ssize_t numBytes; readFromTCPSocket(tcpSocket, mtu);
uint8_t buffer[mtu];
while ((numBytes = tcpSocket.read(buffer, sizeof(buffer))) > 0) {
for (ssize_t i=0; i<numBytes; i++) {
std::cout << std::hex << std::setfill('0') << std::setw(2) << (short) buffer[i];
}
std::cout.flush();
}
std::cerr << std::endl << "Connection closed." << std::endl; delete tcpSocket;
return EXIT_SUCCESS; return EXIT_SUCCESS;
} else { } else {