include | ||
src | ||
Doxyfile | ||
LICENSE | ||
meson.build | ||
README.md |
Producer-Consumer Problem Solution.
Description
This is a very common problem in multithreading, one or more threads generate data in a buffer and one or more threads have to process that data.
This solution is very useful when for example listening for network connections in a multithreaded manner, various threads wait for client requests while a pool of consumer threads dispatch those requests.
The full example
Just because the solution is in this case a single file I decided to copy it in the README.md so you do not have to navigate by the project, I could not unfortunately do this with the philosophers having a dinner.
#include <memory>
#include <iostream>
#include <thread>
#include <semaphore>
#include <string>
#include <thread>
#include <list>
void
argumentAsNumber(int argc, char **argv, int numberOfArgument, std::string nameOfArgument, int *output);
// The buffer size can be changed, the bigger it
// is, the less the producer threads would
// have to wait for a slot in the event of
// consumers being unable to keep up with
// the generated values.
const int sizeBuffer = 10;
// Handy typedefs to reduce the line noise.
// This is very useful for shared_ptr.
typedef std::list<std::jthread> ListOfThreads;
typedef std::shared_ptr<int[sizeBuffer]> Buffer;
typedef std::shared_ptr<int> BufferAllocation;
typedef std::counting_semaphore<sizeBuffer> ProducerConsumerSemaphore;
typedef std::shared_ptr<ProducerConsumerSemaphore> ProducerConsumerSemaphorePtr;
typedef std::shared_ptr<std::mutex> MutexPtr;
void
producerThread(Buffer buffer, BufferAllocation bufferAllocation,
ProducerConsumerSemaphorePtr producerAvailableSlots,
ProducerConsumerSemaphorePtr consumerAvailableSlots,
MutexPtr bufferMutex) {
// We are going to produce in the firsts iterations
// values from 0 to 50 and then values from
// 10 to 50.
int result = 0;
while (true) {
if (result > 50) {
result = 10;
}
// We reduce the available slots for producers.
// If we cant this would wait.
producerAvailableSlots->acquire();
{
// We lock the buffer to avoid other
// threads to mangle when we are
// still writing.
std::lock_guard<std::mutex> lk{*bufferMutex};
// We set the last element of the buffer
// to be the generated value.
buffer[*bufferAllocation] = result;
// We increment the buffer size, although I
// do not know if this is needed to be commented.
(*bufferAllocation)++;
}
// Now there is one more available slot to be consumed.
consumerAvailableSlots->release();
result++;
}
}
void
consumerThread(Buffer buffer, BufferAllocation bufferAllocation,
ProducerConsumerSemaphorePtr producerAvailableSlots,
ProducerConsumerSemaphorePtr consumerAvailableSlots,
MutexPtr bufferMutex) {
// We keep consuming forever in every consumer thread.
while (true) {
// This attempt to release the consumerAvailableSlots semaphore.
// It will wait if the operation cannot be done because
// it is already 0.
consumerAvailableSlots->acquire();
{
// We block the buffer and the variable containing its
// size so no other thread attempts to write or read there
// while we are doing this.
std::lock_guard<std::mutex> lk{*bufferMutex};
// We read the first value to create a first in first out.
// If we do not do this and the producer threads
// are more than the consumers or producing values
// way faster than they can be processed we would
// end with values that are never proccesed.
int result = buffer[0];
// We move the buffer -1 position each element
// so buffer[1] is now buffer[0]
for (int i = 1; i < *bufferAllocation; i++) {
buffer[i-1] = buffer[i];
}
std::cout << "Read " << result << " from the pool." << std::endl;
// We decrement the buffer allocated size.
(*bufferAllocation)--;
}
// We increment the producerAvailableSlots since there is
// now more room to insert values in the buffer.
producerAvailableSlots->release();
}
}
int
main(int argc, char **argv) {
// This values are defaults that the user may change, argumentAsNumber function takes care
// of this.
int numberOfProducers = 5;
int numberOfConsumers = 6;
// First argument is the numberOfProducers, the second is the numberOfConsumers, sadly
// C++ won't allow us to have runtime defined semaphore sizes so we cannot have
// runtime defined buffer size..
argumentAsNumber(argc, argv, 0, std::string("numberOfProducers"), &numberOfProducers);
argumentAsNumber(argc, argv, 1, std::string("numberOfConsumers"), &numberOfConsumers);
// We do not want the threads to go out of scope in the for loop of creation.
ListOfThreads producers;
ListOfThreads consumers;
// This variable contains the produced values.
Buffer buffer(new int[sizeBuffer]);
// This variable takes care of the buffer size.
BufferAllocation bufferAllocation(new int);
*bufferAllocation = 0;
/*
* We need two semaphores, one to control the buffer is still not completly filled up and
* other to control the buffer is not empty.
*/
ProducerConsumerSemaphorePtr producerAvailableSlots(new ProducerConsumerSemaphore(sizeBuffer));
ProducerConsumerSemaphorePtr consumerAvailableSlots(new ProducerConsumerSemaphore(0));
// When we edit the buffer we do not want other threads to access the buffer because of race conditions.
MutexPtr bufferMutex(new std::mutex);
// The creation of the producer threads.
for (unsigned long i = 0; i < (unsigned long) numberOfProducers; i++) {
producers.push_back(std::jthread([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] {
producerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex);
}));
}
// The creation of the consumer threads.
for (unsigned long i = 0; i < (unsigned long) numberOfConsumers; i++) {
consumers.push_back(std::jthread([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] {
consumerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex);
}));
}
}
void
argumentAsNumber(int argc, char **argv, int numberOfArgument, std::string nameOfArgument, int *output) {
// arg: 0 has to have argc 2 2 < 2(0+2) pass, 2 < 3 (1+2) not pass.
if (argc < numberOfArgument + 2) {
std::cerr << "Argument with number " << numberOfArgument << " missing, using default for " << nameOfArgument << std::endl;
// Not enough arguments.
return;
}
try {
// Argument to number conversion.
*output = std::atoi(argv[numberOfArgument + 1]);
} catch (std::exception &exception) {
// Warning the user, that the argument could not be proccesed.
std::cerr << "Unable to convert to number " << numberOfArgument << " as number using default for " << nameOfArgument << std::endl;
}
}