From b5a684d85eb571eb16d6c5e8bc040c9ee50be6c4 Mon Sep 17 00:00:00 2001 From: Sergiotarxz Date: Thu, 20 Apr 2023 01:25:20 +0200 Subject: [PATCH] Adding initial commit. --- Doxyfile | 7 ++ README.md | 186 +++++++++++++++++++++++++++++++++++++++++++++++- include/.exists | 0 meson.build | 31 ++++++++ src/.exists | 0 src/main.cpp | 161 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 384 insertions(+), 1 deletion(-) create mode 100644 Doxyfile create mode 100644 include/.exists create mode 100644 meson.build create mode 100644 src/.exists create mode 100644 src/main.cpp diff --git a/Doxyfile b/Doxyfile new file mode 100644 index 0000000..4b78d27 --- /dev/null +++ b/Doxyfile @@ -0,0 +1,7 @@ +GENERATE_HTML=yes +EXTRACT_ALL=yes +RECURSIVE=yes +INPUT=../include +DISABLE_INDEX = YES +GENERATE_TREEVIEW = YES +PROJECT_NAME=dining-philosophers diff --git a/README.md b/README.md index b445b53..29d367f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,186 @@ -# producer-consumer +# Producer-Consumer Problem Solution. + +## Description + +This is a very common problem in multithreading, one or more threads +generate data in a buffer and one or more threads have to process +that data. + +This solution is very useful when for example listening for +network connections in a multithreaded manner, various threads +wait for client requests while a pool of consumer threads +dispatch those requests. + +## The full example + +Just because the solution is in this case a single file +I decided to copy it in the README.md so you do +not have to navigate by the project, I could not +unfortunately do this with the philosophers having +a dinner. + + +```C++ +#include +#include +#include +#include +#include +#include +#include + +void +argumentAsNumber(int argc, char **argv, int numberOfArgument, std::string nameOfArgument, int *output); + +// The buffer size can be changed, the bigger it +// is, the less the producer threads would +// have to wait for a slot in the event of +// consumers being unable to keep up with +// the generated values. +const int sizeBuffer = 10; + +// Handy typedefs to reduce the line noise. +// This is very useful for shared_ptr. +typedef std::list ListOfThreads; +typedef std::shared_ptr Buffer; +typedef std::shared_ptr BufferAllocation; +typedef std::counting_semaphore ProducerConsumerSemaphore; +typedef std::shared_ptr ProducerConsumerSemaphorePtr; +typedef std::shared_ptr MutexPtr; + +void +producerThread(Buffer buffer, BufferAllocation bufferAllocation, + ProducerConsumerSemaphorePtr producerAvailableSlots, + ProducerConsumerSemaphorePtr consumerAvailableSlots, + MutexPtr bufferMutex) { + // We are going to produce in the firsts iterations + // values from 0 to 50 and then values from + // 10 to 50. + int result = 0; + while (true) { + if (result > 50) { + result = 10; + } + // We reduce the available slots for producers. + // If we cant this would wait. + producerAvailableSlots->acquire(); + { + // We lock the buffer to avoid other + // threads to mangle when we are + // still writing. + std::lock_guard lk{*bufferMutex}; + // We set the last element of the buffer + // to be the generated value. + buffer[*bufferAllocation] = result; + // We increment the buffer size, although I + // do not know if this is needed to be commented. + (*bufferAllocation)++; + } + // Now there is one more available slot to be consumed. + consumerAvailableSlots->release(); + result++; + } +} + +void +consumerThread(Buffer buffer, BufferAllocation bufferAllocation, + ProducerConsumerSemaphorePtr producerAvailableSlots, + ProducerConsumerSemaphorePtr consumerAvailableSlots, + MutexPtr bufferMutex) { + // We keep consuming forever in every consumer thread. + while (true) { + // This attempt to release the consumerAvailableSlots semaphore. + // It will wait if the operation cannot be done because + // it is already 0. + consumerAvailableSlots->acquire(); + { + // We block the buffer and the variable containing its + // size so no other thread attempts to write or read there + // while we are doing this. + std::lock_guard lk{*bufferMutex}; + // We read the first value to create a first in first out. + // If we do not do this and the producer threads + // are more than the consumers or producing values + // way faster than they can be processed we would + // end with values that are never proccesed. + int result = buffer[0]; + // We move the buffer -1 position each element + // so buffer[1] is now buffer[0] + for (int i = 1; i < *bufferAllocation; i++) { + buffer[i-1] = buffer[i]; + } + std::cout << "Read " << result << " from the pool." << std::endl; + // We decrement the buffer allocated size. + (*bufferAllocation)--; + } + // We increment the producerAvailableSlots since there is + // now more room to insert values in the buffer. + producerAvailableSlots->release(); + } +} +int +main(int argc, char **argv) { + // This values are defaults that the user may change, argumentAsNumber function takes care + // of this. + int numberOfProducers = 5; + int numberOfConsumers = 6; + + // First argument is the numberOfProducers, the second is the numberOfConsumers, sadly + // C++ won't allow us to have runtime defined semaphore sizes so we cannot have + // runtime defined buffer size.. + argumentAsNumber(argc, argv, 0, std::string("numberOfProducers"), &numberOfProducers); + argumentAsNumber(argc, argv, 1, std::string("numberOfConsumers"), &numberOfConsumers); + + // We do not want the threads to go out of scope in the for loop of creation. + ListOfThreads producers; + ListOfThreads consumers; + + // This variable contains the produced values. + Buffer buffer(new int[sizeBuffer]); + + // This variable takes care of the buffer size. + BufferAllocation bufferAllocation(new int); + *bufferAllocation = 0; + + /* + * We need two semaphores, one to control the buffer is still not completly filled up and + * other to control the buffer is not empty. + */ + ProducerConsumerSemaphorePtr producerAvailableSlots(new ProducerConsumerSemaphore(sizeBuffer)); + ProducerConsumerSemaphorePtr consumerAvailableSlots(new ProducerConsumerSemaphore(0)); + + // When we edit the buffer we do not want other threads to access the buffer because of race conditions. + MutexPtr bufferMutex(new std::mutex); + + // The creation of the producer threads. + for (unsigned long i = 0; i < (unsigned long) numberOfProducers; i++) { + producers.push_back(std::jthread([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] { + producerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex); + })); + } + // The creation of the consumer threads. + for (unsigned long i = 0; i < (unsigned long) numberOfConsumers; i++) { + consumers.push_back(std::jthread([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] { + consumerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex); + })); + } +} + +void +argumentAsNumber(int argc, char **argv, int numberOfArgument, std::string nameOfArgument, int *output) { + // arg: 0 has to have argc 2 2 < 2(0+2) pass, 2 < 3 (1+2) not pass. + if (argc < numberOfArgument + 2) { + std::cerr << "Argument with number " << numberOfArgument << " missing, using default for " << nameOfArgument << std::endl; + // Not enough arguments. + return; + } + try { + // Argument to number conversion. + *output = std::atoi(argv[numberOfArgument + 1]); + } catch (std::exception &exception) { + // Warning the user, that the argument could not be proccesed. + std::cerr << "Unable to convert to number " << numberOfArgument << " as number using default for " << nameOfArgument << std::endl; + } +} +``` diff --git a/include/.exists b/include/.exists new file mode 100644 index 0000000..e69de29 diff --git a/meson.build b/meson.build new file mode 100644 index 0000000..abc78d0 --- /dev/null +++ b/meson.build @@ -0,0 +1,31 @@ +project('tech.owlcode.consumer-producer', 'cpp') +add_global_arguments('-std=c++20', '-D_DEFAULT_SOURCE', language : 'cpp') + +inc = include_directories('include') + +sources = [ + 'src/main.cpp', +] + +inc = [ + 'include' +] + +link_arguments = [ +] + +executable('consumer-producer', + sources, + include_directories : inc, + install : true, + link_args : link_arguments, +) + +doxygen = find_program('doxygen', required : false) + +if doxygen.found() + message('Doxygen found') + run_target('docs', command : [doxygen, meson.source_root() + '/Doxyfile']) +else + warning('Documentation disabled without doxygen') +endif diff --git a/src/.exists b/src/.exists new file mode 100644 index 0000000..e69de29 diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..ba2b1ba --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,161 @@ +#include +#include +#include +#include +#include +#include +#include + +void +argumentAsNumber(int argc, char **argv, int numberOfArgument, std::string nameOfArgument, int *output); + +// The buffer size can be changed, the bigger it +// is, the less the producer threads would +// have to wait for a slot in the event of +// consumers being unable to keep up with +// the generated values. +const int sizeBuffer = 10; + +// Handy typedefs to reduce the line noise. +// This is very useful for shared_ptr. +typedef std::list ListOfThreads; +typedef std::shared_ptr Buffer; +typedef std::shared_ptr BufferAllocation; +typedef std::counting_semaphore ProducerConsumerSemaphore; +typedef std::shared_ptr ProducerConsumerSemaphorePtr; +typedef std::shared_ptr MutexPtr; + +void +producerThread(Buffer buffer, BufferAllocation bufferAllocation, + ProducerConsumerSemaphorePtr producerAvailableSlots, + ProducerConsumerSemaphorePtr consumerAvailableSlots, + MutexPtr bufferMutex) { + // We are going to produce in the firsts iterations + // values from 0 to 50 and then values from + // 10 to 50. + int result = 0; + while (true) { + if (result > 50) { + result = 10; + } + // We reduce the available slots for producers. + // If we cant this would wait. + producerAvailableSlots->acquire(); + { + // We lock the buffer to avoid other + // threads to mangle when we are + // still writing. + std::lock_guard lk{*bufferMutex}; + // We set the last element of the buffer + // to be the generated value. + buffer[*bufferAllocation] = result; + // We increment the buffer size, although I + // do not know if this is needed to be commented. + (*bufferAllocation)++; + } + // Now there is one more available slot to be consumed. + consumerAvailableSlots->release(); + result++; + } +} + +void +consumerThread(Buffer buffer, BufferAllocation bufferAllocation, + ProducerConsumerSemaphorePtr producerAvailableSlots, + ProducerConsumerSemaphorePtr consumerAvailableSlots, + MutexPtr bufferMutex) { + // We keep consuming forever in every consumer thread. + while (true) { + // This attempt to release the consumerAvailableSlots semaphore. + // It will wait if the operation cannot be done because + // it is already 0. + consumerAvailableSlots->acquire(); + { + // We block the buffer and the variable containing its + // size so no other thread attempts to write or read there + // while we are doing this. + std::lock_guard lk{*bufferMutex}; + // We read the first value to create a first in first out. + // If we do not do this and the producer threads + // are more than the consumers or producing values + // way faster than they can be processed we would + // end with values that are never proccesed. + int result = buffer[0]; + // We move the buffer -1 position each element + // so buffer[1] is now buffer[0] + for (int i = 1; i < *bufferAllocation; i++) { + buffer[i-1] = buffer[i]; + } + std::cout << "Read " << result << " from the pool." << std::endl; + // We decrement the buffer allocated size. + (*bufferAllocation)--; + } + // We increment the producerAvailableSlots since there is + // now more room to insert values in the buffer. + producerAvailableSlots->release(); + } +} +int +main(int argc, char **argv) { + // This values are defaults that the user may change, argumentAsNumber function takes care + // of this. + int numberOfProducers = 5; + int numberOfConsumers = 6; + + // First argument is the numberOfProducers, the second is the numberOfConsumers, sadly + // C++ won't allow us to have runtime defined semaphore sizes so we cannot have + // runtime defined buffer size.. + argumentAsNumber(argc, argv, 0, std::string("numberOfProducers"), &numberOfProducers); + argumentAsNumber(argc, argv, 1, std::string("numberOfConsumers"), &numberOfConsumers); + + // We do not want the threads to go out of scope in the for loop of creation. + ListOfThreads producers; + ListOfThreads consumers; + + // This variable contains the produced values. + Buffer buffer(new int[sizeBuffer]); + + // This variable takes care of the buffer size. + BufferAllocation bufferAllocation(new int); + *bufferAllocation = 0; + + /* + * We need two semaphores, one to control the buffer is still not completly filled up and + * other to control the buffer is not empty. + */ + ProducerConsumerSemaphorePtr producerAvailableSlots(new ProducerConsumerSemaphore(sizeBuffer)); + ProducerConsumerSemaphorePtr consumerAvailableSlots(new ProducerConsumerSemaphore(0)); + + // When we edit the buffer we do not want other threads to access the buffer because of race conditions. + MutexPtr bufferMutex(new std::mutex); + + // The creation of the producer threads. + for (unsigned long i = 0; i < (unsigned long) numberOfProducers; i++) { + producers.push_back(std::jthread([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] { + producerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex); + })); + } + // The creation of the consumer threads. + for (unsigned long i = 0; i < (unsigned long) numberOfConsumers; i++) { + consumers.push_back(std::jthread([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] { + consumerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex); + })); + } +} + +void +argumentAsNumber(int argc, char **argv, int numberOfArgument, std::string nameOfArgument, int *output) { + // arg: 0 has to have argc 2 2 < 2(0+2) pass, 2 < 3 (1+2) not pass. + if (argc < numberOfArgument + 2) { + std::cerr << "Argument with number " << numberOfArgument << " missing, using default for " << nameOfArgument << std::endl; + // Not enough arguments. + return; + } + try { + // Argument to number conversion. + *output = std::atoi(argv[numberOfArgument + 1]); + } catch (std::exception &exception) { + // Warning the user, that the argument could not be proccesed. + std::cerr << "Unable to convert to number " << numberOfArgument << " as number using default for " << nameOfArgument << std::endl; + } +}