Multithreading: Tutorial 06: Producer-Consumer in C++
© 2003 / Mario Konrad

Introduction

Prerequisites for this tutorial are knowledge of the programming language (C++98) and the basics of threading (see the previous tutorials).

This tutorial shows how to write a C++ wrapper around the PThreads API (a C API). It also shows a well-known pattern (not GoF), Producer-Consumer, implemented with that wrapper. To fully understand this tutorial you should at least be able to read C++ code and templates. I try to explain things as clearly as possible, but I think it is necessary that you have at least some minor experience in writing a template.

Design

The design is more complicated than in the previous tutorial, but still very simple. Above all, it shows the relations between the classes.

Walkthrough

This chapter goes through the classes and describes the important points. Language- and syntax-specific details are omitted.

Thread

This class has already been examined in the previous tutorial, see Tutorial 5.

Mutex

The declaration of the Mutex class reveals no secrets, because there are none. The class is very straightforward: it provides the most common methods and nothing special.

class Mutex
{
    private:
        pthread_mutex_t mutex;
    public:
        Mutex();
        ~Mutex();
        void lock();
        void unlock();
        bool trylock();
};

The implementation of the Mutex class meets our expectations. There is nothing spectacular, just a wrapper around the C API. We have already seen all of its elements in previous tutorials.

Mutex::Mutex()
{
    pthread_mutex_init(&mutex, 0);
}

Mutex::~Mutex()
{
    pthread_mutex_destroy(&mutex);
}

void Mutex::lock()
{
    pthread_mutex_lock(&mutex);
}

void Mutex::unlock()
{
    pthread_mutex_unlock(&mutex);
}

bool Mutex::trylock()
{
    return (pthread_mutex_trylock(&mutex) == 0);
}
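
Every call to lock must be matched by a call to unlock on every path through the code, including early returns and exceptions. One common way to make this automatic is a small scoped guard. The following is only a sketch: ScopedLock and scoped_lock_demo are hypothetical names that are not part of the tutorial's source, and the Mutex class is repeated so that the example is self-contained.

```cpp
#include <pthread.h>

// Same wrapper as in the tutorial, repeated so the example is self-contained.
class Mutex
{
    private:
        pthread_mutex_t mutex;
    public:
        Mutex() { pthread_mutex_init(&mutex, 0); }
        ~Mutex() { pthread_mutex_destroy(&mutex); }
        void lock() { pthread_mutex_lock(&mutex); }
        void unlock() { pthread_mutex_unlock(&mutex); }
        bool trylock() { return (pthread_mutex_trylock(&mutex) == 0); }
};

// Hypothetical helper (not in the tutorial): locks in the constructor and
// unlocks in the destructor, so the mutex is released on every exit path.
class ScopedLock
{
    private:
        Mutex & mtx;
        ScopedLock(const ScopedLock &);             // not copyable
        ScopedLock & operator=(const ScopedLock &); // not assignable
    public:
        explicit ScopedLock(Mutex & m) : mtx(m) { mtx.lock(); }
        ~ScopedLock() { mtx.unlock(); }
};

// Returns true if the mutex was held inside the scope and free after it.
bool scoped_lock_demo()
{
    Mutex m;
    bool held_inside = false;
    {
        ScopedLock guard(m);
        // trylock fails while a default (non-recursive) mutex is locked,
        // even when the lock is held by the calling thread.
        held_inside = !m.trylock();
    }
    bool free_after = m.trylock(); // succeeds: the guard has unlocked
    if (free_after) m.unlock();
    return held_inside && free_after;
}
```

With such a guard, a method body only needs one line (ScopedLock guard(mtx);) instead of a lock/unlock pair around every return statement.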

Pool

The Pool is somewhat special. It is a template that provides a queue for any data type. What makes it special is that it explicitly supports multithreading: all operations (size, push and pop) are thread-safe.

template <class T> class Pool
{

The private section holds the encapsulated attributes of this class. For the data container, we choose the queue of the standard library. To protect it against concurrent access, there is the mutex variable mtx. The two other variables are configurable values: the maximum size of the container (max_size) and the time in microseconds to wait between two checks of the queue (spin_time).

    std::queue<T> data;
    mutable Mutex mtx; // mutable, so that the const method size() can lock it
    size_t max_size;
    size_t spin_time;

The constructor initializes the values, nothing more.

    Pool(size_t max_size = 10, size_t spin_time = 50)
        : max_size(max_size), spin_time(spin_time)
        {}

The destructor clears all the data.

    ~Pool()
        {
            mtx.lock();
            while (data.size()) data.pop();
            mtx.unlock();
        }

The method size returns the number of elements within the container. This value is between 0 and max_size. The access to data happens inside the critical section.

    size_t size() const
        {
            mtx.lock();
            size_t result = data.size();
            mtx.unlock();
            return result;
        }
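
Because size is a const method that locks a member mutex, while lock and unlock of Mutex are non-const, the mutex has to be usable from const code. A common way to allow this is the keyword mutable. The following minimal sketch illustrates the idea with a hypothetical Counter class (Counter and count_to are illustration names, not part of the tutorial); the Mutex class is repeated for self-containment.

```cpp
#include <pthread.h>
#include <stddef.h>

// Same wrapper as in the tutorial, reduced to what this example needs.
class Mutex
{
    private:
        pthread_mutex_t mutex;
    public:
        Mutex() { pthread_mutex_init(&mutex, 0); }
        ~Mutex() { pthread_mutex_destroy(&mutex); }
        void lock() { pthread_mutex_lock(&mutex); }
        void unlock() { pthread_mutex_unlock(&mutex); }
};

// Hypothetical class for illustration only.
class Counter
{
    private:
        mutable Mutex mtx; // mutable: may be locked inside const methods
        size_t value;
    public:
        Counter() : value(0) {}
        void increment()
        {
            mtx.lock();
            ++value;
            mtx.unlock();
        }
        size_t get() const // const, yet allowed to lock the mutable mutex
        {
            mtx.lock();
            size_t result = value;
            mtx.unlock();
            return result;
        }
};

// Increments a counter n times and reads it through a const reference.
size_t count_to(size_t n)
{
    Counter c;
    for (size_t i = 0; i < n; ++i) c.increment();
    const Counter & view = c;
    return view.get();
}
```

Without mutable, the call mtx.lock() inside get would be rejected by the compiler, because a const method may only call const methods on its members.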

The method push adds a new data item to the queue. The mutex makes sure that no other thread accesses the data during this critical operation. If the queue is already full, the loop leaves the critical section (unlocks the mutex), waits for some time and then tries again. This is repeated until there is a free place in the queue.

This mechanism could be implemented more elegantly with condition variables, but that is not the focus of this tutorial.

    void push(T item)
        {
            mtx.lock();
            while (data.size() >= max_size)
            {
                mtx.unlock();
                usleep(spin_time);
                mtx.lock();
            }
            data.push(item);
            mtx.unlock();
        }

The method pop is the exact opposite of push. It tries to read from the queue. If there is no data in the queue, it waits until data is available again. Condition variables would be a nicer solution but, as already mentioned, they are not part of this tutorial.

    T pop()
        {
            mtx.lock();
            while (data.size() <= 0)
            {
                mtx.unlock();
                usleep(spin_time);
                mtx.lock();
            }
            T item = data.front(); data.pop();
            mtx.unlock();
            return item;
        }
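
For comparison, here is how the condition variable alternative mentioned in the text might look. This is only a sketch and not the tutorial's implementation: CondPool, not_full, not_empty, ProducerArgs, produce_numbers and transfer_sum are hypothetical names, and the PThreads calls are used directly instead of through the Mutex wrapper.

```cpp
#include <pthread.h>
#include <stddef.h>
#include <queue>

// Hypothetical variant of Pool that sleeps on condition variables
// instead of polling with usleep.
template <class T> class CondPool
{
    private:
        std::queue<T> data;
        pthread_mutex_t mtx;
        pthread_cond_t not_full;
        pthread_cond_t not_empty;
        size_t max_size;
        CondPool(const CondPool &);             // not copyable
        CondPool & operator=(const CondPool &); // not assignable
    public:
        explicit CondPool(size_t max_size = 10) : max_size(max_size)
        {
            pthread_mutex_init(&mtx, 0);
            pthread_cond_init(&not_full, 0);
            pthread_cond_init(&not_empty, 0);
        }
        ~CondPool()
        {
            pthread_cond_destroy(&not_empty);
            pthread_cond_destroy(&not_full);
            pthread_mutex_destroy(&mtx);
        }
        void push(T item)
        {
            pthread_mutex_lock(&mtx);
            // pthread_cond_wait releases the mutex while sleeping and
            // re-acquires it before returning. The loop re-checks the
            // condition, which also covers spurious wakeups.
            while (data.size() >= max_size)
                pthread_cond_wait(&not_full, &mtx);
            data.push(item);
            pthread_cond_signal(&not_empty); // wake one waiting consumer
            pthread_mutex_unlock(&mtx);
        }
        T pop()
        {
            pthread_mutex_lock(&mtx);
            while (data.empty())
                pthread_cond_wait(&not_empty, &mtx);
            T item = data.front();
            data.pop();
            pthread_cond_signal(&not_full); // wake one waiting producer
            pthread_mutex_unlock(&mtx);
            return item;
        }
};

struct ProducerArgs { CondPool<int> * pool; int count; };

// Thread entry point: pushes the numbers 0 .. count-1.
extern "C" void * produce_numbers(void * arg)
{
    ProducerArgs * a = static_cast<ProducerArgs *>(arg);
    for (int i = 0; i < a->count; ++i) a->pool->push(i);
    return 0;
}

// Pushes 0 .. count-1 from a second thread and sums what arrives here.
long transfer_sum(int count)
{
    CondPool<int> pool(4);
    ProducerArgs args = { &pool, count };
    pthread_t producer;
    pthread_create(&producer, 0, produce_numbers, &args);
    long sum = 0;
    for (int i = 0; i < count; ++i) sum += pool.pop();
    pthread_join(producer, 0);
    return sum;
}
```

The interface is the same as that of Pool, but a blocked thread is woken as soon as the state of the queue changes, so no spin_time parameter is needed.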

Producer

Producers generate the data and push it into the queue. The declaration of the class is quite unspectacular because the thread part is already done in the superclass. The real, producer-related work is done in the implementation of the method run.

class Producer : public Thread
{
    private:
        Pool<string> * pool;
        string id;
        size_t num_items;
    protected:
        virtual void run();
    public:
        Producer(Pool<string> * pool, string id)
            : pool(pool), id(id), num_items(0) {}
        void produce(size_t);
};

A normal thread is started by calling the method start. A producer should instead be started through the method produce, which takes the number of data items to generate as its parameter and calls start internally. If start were used directly, the user of this class would have to call two methods: one to set the number of items and one to start the producer.

void Producer::produce(size_t num_items)
{
    if (!pool) return;
    if (num_items <= 0) return;
    this->num_items = num_items;
    start();
}
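
The Thread superclass used here is covered in Tutorial 5 and is not reproduced on this page. To make the pattern "store the parameter, then call start" easier to follow, here is a minimal sketch that assumes Thread offers start, join and a virtual run roughly as shown; the real class from Tutorial 5 may differ. Summer, work and sum_in_thread are hypothetical names.

```cpp
#include <pthread.h>
#include <stddef.h>

// Assumed shape of the Thread base class; the real one is in Tutorial 5.
class Thread
{
    private:
        pthread_t handle;
        bool started;
        // pthread_create needs a plain function; this forwards to run().
        static void * entry(void * self)
        {
            static_cast<Thread *>(self)->run();
            return 0;
        }
    protected:
        virtual void run() = 0;
    public:
        Thread() : started(false) {}
        virtual ~Thread() {}
        void start()
        {
            if (started) return;
            started = (pthread_create(&handle, 0, entry, this) == 0);
        }
        void join()
        {
            if (!started) return;
            pthread_join(handle, 0);
            started = false;
        }
};

// Hypothetical worker following the produce()/run() pattern from the text.
class Summer : public Thread
{
    private:
        size_t num_items;
        size_t total;
    protected:
        virtual void run()
        {
            for (size_t i = 1; i <= num_items; ++i) total += i;
        }
    public:
        Summer() : num_items(0), total(0) {}
        void work(size_t n)
        {
            if (n == 0) return; // nothing to do, no thread is started
            num_items = n;      // store the parameter first ...
            start();            // ... then start the thread that reads it
        }
        size_t result() const { return total; }
};

// Sums 1 .. n in a separate thread and returns the result.
size_t sum_in_thread(size_t n)
{
    Summer s;
    s.work(n);
    s.join(); // after join, reading the result is safe
    return s.result();
}
```

The order inside work matters: the member is written before the thread exists, so run never sees a half-initialized value.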

This is the heart of the producer. The method run pushes the specified number of items (num_items) into the data pool and then terminates.

void Producer::run()
{
    if (!pool) return;
    for (size_t i = 0; i < num_items; ++i)
    {
        pool->push(id);
        usleep(100);
    }
}

Consumer

Consumers are responsible for taking data items from a queue and doing something with them. The declaration is not very difficult either.

class Consumer : public Thread
{
    private:
        static Mutex mtx;
        Pool<string> * pool;
        string id;
        size_t num_items;
    protected:
        virtual void run();
    public:
        Consumer(Pool<string> * pool, string id)
            : pool(pool), id(id), num_items(0) {}
        void consume(size_t);
};

We need another mutex to make sure that, even with more than one consumer, the output does not get “dirty” (multiple outputs merged together).

Mutex Consumer::mtx;

To start the thread, use the method consume. You may specify the number of data items this consumer shall process.

void Consumer::consume(size_t num_items)
{
    if (!pool) return;
    if (num_items <= 0) return;
    this->num_items = num_items;
    start();
}

The work within the consumer is done in the method run. The consumer takes data items from the pool and prints them to the standard output. This is done for the specified number of data items.

void Consumer::run()
{
    if (!pool) return;
    for (size_t i = 0; i < num_items; ++i)
    {
        string item = pool->pop();
        mtx.lock();
        cout << "[" << id << "] : <" << item << ">" << endl;
        mtx.unlock();
    }
}
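
The effect of the shared output mutex can be checked with a small experiment. The sketch below writes to a shared std::ostringstream instead of cout so that the result can be inspected afterwards; out_mtx, out, WriterArgs, write_lines and count_broken_lines are hypothetical names, and the threads are created with the PThreads API directly rather than with the Thread class.

```cpp
#include <pthread.h>
#include <sstream>
#include <string>

// One mutex shared by all writers, like the static Consumer::mtx.
static pthread_mutex_t out_mtx = PTHREAD_MUTEX_INITIALIZER;
static std::ostringstream out; // stands in for cout in this sketch

struct WriterArgs { const char * id; int lines; };

// Thread entry point: writes complete lines while holding the mutex.
extern "C" void * write_lines(void * arg)
{
    WriterArgs * a = static_cast<WriterArgs *>(arg);
    for (int i = 0; i < a->lines; ++i)
    {
        pthread_mutex_lock(&out_mtx);
        out << "[" << a->id << "] : <item>" << "\n";
        pthread_mutex_unlock(&out_mtx);
    }
    return 0;
}

// Runs two writers and returns the number of lines that are not one of
// the two expected lines, or -1 if the total line count is wrong.
int count_broken_lines(int lines_per_writer)
{
    out.str("");
    WriterArgs a0 = { "Consumer 0", lines_per_writer };
    WriterArgs a1 = { "Consumer 1", lines_per_writer };
    pthread_t t0, t1;
    pthread_create(&t0, 0, write_lines, &a0);
    pthread_create(&t1, 0, write_lines, &a1);
    pthread_join(t0, 0);
    pthread_join(t1, 0);

    std::istringstream in(out.str());
    std::string line;
    int broken = 0;
    int total = 0;
    while (std::getline(in, line))
    {
        ++total;
        if (line != "[Consumer 0] : <item>" && line != "[Consumer 1] : <item>")
            ++broken;
    }
    return (total == 2 * lines_per_writer) ? broken : -1;
}
```

Because each line is composed of several << operations, the lock has to cover the whole statement; locking each single << would still allow two lines to interleave.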

Main Program

The inter-thread communication is shown in the figure below. There are three threads: one producer and two consumers. The communication between the producer and the consumers is done using a pool of data. Since it’s a queue, the data produced first is also processed first (FIFO = first in, first out). Have a look at this MASCOT-2 diagram:

The implementation of the main function is limited to creating some objects, starting the threads and then waiting until the threads have done their jobs. Nothing we haven’t already seen.

int main(void)
{
    Pool<string> pool;
    Producer prod(&pool, "Producer 1");
    Consumer cons0(&pool, "Consumer 0");
    Consumer cons1(&pool, "Consumer 1");

    cons0.consume(5);
    cons1.consume(5);
    prod.produce(10);

    prod.join();
    cons0.join();
    cons1.join();

    return 0;
}
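
The same setup (one producer pushing 10 items, two consumers taking 5 each) can also be tried in a single file. The sketch below is an assumption-laden reduction, not the tutorial's code: it replaces the Thread, Producer and Consumer classes with plain thread functions, and SpinPool, Job, producer_main, consumer_main and run_pipeline are hypothetical names. It returns how many items the consumers received instead of printing them.

```cpp
#include <pthread.h>
#include <unistd.h>
#include <stddef.h>
#include <queue>
#include <string>

// Spin-waiting queue in the style of Pool, reduced to push and pop.
template <class T> class SpinPool
{
    private:
        std::queue<T> data;
        pthread_mutex_t mtx;
        size_t max_size;
        size_t spin_time;
    public:
        SpinPool(size_t max_size = 10, size_t spin_time = 50)
            : max_size(max_size), spin_time(spin_time)
            { pthread_mutex_init(&mtx, 0); }
        ~SpinPool() { pthread_mutex_destroy(&mtx); }
        void push(T item)
        {
            pthread_mutex_lock(&mtx);
            while (data.size() >= max_size) // full: release, wait, retry
            {
                pthread_mutex_unlock(&mtx);
                usleep(spin_time);
                pthread_mutex_lock(&mtx);
            }
            data.push(item);
            pthread_mutex_unlock(&mtx);
        }
        T pop()
        {
            pthread_mutex_lock(&mtx);
            while (data.empty()) // empty: release, wait, retry
            {
                pthread_mutex_unlock(&mtx);
                usleep(spin_time);
                pthread_mutex_lock(&mtx);
            }
            T item = data.front();
            data.pop();
            pthread_mutex_unlock(&mtx);
            return item;
        }
};

struct Job
{
    SpinPool<std::string> * pool;
    size_t num_items;
    size_t received; // filled in by consumers only
};

extern "C" void * producer_main(void * arg)
{
    Job * job = static_cast<Job *>(arg);
    for (size_t i = 0; i < job->num_items; ++i) job->pool->push("Producer 1");
    return 0;
}

extern "C" void * consumer_main(void * arg)
{
    Job * job = static_cast<Job *>(arg);
    for (size_t i = 0; i < job->num_items; ++i)
        if (job->pool->pop() == "Producer 1") ++job->received;
    return 0;
}

// Returns the number of items both consumers received in total.
size_t run_pipeline()
{
    SpinPool<std::string> pool;
    Job prod = { &pool, 10, 0 };
    Job cons0 = { &pool, 5, 0 };
    Job cons1 = { &pool, 5, 0 };
    pthread_t tp, tc0, tc1;
    pthread_create(&tc0, 0, consumer_main, &cons0);
    pthread_create(&tc1, 0, consumer_main, &cons1);
    pthread_create(&tp, 0, producer_main, &prod);
    pthread_join(tp, 0);
    pthread_join(tc0, 0);
    pthread_join(tc1, 0);
    return cons0.received + cons1.received;
}
```

Note that the numbers have to match: if the consumers asked for more items than the producer delivers, they would wait in pop forever and join would never return.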

Compilation and Execution

Since this tutorial consists of more than one source file, this chapter briefly shows how to compile and run the code. Don’t expect much explanation.

Compile and link the tutorial:

$ g++ -o Thread.o -c Thread.cpp
$ g++ -o Mutex.o -c Mutex.cpp
$ g++ -o Producer.o -c Producer.cpp
$ g++ -o Consumer.o -c Consumer.cpp
$ g++ -c tutorial6.cpp -o tutorial6.o
$ g++ -o tutorial6 tutorial6.o Thread.o Mutex.o Producer.o Consumer.o -lstdc++ -lpthread

Where is Pool, you might ask. Pool is a template and is therefore implicitly included: it is compiled as part of the source files that use it.

Feel free to write a Makefile on your own.

Now run the program, as usual. Please note that the tutorial does not check whether the operations succeeded, but nothing should go wrong.

$ ./tutorial6
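
The wrappers in this tutorial ignore the return values of the PThreads calls. These functions return 0 on success and an error number otherwise, so a checked variant is easy to sketch. The names check, CheckedMutex and check_reports_errors are hypothetical, and throwing std::runtime_error is just one possible way to report a failure.

```cpp
#include <pthread.h>
#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <string>

// Hypothetical helper: turns a non-zero pthread return code into an exception.
void check(int rc, const char * what)
{
    if (rc != 0)
        throw std::runtime_error(std::string(what) + ": " + std::strerror(rc));
}

// Hypothetical variant of the Mutex wrapper that checks every call.
class CheckedMutex
{
    private:
        pthread_mutex_t mutex;
    public:
        CheckedMutex()
            { check(pthread_mutex_init(&mutex, 0), "pthread_mutex_init"); }
        ~CheckedMutex()
            { pthread_mutex_destroy(&mutex); } // destructors should not throw
        void lock()
            { check(pthread_mutex_lock(&mutex), "pthread_mutex_lock"); }
        void unlock()
            { check(pthread_mutex_unlock(&mutex), "pthread_mutex_unlock"); }
        bool trylock()
        {
            int rc = pthread_mutex_trylock(&mutex);
            if (rc == EBUSY) return false; // already locked: not an error
            check(rc, "pthread_mutex_trylock");
            return true;
        }
};

// Returns true if check() throws for a non-zero error code.
bool check_reports_errors()
{
    try { check(EINVAL, "example"); }
    catch (const std::runtime_error &) { return true; }
    return false;
}
```

The only return code treated specially is EBUSY from pthread_mutex_trylock, because a mutex that is already locked is the expected "no" answer of trylock rather than a failure.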

Download Source Code

All source code files provided by this page are free to copy, modify, redistribute and use for any purpose. Use them at your own risk. The tutorial is copyrighted by Mario Konrad.

The zip archive contains: