How to solve the producer-consumer problem in Java — vivid example (multithreading)

Multithreading in Java (4 Part Series)

1 Multithreading in Java Part 1 – Process vs Thread
2 🤯 Thread, Runnable, Callable, ExecutorService, and Future – all the ways to create threads in Java
3 What is a Race Condition in Java, and how it can be prevented using synchronized and AtomicInteger
4 How to solve the producer-consumer problem in Java — vivid example (multithreading)

Hello guys, in the previous tutorial, we created a simple program that consisted of DonutStorage, and a Consumer who consumed donuts. We had multiple consumers run by different threads at the same time. We had to synchronize them to solve a race condition.

But what if we also want to include producers? We’ll first need to change the app to follow the best OOP practices. Second, we’ll get a producer-consumer problem, find out what it is, and solve it with wait and notify methods. Then we’ll learn how it can be easily done with BlockingQueue. Let us begin!

Table of Contents

 1. Adding producers
 2. Producer-consumer problem
 3. Wait and notify
       1) On what object to call the methods
       2) What’s an InterruptedException
       3) What happens when a thread meets the wait method
       4) What happens when the thread gets notified
       5) Why use notifyAll over notify
       6) The importance of keeping wait within a while loop
       7) When should you notify?

 4. BlockingQueue
 5. BlockingQueue vs wait and notify
 6. Further reading

Adding producers

Code recap from the previous tutorial:



public class DonutStorage {
    private int donutsNumber;
    public DonutStorage(int donutsNumber) { this.donutsNumber = donutsNumber; }
    public int getDonutsNumber() { return donutsNumber; }
    public void setDonutsNumber(int donutsNumber) { this.donutsNumber = donutsNumber; }
}

public class Consumer {
    private final DonutStorage donutStorage;
    public Consumer(DonutStorage donutStorage) {
        this.donutStorage = donutStorage;
    }

    /**
     * Subtracts the given number from the DonutStorage's donutsNumber. If the given number is bigger
     * than the number of donuts in stock, sets the donutsNumber to 0.
     * @param numberOfItemsToConsume Number that will be subtracted from the donutsNumber
     * @return the number of consumed items
     */
    public int consume(int numberOfItemsToConsume) {
        synchronized (donutStorage) {
            int donutsNumber = donutStorage.getDonutsNumber();
            // if there aren't enough donuts in stock, consume as many as there are
            if (numberOfItemsToConsume > donutsNumber) {
                donutStorage.setDonutsNumber(0);
                return donutsNumber;
            }
            donutStorage.setDonutsNumber(donutsNumber - numberOfItemsToConsume);
            return numberOfItemsToConsume;
        }
    }
}

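import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {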
    public static void main(String[] args) {
        int consumersNumber = 10;
        DonutStorage donutStorage = new DonutStorage(20);
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Future<?>> futures = new ArrayList<>(consumersNumber);
        for (int i = 0; i < consumersNumber; i++) {
            futures.add(executor.submit(() -> {
                Consumer consumer = new Consumer(donutStorage);
                System.out.println(Thread.currentThread().getName() + " consumed " +
                        consumer.consume(3));
            }));
        }
        executor.shutdown();

        // make the main thread wait for others to finish
        for (Future<?> future: futures) {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("Exception while getting from future: " + e.getMessage());
                e.printStackTrace();
            }
        }

        System.out.println("Number of remaining donuts: " + donutStorage.getDonutsNumber());
    }
}



So we have the DonutStorage with a single variable donutsNumber, getters and setters for it, and a constructor. We also have the Consumer class, which has its instance of DonutStorage and a single method for consuming items. In the Main class, we create a DonutStorage with the initial donutsNumber equal to 20. Then, we create 10 separate threads. Each creates a new Consumer and consumes 3 items. In addition, we want to print how many donuts are left before the program exits, but to make this work, we need to create a list of futures and loop through them. This way, the main thread will wait until the others finish.

To add producers, let’s first figure out what they are going to do. They will simply add the number specified in the param to the donutsNumber. But we also want some protection because the storage probably has some limits. So let’s also add a donutsCapacity field to the DonutStorage. And before updating the donutsNumber, we’ll check if there is enough space to add this number of donuts. If not, we’ll add as many as possible and return the number of added donuts. Here is the code:



public int produce(int numberOfItems) {
    int donutsNumber = donutStorage.getDonutsNumber();
    int donutsCapacity = donutStorage.getDonutsCapacity();

    // Represents a number of donuts the client can put before the storage
    // capacity is reached.
    int availableSpace = donutsCapacity - donutsNumber;
    if (numberOfItems > availableSpace) {
        donutStorage.setDonutsNumber(donutsCapacity);
        return availableSpace;
    }
    donutStorage.setDonutsNumber(donutsNumber + numberOfItems);
    return numberOfItems;
}
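
The produce method above calls getDonutsCapacity(), which the recap version of DonutStorage does not have yet. A minimal sketch of the updated class could look like this. The two-argument constructor and the range check are assumptions made for illustration, not necessarily what the original project does:

```java
public class DonutStorage {
    private final int donutsCapacity;
    private int donutsNumber;

    // Hypothetical constructor shape: the recap class only takes donutsNumber.
    public DonutStorage(int donutsNumber, int donutsCapacity) {
        if (donutsNumber < 0 || donutsNumber > donutsCapacity) {
            throw new IllegalArgumentException(
                    "donutsNumber must be between 0 and donutsCapacity");
        }
        this.donutsNumber = donutsNumber;
        this.donutsCapacity = donutsCapacity;
    }

    public int getDonutsNumber() { return donutsNumber; }

    public void setDonutsNumber(int donutsNumber) { this.donutsNumber = donutsNumber; }

    // The capacity never changes after construction, so there is no setter.
    public int getDonutsCapacity() { return donutsCapacity; }
}
```

With this shape, `new DonutStorage(20, 50)` would create a storage holding 20 donuts with room for 30 more.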



But it wouldn’t be a good decision to create a separate class called Producer. That’s because the Producer and Consumer classes would be almost identical. They would both have a DonutStorage instance and a single method with the same argument. The only thing that would be different is the operation they would do, which would be defined in the body of that method. In addition, the operation should be done synchronously, which would be difficult to realize using a superclass and its subclasses.

That’s why I propose creating one class and passing the operation it’s meant to do in the constructor. To achieve this in Java, we’ll need to create a functional interface (annotated with @FunctionalInterface) and pass its instance in the constructor (à la functional programming):



/**
 * Client is meant to be a producer or consumer. Each client has access to the
 * DonutStorage and does the respective action with donuts.
 */
public class Client {

    private final ClientOperation clientOperation;
    private final DonutStorage donutStorage;

    public Client(ClientOperation clientOperation, DonutStorage donutStorage) {
        this.clientOperation = clientOperation;
        this.donutStorage = donutStorage;
    }

    /**
     * @param numberOfItems number that represents how to change the donutsNumber
     * @return number that represents how the donutsNumber was changed
     */
    public int operate(int numberOfItems) {
        synchronized (donutStorage) {
            return clientOperation.operate(donutStorage, numberOfItems);
        }
    }

    @FunctionalInterface
    public interface ClientOperation {
        int operate(DonutStorage donutStorage, int numberOfItems);
    }
}




This way, when someone creates a Client, they will have to specify the operation they want the Client to do. And this also guarantees that the operation will be done synchronously because of the synchronized block within the operate method.

For now, we intend the Client to be either a producer or a consumer, but as written, other classes can still define their own Client behavior. To avoid writing boilerplate code and possibly some bugs in the future, let’s define this functionality right inside the Client class. We can achieve this using public static final variables:



public static final ClientOperation consume = (donutStorage, numberOfItems) -> {
    int donutsNumber = donutStorage.getDonutsNumber();
    // if there aren't enough donuts in stock, consume as many as there are
    if (numberOfItems > donutsNumber) {
        donutStorage.setDonutsNumber(0);
        return donutsNumber;
    }
    donutStorage.setDonutsNumber(donutsNumber - numberOfItems);
    return numberOfItems;
};

public static final ClientOperation produce = (donutStorage, numberOfItems) -> {
    int donutsNumber = donutStorage.getDonutsNumber();
    int donutsCapacity = donutStorage.getDonutsCapacity();

    // Represents a number of donuts the client can put before the storage
    // capacity is reached.
    int availableSpace = donutsCapacity - donutsNumber;
    if (numberOfItems > availableSpace) {
        donutStorage.setDonutsNumber(donutsCapacity);
        return availableSpace;
    }
    donutStorage.setDonutsNumber(donutsNumber + numberOfItems);
    return numberOfItems;
};



Here lambdas shorten the code of creating a new instance of ClientOperation and overriding the operate method.
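
To see what the lambda saves us, here is a small stand-alone comparison. It uses a simplified, hypothetical Operation interface that works on plain ints instead of a DonutStorage, so the names below are illustrative only:

```java
public class LambdaVsAnonymousClass {

    // Simplified stand-in for ClientOperation (assumption: plain ints instead of DonutStorage).
    @FunctionalInterface
    interface Operation {
        int operate(int donutsInStock, int numberOfItems);
    }

    // The long way: an anonymous class that overrides the single abstract method.
    static final Operation ANONYMOUS = new Operation() {
        @Override
        public int operate(int donutsInStock, int numberOfItems) {
            return Math.min(donutsInStock, numberOfItems);
        }
    };

    // The short way: a lambda with the same behavior.
    static final Operation LAMBDA =
            (donutsInStock, numberOfItems) -> Math.min(donutsInStock, numberOfItems);

    public static void main(String[] args) {
        System.out.println(ANONYMOUS.operate(20, 3)); // prints 3
        System.out.println(LAMBDA.operate(2, 3));     // prints 2
    }
}
```

Both constants behave the same way; the lambda simply drops the ceremony of naming the interface and the method again.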

And finally, let’s make the ClientOperation private. This way, we’ll restrict defining a custom functionality to the Client. It will either produce or consume items.

Now we don’t need the Consumer class anymore. The only thing left is to make minor changes to the Main class:

You can compare the old and the updated versions here.



Client consumer = new Client(Client.consume, donutStorage);
System.out.println(Thread.currentThread().getName() + " consumed " +
            consumer.operate(3));




Producer-consumer problem

Now let’s test our program. Let’s create 10 consumers, each consuming 5 items, and 5 producers, each producing 6 items.



int consumersNumber = 10;
int producersNumber = 5;
DonutStorage donutStorage = new DonutStorage(20);
ExecutorService executor = Executors.newFixedThreadPool(consumersNumber+producersNumber);
List<Future<?>> futures = new ArrayList<>(consumersNumber);
for (int i = 0; i < consumersNumber; i++) {
    futures.add(executor.submit(() -> {
        Client consumer = new Client(Client.consume, donutStorage);
        System.out.println(Thread.currentThread().getName() + " consumed " +
                consumer.operate(5));
    }));
}
for (int i = 0; i < producersNumber; i++) {
    futures.add(executor.submit(() -> {
        Client producer = new Client(Client.produce, donutStorage);
        System.out.println(Thread.currentThread().getName() + " produced " +
                producer.operate(6));
    }));
}



The initial number of donuts in the storage is 20. The consumers want 10 × 5 = 50 donuts in total and the producers supply 5 × 6 = 30, so we expect 20 + 30 − 50 = 0 donuts to remain. But in reality, I get a resulting number of 30. Here is the full output:



pool-1-thread-2 consumed 0
pool-1-thread-3 consumed 5
pool-1-thread-1 consumed 5
pool-1-thread-7 consumed 5
pool-1-thread-5 consumed 0
pool-1-thread-1 produced 6
pool-1-thread-7 produced 6
pool-1-thread-8 consumed 0
pool-1-thread-6 consumed 5
pool-1-thread-7 produced 6
pool-1-thread-1 produced 6
pool-1-thread-5 produced 6
pool-1-thread-3 consumed 0
pool-1-thread-2 consumed 0
pool-1-thread-4 consumed 0
Number of remaining donuts: 30



What happened is that all the consumers finished their work (consuming the 20 donuts in stock) before the producers came and added 30 more. The events weren’t printed in that order because of thread interleaving.

Wait and notify

The problem can be solved with wait and notify, which are methods of the Object class. wait blocks the current thread until it gets notified, and notify wakes up a waiting thread. So we could just call wait on the consumer threads when there are not enough items in stock, and notify them when we are sure that the stock is not empty anymore. Similarly, we could block the producers when the storage is full and notify them when a consumer takes an item.

So is this what the updated consumer part would look like?



public static final ClientOperation consume= (donutStorage, numberOfItems) -> {
    int donutsNumber = donutStorage.getDonutsNumber();
    // if there aren't enough donuts in stock, consume as many as there are
    if (numberOfItems > donutsNumber) {
        donutStorage.setDonutsNumber(0);
        wait();     // wait until producers add enough items
    }
    donutStorage.setDonutsNumber(donutsNumber - numberOfItems);
    // notify producers that it's not full anymore as we've just taken some items
    notify();
    return numberOfItems;
};



Unfortunately, it’s not that easy. We must consider the following:

1. On what object to call the methods

The code above would not even compile. We would get these errors:



Non-static method 'wait()' cannot be referenced from a static context
Non-static method 'notify()' cannot be referenced from a static context



It also wouldn’t work if we defined separate objects that represent the fullness and emptiness of the storage. We would get an

IllegalMonitorStateException – thrown if the current thread is not the owner of the object’s monitor.

This phrase is written so clearly that we don’t understand anything. But I hope to clear it up in a moment ⬇️

If we were allowed to call wait() and notify() on some random object we would end up with lots of bugs. That’s because we may have another part of the program where other threads are waiting for something else to happen.

The exception means that we must call wait and notify on the object whose lock (monitor) the current thread holds, that is, the object we synchronized on. In our case, it’s the donutStorage. If we call them on another object, it will not work. It also won’t work if we call the methods outside a synchronized block of code.
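
The rule can be checked with a tiny stand-alone sketch. The class and method names are made up for illustration; the only real API used is Object.notify() and IllegalMonitorStateException:

```java
public class MonitorOwnershipDemo {

    // No synchronized block at all: the current thread owns no monitor.
    static boolean notifyWithoutLock(Object target) {
        try {
            target.notify();
            return true;
        } catch (IllegalMonitorStateException e) {
            return false;
        }
    }

    // We hold a lock, but on a different object than the one we call notify on.
    static boolean notifyHoldingOtherLock(Object held, Object target) {
        synchronized (held) {
            try {
                target.notify();
                return true;
            } catch (IllegalMonitorStateException e) {
                return false;
            }
        }
    }

    // Correct: notify is called on the same object we synchronized on.
    static boolean notifyHoldingSameLock(Object target) {
        synchronized (target) {
            target.notify();
            return true;
        }
    }

    public static void main(String[] args) {
        Object storage = new Object();
        System.out.println(notifyWithoutLock(storage));                    // false
        System.out.println(notifyHoldingOtherLock(new Object(), storage)); // false
        System.out.println(notifyHoldingSameLock(storage));                // true
    }
}
```

Only the last variant succeeds, which is why the operations call wait and notify on donutStorage itself.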

2. What’s an InterruptedException

We are obliged to handle this checked exception, which the wait method may throw. It indicates that another thread has interrupted this one (by calling interrupt() on it), typically to cancel the operation or shut the thread down. If that happens, the thread stops waiting and runs the code in the catch block instead. Read more about how to handle InterruptedException.

In our case, we will just print that the thread was interrupted and return the number of consumed/produced items.
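
As a rough illustration of where the exception comes from, the sketch below starts a thread that waits forever and then interrupts it from the outside. The names are hypothetical. Restoring the interrupt flag in the catch block is a common convention, not something the tutorial code does:

```java
public class InterruptDemo {

    // Returns true if the waiting thread observed an InterruptedException.
    static boolean waitUntilInterrupted() {
        final Object lock = new Object();
        final boolean[] wasInterrupted = {false};

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait(); // nobody ever notifies, so only an interrupt ends this
                    }
                } catch (InterruptedException e) {
                    wasInterrupted[0] = true;
                    Thread.currentThread().interrupt(); // keep the interrupt status visible
                }
            }
        });

        waiter.start();
        waiter.interrupt(); // works whether the thread is already waiting or not
        try {
            waiter.join();  // join makes the write to wasInterrupted visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return wasInterrupted[0];
    }

    public static void main(String[] args) {
        System.out.println("Waiter was interrupted: " + waitUntilInterrupted());
    }
}
```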

3. What happens when a thread meets the wait method

When the wait method is called, the first thing the thread does is release the lock it holds. Otherwise, other threads would keep waiting for that lock and would not be able to update the variable, and the program would end up in a deadlock.
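
Here is a small sketch that makes the lock release observable. The class name and the CountDownLatch used for coordination are assumptions for the demo only. The main thread can enter the synchronized block only because the waiter gave up the lock inside wait:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WaitReleasesLockDemo {
    private static final Object lock = new Object();
    private static boolean ready = false; // guarded by lock

    static boolean otherThreadCanEnterWhileWaiting() {
        ready = false;
        CountDownLatch holdingLock = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                holdingLock.countDown(); // signal while we still hold the lock
                while (!ready) {
                    try {
                        lock.wait();     // releases the lock until notified
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        waiter.start();

        try {
            holdingLock.await();   // the waiter owns the lock at this point
            synchronized (lock) {  // we get in only after the waiter calls wait()
                ready = true;
                lock.notifyAll();
            }
            waiter.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive(); // true if the waiter woke up and finished
    }

    public static void main(String[] args) {
        System.out.println(otherThreadCanEnterWhileWaiting());
    }
}
```

If wait kept the lock, the main thread would block forever at its synchronized block and the waiter would never be notified.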

4. What happens when the thread gets notified

Once notified, the thread first has to reacquire the lock. Then it continues its work from the line of code with the wait method. So we should write some code to execute after that. In our case, the consumer will consume the rest of the items:



public static final ClientOperation consume= (donutStorage, numberOfItems) -> {
    // if there aren't enough donuts in stock, consume as many as there are and wait for more
    if (numberOfItems > donutStorage.getDonutsNumber()) {
        int numberOfTakenItems = donutStorage.getDonutsNumber();
        int numberOfItemsToTake = numberOfItems - numberOfTakenItems;
        donutStorage.setDonutsNumber(0);
        try {
            donutStorage.wait();     // wait until producers add enough items
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() +
                    " was interrupted and didn't consume the desired amount of items.");
            return numberOfTakenItems;
        }
        donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() - numberOfItemsToTake);
    } else donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() - numberOfItems);
    // notify producers that it's not full anymore as we've just taken some items
    donutStorage.notify();
    return numberOfItems;
};



Note that we no longer use a local variable as shorthand for donutStorage.getDonutsNumber(). That’s because the value can change while we wait: the thread calling the wait() method releases the lock, and other threads update the donutsNumber.

5. Why use notifyAll over notify

Another method is notifyAll(), which wakes up all the threads that are waiting. This is different from notify(), which wakes up only one arbitrarily chosen thread from the wait set. notify() is more efficient, but it has a risk. If the chosen thread cannot continue, it will go back to waiting. If no other thread calls notify() again, the remaining waiting threads will never wake up, and the program will hang. So it’s better to use notifyAll().

6. The importance of keeping wait within a while loop

Using notifyAll() brings another problem. When one of the multiple consumers gets notified, it might consume some items, leaving the others with insufficient items.

Even though access to the donutStorage is synchronized, another thread may acquire the lock first and consume the items. When a waiting thread later reacquires the lock, it continues from the line with the wait method and does not recheck the condition on its own. To prevent that, always put the wait method in a while loop that rechecks the condition, even when you use notify. The loop also guards against spurious wakeups, where wait returns without any notification.
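
Stripped of the donut specifics, the pattern looks roughly like this. GuardedStock is a hypothetical, simplified class (no capacity limit and no partial consumption), meant only to show the while loop around wait:

```java
public class GuardedStock {
    private int items; // guarded by this

    public synchronized void put(int n) {
        items += n;
        notifyAll(); // state changed: let every waiting consumer recheck
    }

    public synchronized void take(int n) throws InterruptedException {
        // A while loop, not an if: after waking up we must check again, because
        // another consumer may have taken the items before we got the lock back.
        while (items < n) {
            wait();
        }
        items -= n;
    }

    public synchronized int getItems() {
        return items;
    }

    // Two consumers want 3 items each; the producer delivers 6 items one by one.
    static int runDemo() {
        GuardedStock stock = new GuardedStock();
        Runnable consumer = () -> {
            try {
                stock.take(3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread first = new Thread(consumer);
        Thread second = new Thread(consumer);
        first.start();
        second.start();
        for (int i = 0; i < 6; i++) {
            stock.put(1);
        }
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stock.getItems();
    }

    public static void main(String[] args) {
        System.out.println("Items left: " + runDemo()); // prints "Items left: 0"
    }
}
```

With an if instead of the while, a consumer woken by the first put(1) would subtract 3 from a stock of 1 and drive the count negative.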

7. When should you notify?

The rule of thumb is to notify whenever the object’s state changes in a way that could benefit the threads that are waiting. For example, when the donutsNumber changes, the waiting threads should get another opportunity to check it.

With all being said, here is the updated code ⬇️. You can also compare the old and the updated versions here.



public static final ClientOperation consume = (donutStorage, numberOfItems) -> {
    if (numberOfItems > donutStorage.getDonutsNumber()) {

        // if there aren't enough donuts in stock, consume as many as there are
        int numberOfTakenItems = donutStorage.getDonutsNumber();
        int numberOfItemsToTake = numberOfItems - numberOfTakenItems;
        donutStorage.setDonutsNumber(0);
        // the state has changed, so let waiting producers recheck their condition
        donutStorage.notifyAll();

        // but wait in case producers put some more items
        while (numberOfItemsToTake > donutStorage.getDonutsNumber()) {
            try {
                donutStorage.wait();
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() +
                        " was interrupted and didn't consume the desired amount of items.");
                return numberOfTakenItems;
            }
        }
        donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() - numberOfItemsToTake);
    } else {
        donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() - numberOfItems);
    }

    // notify the producers that it's not full anymore as we've just taken some items
    donutStorage.notifyAll();
    return numberOfItems;
};

public static final ClientOperation produce = (donutStorage, numberOfItems) -> {
    final int donutsCapacity = donutStorage.getDonutsCapacity();

    // Represents a number of donuts the client can put before the storage
    // capacity is reached. We will put exactly this amount of donuts if
    // there is not enough space.
    int availableSpace = donutsCapacity - donutStorage.getDonutsNumber();

    if (numberOfItems > availableSpace) {
        // Number of items the producer hasn't put yet
        int numberOfItemsToPut = numberOfItems - availableSpace;
        donutStorage.setDonutsNumber(donutsCapacity);
        // the state has changed, so let waiting consumers recheck their condition
        donutStorage.notifyAll();

        // wait until consumers free enough space for the remaining items
        while (numberOfItemsToPut > (donutsCapacity - donutStorage.getDonutsNumber())) {
            try {
                donutStorage.wait();
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() +
                        " was interrupted and didn't produce the desired amount of items.");
                return availableSpace;
            }
        }
        donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() + numberOfItemsToPut);
    } else {
        donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() + numberOfItems);
    }

    // notify the consumers that it's not empty anymore as we've just put some items
    donutStorage.notifyAll();
    return numberOfItems;
};



BlockingQueue

The code above runs well, and we’ve fixed the issue. But there is another one. When consumers want more items than producers ever put, the program will never finish. That’s because the consumers will wait for more producers to put some items. But all the producers may have already finished their work. The same will happen if producers want to put more items than the storage can hold, and all the consumers have left and don’t need more items.

You can check it yourself by setting the producers’ number to 5 and the consumers’ number to 10, and letting them produce and consume 10 items each. Make sure that the initial donutsNumber is 20. The consumers then want 100 donuts but only 20 + 50 = 70 ever exist, so the program will never finish.

This can be solved by specifying how long the producers and consumers will wait before they give up (wait accepts a timeout in milliseconds). But the program has become quite complicated, and I’m eager to introduce a simpler and more effective solution: BlockingQueue.
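
For completeness, a timed wait could be sketched as follows. TimedStock and its method names are hypothetical and much simpler than the Client code. The deadline arithmetic is one common way to handle wakeups that arrive before the timeout has fully elapsed:

```java
import java.util.concurrent.TimeUnit;

public class TimedStock {
    private int items; // guarded by this

    public synchronized void put(int n) {
        items += n;
        notifyAll();
    }

    // Returns true if n items were taken, false if the timeout expired first.
    public synchronized boolean take(int n, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (items < n) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                return false;      // give up instead of waiting forever
            }
            wait(remainingMillis); // wakes up on notifyAll or when the time is over
        }
        items -= n;
        return true;
    }

    // First call finds nothing and times out; second call succeeds after a put.
    static boolean[] runDemo() {
        TimedStock stock = new TimedStock();
        try {
            boolean takenFromEmpty = stock.take(1, 50);
            stock.put(2);
            boolean takenFromStocked = stock.take(1, 50);
            return new boolean[] {takenFromEmpty, takenFromStocked};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] {false, false};
        }
    }

    public static void main(String[] args) {
        boolean[] results = runDemo();
        System.out.println(results[0] + " " + results[1]); // prints "false true"
    }
}
```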

It basically does all the inter-thread communication and synchronization for us, and it’s very easy to use. BlockingQueue is an interface; an implementation such as ArrayBlockingQueue stores all the items in an array. When the queue is empty, it internally blocks all the consumers and notifies them when some items have been added. Similarly, it blocks the producers when the queue is full and notifies them when space becomes available.

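The primary methods for putting into the queue and taking from it are put and take, which block until they succeed. But I use offer and poll to solve the problem described above, since they can give up (immediately, or after a timeout if one is passed) instead of waiting forever. They put one item into the queue or take one from it. I also use add to initially populate the queue with some items.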
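
The following is a minimal sketch of that idea, not the code from the repository. It assumes an ArrayBlockingQueue with a capacity of 50, a 2-second timeout, and the same numbers as the earlier test (20 initial donuts, 10 consumers taking 5 each, 5 producers putting 6 each). The class and method names are made up:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDonuts {
    private static final long TIMEOUT_SECONDS = 2; // assumed value for the demo

    // Takes up to numberOfItems donuts, giving up once a single poll times out.
    static int consume(BlockingQueue<Object> storage, int numberOfItems)
            throws InterruptedException {
        int taken = 0;
        while (taken < numberOfItems
                && storage.poll(TIMEOUT_SECONDS, TimeUnit.SECONDS) != null) {
            taken++;
        }
        return taken;
    }

    // Puts up to numberOfItems donuts, giving up once a single offer times out.
    static int produce(BlockingQueue<Object> storage, int numberOfItems)
            throws InterruptedException {
        int put = 0;
        while (put < numberOfItems
                && storage.offer(new Object(), TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            put++;
        }
        return put;
    }

    // Returns the number of donuts left once all clients have finished.
    static int runDemo() {
        BlockingQueue<Object> storage = new ArrayBlockingQueue<>(50);
        for (int i = 0; i < 20; i++) {
            storage.add(new Object()); // initial stock
        }
        Callable<Integer> consumer = () -> consume(storage, 5);
        Callable<Integer> producer = () -> produce(storage, 6);

        ExecutorService executor = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 10; i++) executor.submit(consumer);
        for (int i = 0; i < 5; i++) executor.submit(producer);
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return storage.size();
    }

    public static void main(String[] args) {
        System.out.println("Number of remaining donuts: " + runDemo());
    }
}
```

Because supply (20 + 30) matches demand (50) here and the producers start right away, no poll should time out and the queue is expected to end up empty. If the consumers asked for more than is ever produced, they would return after the timeout with fewer items rather than hang.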

You can find the final code on my GitHub. Make sure to star the repo! ⭐

DanielRendox / ThreadSynchronizationInJava

A simple learning project that demonstrates ways to solve such multithreading problems as race conditions, deadlocks, and producer-consumer problem.

The project forms the basis of the following tutorials:

  1. What is a Race Condition in Java, and how it can be prevented using synchronized and AtomicInteger
  2. How to solve the producer-consumer problem in Java



BlockingQueue vs wait and notify

There are various tools like BlockingQueue out there that provide a simple and effective solution to the producer-consumer problem. It’s not recommended to use old-school wait and notify for this purpose as they require special effort and your code will be bug-prone as opposed to well-tested ready solutions.

However, wait and notify provide more flexibility and control over the synchronization mechanism. For example, BlockingQueue is designed to hold objects. So ideally, we would create a class called Donut and put these donuts into the queue. But we don’t need them. All that matters in our program is that the number of items never exceeds the capacity and never becomes negative. So in our case, we waste some resources on filling the queue with plain empty Objects. But even so, the pros outweigh the cons.

Nevertheless, if that’s not true for your app, it may be reasonable to use tools such as wait and notify. The reason it’s the most popular solution to the producer-consumer problem is that inter-thread communication is easier to understand with wait and notify. You may also get asked to solve a producer-consumer problem with wait and notify in an interview.

I hope you learned how to solve a producer-consumer problem from this tutorial. Put a unicorn 🦄 on this article to indicate that you managed to read the whole post. Your and my effort should be seen! See you later!

Further reading

Check out my other articles about multithreading in Java:

🤯 Thread, Runnable, Callable, ExecutorService, and Future – all the ways to create threads in Java (Daniel Rendox, Jun 7 ’23)

What is a Race Condition in Java, and how it can be prevented using synchronized and AtomicInteger (Daniel Rendox, Jun 17 ’23)
