Go to file
2023-04-20 01:25:20 +02:00
include Adding initial commit. 2023-04-20 01:25:20 +02:00
src Adding initial commit. 2023-04-20 01:25:20 +02:00
Doxyfile Adding initial commit. 2023-04-20 01:25:20 +02:00
LICENSE Initial commit 2023-04-20 01:19:38 +02:00
meson.build Adding initial commit. 2023-04-20 01:25:20 +02:00
README.md Adding initial commit. 2023-04-20 01:25:20 +02:00

Producer-Consumer Problem Solution.

Description

This is a very common problem in multithreading: one or more threads generate data into a buffer, and one or more other threads have to process that data.

This solution is very useful, for example, when listening for network connections in a multithreaded manner: several threads wait for client requests while a pool of consumer threads dispatches those requests.

The full example

Because the solution is a single file in this case, I decided to copy it into the README.md so you do not have to navigate through the project. Unfortunately, I could not do the same for the dining philosophers solution.

#include <cstdlib>
#include <exception>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <semaphore>
#include <string>
#include <thread>

// Forward declaration; the definition is at the bottom of the file.
// Reads the optional command-line argument number `numberOfArgument`
// (0-based, not counting the program name) into `*output`. When the
// argument is missing, `*output` is left unchanged and a notice naming
// `nameOfArgument` is written to std::cerr.
void
argumentAsNumber(int argc, char **argv, int numberOfArgument, std::string nameOfArgument, int *output);

// Capacity of the shared buffer, fixed at compile time because it is
// part of the Buffer and semaphore types below. A larger value means
// producers wait less often for a free slot when the consumers cannot
// keep up with the generated values.
constexpr int sizeBuffer = 10;

// Aliases that keep the shared_ptr-heavy signatures readable.
using ListOfThreads = std::list<std::jthread>;
using Buffer = std::shared_ptr<int[sizeBuffer]>;
using BufferAllocation = std::shared_ptr<int>;
using ProducerConsumerSemaphore = std::counting_semaphore<sizeBuffer>;
using ProducerConsumerSemaphorePtr = std::shared_ptr<ProducerConsumerSemaphore>;
using MutexPtr = std::shared_ptr<std::mutex>;

/**
 * Body of a producer thread: appends an endless sequence of integers
 * to the shared buffer.
 *
 * The sequence is 0, 1, ..., 50 on the first pass and 10, 11, ..., 50
 * on every later pass. Each producer thread keeps its own sequence.
 *
 * @param buffer                 Shared storage for the produced values.
 * @param bufferAllocation       Number of values currently stored in buffer.
 * @param producerAvailableSlots Semaphore acquired before each write;
 *                               blocks while there is no free slot.
 * @param consumerAvailableSlots Semaphore released after each write to
 *                               signal that one more value can be read.
 * @param bufferMutex            Guards buffer and bufferAllocation.
 *
 * This function never returns.
 */
void
producerThread(Buffer buffer, BufferAllocation bufferAllocation,
        ProducerConsumerSemaphorePtr producerAvailableSlots,
        ProducerConsumerSemaphorePtr consumerAvailableSlots,
        MutexPtr bufferMutex) {
    int nextValue = 0;
    for (;;) {
        // Reserve a free slot; blocks while none is available.
        producerAvailableSlots->acquire();
        {
            // Hold the mutex only while touching the shared state so no
            // other thread sees a half-finished write.
            const std::lock_guard<std::mutex> guard{*bufferMutex};
            // Append at the current end of the buffer and grow the
            // stored-element count in one step.
            const int slot = (*bufferAllocation)++;
            buffer[slot] = nextValue;
        }
        // Tell the consumers there is one more value to read.
        consumerAvailableSlots->release();
        // After 50 the sequence restarts at 10 (not at 0).
        nextValue = (nextValue >= 50) ? 10 : nextValue + 1;
    }
}

/**
 * Body of a consumer thread: repeatedly removes the oldest value from
 * the shared buffer and prints it to std::cout.
 *
 * @param buffer                 Shared storage holding the produced values.
 * @param bufferAllocation       Number of values currently stored in buffer.
 * @param producerAvailableSlots Semaphore released after each read to
 *                               signal that a slot became free.
 * @param consumerAvailableSlots Semaphore acquired before each read;
 *                               blocks while there is nothing to read.
 * @param bufferMutex            Guards buffer and bufferAllocation.
 *
 * This function never returns.
 */
