# Producer-Consumer Problem Solution.

## Description

This is a very common problem in multithreading: one or more threads generate data into a buffer, and one or more threads have to process that data.

This solution is very useful when, for example, listening for network connections in a multithreaded manner: various threads wait for client requests while a pool of consumer threads dispatches those requests.

## The full example

Because the solution is, in this case, a single file, I decided to copy it into the README.md so you do not have to navigate through the project. Unfortunately, I could not do this with the philosophers having a dinner (dining philosophers) example.

```cpp
#include <exception>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <semaphore>
#include <string>
#include <thread>

// Reads the command line argument number `numberOfArgument` (0-based, not
// counting the program name) as an int into `*output`. When the argument is
// missing or is not a number, `*output` is left untouched and a warning
// mentioning `nameOfArgument` is printed to stderr.
void argumentAsNumber(int argc, char **argv, int numberOfArgument, const std::string &nameOfArgument, int *output);

// The buffer size can be changed: the bigger it
// is, the less the producer threads have to
// wait for a slot when the consumers are
// unable to keep up with the generated values.
const int sizeBuffer = 10;

// Handy type aliases to reduce the line noise.
// This is very useful for shared_ptr.
using ListOfThreads = std::list<std::jthread>;
using Buffer = std::shared_ptr<int[]>;
using BufferAllocation = std::shared_ptr<int>;
using ProducerConsumerSemaphore = std::counting_semaphore<sizeBuffer>;
using ProducerConsumerSemaphorePtr = std::shared_ptr<ProducerConsumerSemaphore>;
using MutexPtr = std::shared_ptr<std::mutex>;

// Body of every producer thread: generates values forever and appends them
// to the shared buffer, waiting whenever the buffer is full.
void producerThread(Buffer buffer, BufferAllocation bufferAllocation,
                    ProducerConsumerSemaphorePtr producerAvailableSlots,
                    ProducerConsumerSemaphorePtr consumerAvailableSlots, MutexPtr bufferMutex) {
    // In the first iterations we produce the values
    // from 0 to 50, and from then on the values
    // from 10 to 50 over and over again.
    int result = 0;
    while (true) {
        if (result > 50) {
            result = 10;
        }
        // We reduce the available slots for producers.
        // If there is no free slot this waits.
        producerAvailableSlots->acquire();
        {
            // We lock the buffer to prevent other
            // threads from touching it while we
            // are still writing.
            std::lock_guard lk{*bufferMutex};
            // We store the generated value in the first
            // free position of the buffer.
            buffer[*bufferAllocation] = result;
            // One more element is now stored in the buffer.
            (*bufferAllocation)++;
        }
        // Now there is one more element available to be consumed.
        consumerAvailableSlots->release();
        result++;
    }
}

// Body of every consumer thread: takes the oldest value out of the shared
// buffer forever, waiting whenever the buffer is empty.
void consumerThread(Buffer buffer, BufferAllocation bufferAllocation,
                    ProducerConsumerSemaphorePtr producerAvailableSlots,
                    ProducerConsumerSemaphorePtr consumerAvailableSlots, MutexPtr bufferMutex) {
    // We keep consuming forever in every consumer thread.
    while (true) {
        // This attempts to acquire the consumerAvailableSlots semaphore.
        // It waits if the operation cannot be done because
        // the counter is already 0, that is, the buffer is empty.
        consumerAvailableSlots->acquire();
        {
            // We lock the buffer and the variable containing its
            // size so no other thread attempts to write or read there
            // while we are doing this.
            std::lock_guard lk{*bufferMutex};
            // We read the first value to get first in, first out order.
            // If we did not do this and there were more producers
            // than consumers, or values were produced way faster
            // than they can be processed, we would end up with
            // values that are never processed.
            int result = buffer[0];
            // We move every element of the buffer one position
            // back, so buffer[1] is now buffer[0].
            for (int i = 1; i < *bufferAllocation; i++) {
                buffer[i - 1] = buffer[i];
            }
            std::cout << "Read " << result << " from the pool." << std::endl;
            // One element fewer is now stored in the buffer.
            (*bufferAllocation)--;
        }
        // We increment producerAvailableSlots since there is
        // now more room to insert values in the buffer.
        producerAvailableSlots->release();
    }
}

int main(int argc, char **argv) {
    // These values are defaults that the user may change; the
    // argumentAsNumber function takes care of this.
    int numberOfProducers = 5;
    int numberOfConsumers = 6;
    // The first argument is numberOfProducers, the second is numberOfConsumers.
    // The maximum count of std::counting_semaphore is a template parameter,
    // so to keep things simple the buffer size is a compile-time constant
    // and not a command line argument.
    argumentAsNumber(argc, argv, 0, std::string("numberOfProducers"), &numberOfProducers);
    argumentAsNumber(argc, argv, 1, std::string("numberOfConsumers"), &numberOfConsumers);

    // We do not want the threads to go out of scope in the creation loops.
    ListOfThreads producers;
    ListOfThreads consumers;

    // This variable contains the produced values.
    Buffer buffer(new int[sizeBuffer]);
    // This variable keeps track of how many values are stored in the buffer.
    BufferAllocation bufferAllocation = std::make_shared<int>(0);
    /*
     * We need two semaphores: one to make sure the buffer is not completely
     * filled up and another to make sure the buffer is not empty.
     */
    ProducerConsumerSemaphorePtr producerAvailableSlots = std::make_shared<ProducerConsumerSemaphore>(sizeBuffer);
    ProducerConsumerSemaphorePtr consumerAvailableSlots = std::make_shared<ProducerConsumerSemaphore>(0);
    // While we edit the buffer we do not want other threads to access it,
    // because that would be a race condition.
    MutexPtr bufferMutex = std::make_shared<std::mutex>();

    // The creation of the producer threads. The index is signed on purpose:
    // a negative count simply starts no threads instead of wrapping around
    // to a huge unsigned number.
    for (int i = 0; i < numberOfProducers; i++) {
        producers.emplace_back([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] {
            producerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex);
        });
    }

    // The creation of the consumer threads.
    for (int i = 0; i < numberOfConsumers; i++) {
        consumers.emplace_back([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] {
            consumerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex);
        });
    }

    // The std::jthread destructors join the threads here. Since producers and
    // consumers loop forever, the program keeps running until it is interrupted.
}

void argumentAsNumber(int argc, char **argv, int numberOfArgument, const std::string &nameOfArgument, int *output) {
    // Argument number N lives in argv[N + 1], so argc has to be at least N + 2.
    // For example with argc == 2, argument 0 is present (2 < 0 + 2 is false)
    // but argument 1 is missing (2 < 1 + 2 is true).
    if (argc < numberOfArgument + 2) {
        std::cerr << "Argument with number " << numberOfArgument << " missing, using default for " << nameOfArgument << std::endl;
        // Not enough arguments.
        return;
    }
    try {
        // Argument to number conversion. std::stoi throws when the text is
        // not a number or does not fit in an int, so *output keeps its
        // default in that case (std::atoi would silently return 0).
        *output = std::stoi(argv[numberOfArgument + 1]);
    } catch (const std::exception &exception) {
        // Warning the user that the argument could not be processed.
        std::cerr << "Unable to convert to number " << numberOfArgument << " as number using default for " << nameOfArgument << std::endl;
    }
}
```