void
consumerThread(Buffer buffer, BufferAllocation bufferAllocation,
        ProducerConsumerSemaphorePtr producerAvailableSlots,
        ProducerConsumerSemaphorePtr consumerAvailableSlots,
        MutexPtr bufferMutex) {
    for (;;) {
        // Claim one readable value; blocks while the count is 0.
        consumerAvailableSlots->acquire();
        {
            // Exclusive access to the buffer and its element count for
            // the whole read-shift-print-shrink sequence.
            const std::lock_guard<std::mutex> guard{*bufferMutex};
            const int stored = *bufferAllocation;
            // Always take the front element so values leave in the
            // order they arrived (first in, first out); otherwise old
            // values could stay unprocessed when producers outpace
            // consumers.
            const int front = buffer[0];
            // Slide every remaining element one position towards the
            // front, so the old buffer[1] becomes the new buffer[0].
            for (int dst = 0; dst + 1 < stored; ++dst) {
                buffer[dst] = buffer[dst + 1];
            }
            std::cout << "Read " << front << " from the pool." << std::endl;
            // One element fewer is stored now.
            *bufferAllocation = stored - 1;
        }
        // A slot was freed, so a waiting producer may proceed.
        producerAvailableSlots->release();
    }
}
int
main(int argc, char **argv) {
    // This values are defaults that the user may change, argumentAsNumber function takes care
    // of this.
    int numberOfProducers = 5;
    int numberOfConsumers = 6;

    // First argument is the numberOfProducers, the second is the numberOfConsumers, sadly
    // C++ won't allow us to have runtime defined semaphore sizes so we cannot have
    // runtime defined buffer size..
    argumentAsNumber(argc, argv, 0, std::string("numberOfProducers"), &numberOfProducers);
    argumentAsNumber(argc, argv, 1, std::string("numberOfConsumers"), &numberOfConsumers);

    // We do not want the threads to go out of scope in the for loop of creation.
    ListOfThreads producers;
    ListOfThreads consumers;

    // This variable contains the produced values.
    Buffer buffer(new int[sizeBuffer]);

    // This variable takes care of the buffer size.
    BufferAllocation bufferAllocation(new int);
    *bufferAllocation = 0;

    /*
     * We need two semaphores, one to control the buffer is still not completly filled up and
     * other to control the buffer is not empty.
     */
    ProducerConsumerSemaphorePtr producerAvailableSlots(new ProducerConsumerSemaphore(sizeBuffer));
    ProducerConsumerSemaphorePtr consumerAvailableSlots(new ProducerConsumerSemaphore(0));

    // When we edit the buffer we do not want other threads to access the buffer because of race conditions.
    MutexPtr bufferMutex(new std::mutex);

    // The creation of the producer threads.
    for (unsigned long i = 0; i < (unsigned long) numberOfProducers; i++) {
        producers.push_back(std::jthread([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] {
            producerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex);
        }));
    }
    // The creation of the consumer threads.
    for (unsigned long i = 0; i < (unsigned long) numberOfConsumers; i++) {
        consumers.push_back(std::jthread([buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex] {
            consumerThread(buffer, bufferAllocation, producerAvailableSlots, consumerAvailableSlots, bufferMutex);
        }));
    }
}

/**
 * Overrides *output with an optional numeric command-line argument.
 *
 * @param argc             Argument count as received by main.
 * @param argv             Argument vector as received by main.
 * @param numberOfArgument 0-based index of the wanted argument, not
 *                         counting the program name (argument N is
 *                         read from argv[N + 1]).
 * @param nameOfArgument   Human-readable name used in the diagnostics.
 * @param output           Receives the parsed value. It is left
 *                         unchanged when the argument is missing, does
 *                         not start with a number, or does not fit in
 *                         an int.
 *
 * Every case in which the default is kept is reported on std::cerr.
 */
void
argumentAsNumber(int argc, char **argv, int numberOfArgument, std::string nameOfArgument, int *output) {
    // argv[0] is the program name, so argument N needs argc >= N + 2
    // (argument 0 needs argc 2, argument 1 needs argc 3, ...).
    if (argc < numberOfArgument + 2) {
        std::cerr << "Argument with number " <<  numberOfArgument << " missing, using default  for " << nameOfArgument << std::endl;
        // Not enough arguments.
        return;
    }
    try {
        // std::stoi throws on non-numeric or out-of-range input, unlike
        // std::atoi, which silently yields 0 and never reaches the
        // handler below. The assignment only happens on success.
        *output = std::stoi(argv[numberOfArgument + 1]);
    } catch (const std::exception &) {
        // Warn the user that the argument could not be processed; the
        // default already stored in *output stays in effect.
        std::cerr << "Unable to convert to number " << numberOfArgument << " as number using default for " << nameOfArgument << std::endl;
    }
}