The Java programming language supports multiple threads, using a monitor construction for synchronization. In the Java language, a method can be declared as synchronized. A built-in mechanism ensures that only one Java thread can execute an object's synchronized methods at a time. The mechanism also allows threads to wait for resources to become available, and allows a thread that makes a resource available to notify other threads that are waiting for the resource. This web page describes this mechanism and its use. But first, we need to look at an example that illustrates why synchronization is needed.
In the Bounded Buffers problem, there are two kinds of processes or threads: producers and consumers. Producers take buffers from a queue of empty buffers, put content into the buffers, and place the buffers into a queue of full buffers. Consumers take buffers from a queue of full buffers, do something with the buffers' contents, and place the buffers back onto the queue of empty buffers. Each of the buffer queues has a bounded capacity for buffers. Processes communicating through a pipe or communicating through mailboxes are examples of the Bounded Buffers problem.
There is generally only one queue of empty buffers, maintained by the operating system. It provides all buffers for processes or threads. There can be multiple producers, consumers, and queues of full buffers. The queues of full buffers usually have a small capacity, on the order of 10 buffers. We generally only need to consider a single queue of full buffers at a time. The following picture illustrates the flow of buffers.
|
When we have a good solution to the Bounded Buffers problem, the coordination between producers and consumers will be controlled by two BufferQueue objects representing the queue of empty buffers and the queue of full buffers. The following code is an implementation of a BufferQueue class that works well in a single-threaded environment.
We are limited in the kind of testing we can do on this first version of the BufferQueue class. There is no good way of making producers wait for empty buffers or making consumers wait for full buffers. So we will only use producers. Initially the queue of empty buffers has 10 buffers and the queue of full buffers has none. We first run a single producer that moves buffers from the queue of empty buffers to the queue of full buffers as long as the queue of empty buffers is not empty. The producer kills time for 20 milliseconds on the average after removing a buffer from the queue of empty buffers and kills time for 20 milliseconds on the average after putting the buffer on the queue of full buffers.
Here is the log for this kind of test.
0001: Producer 1 got Buffer 0 from BufferQueue empty. 0042: Producer 1 removed Buffer 0 from BufferQueue empty. 0068: Producer 1 got Buffer 1 from BufferQueue empty. 0087: Producer 1 removed Buffer 1 from BufferQueue empty. 0134: Producer 1 got Buffer 2 from BufferQueue empty. 0140: Producer 1 removed Buffer 2 from BufferQueue empty. 0173: Producer 1 got Buffer 3 from BufferQueue empty. 0222: Producer 1 removed Buffer 3 from BufferQueue empty. 0251: Producer 1 got Buffer 4 from BufferQueue empty. 0276: Producer 1 removed Buffer 4 from BufferQueue empty. 0294: Producer 1 got Buffer 5 from BufferQueue empty. 0298: Producer 1 removed Buffer 5 from BufferQueue empty. 0316: Producer 1 got Buffer 6 from BufferQueue empty. 0335: Producer 1 removed Buffer 6 from BufferQueue empty. 0380: Producer 1 got Buffer 7 from BufferQueue empty. 0390: Producer 1 removed Buffer 7 from BufferQueue empty. 0423: Producer 1 got Buffer 8 from BufferQueue empty. 0438: Producer 1 removed Buffer 8 from BufferQueue empty. 0464: Producer 1 got Buffer 9 from BufferQueue empty. 0532: Producer 1 removed Buffer 9 from BufferQueue empty.As you can see, the queue of empty buffers appears to be working correctly.
Now let's try the same test with two producers. Here is the log.
0001: Producer 1 got Buffer 0 from BufferQueue empty. 0004: Producer 2 got Buffer 0 from BufferQueue empty. 0018: Producer 2 removed Buffer 0 from BufferQueue empty. 0050: Producer 2 got Buffer 1 from BufferQueue empty. 0059: Producer 1 removed Buffer 1 from BufferQueue empty. 0063: Producer 2 removed Buffer 2 from BufferQueue empty. 0064: Producer 1 got Buffer 3 from BufferQueue empty. 0086: Producer 1 removed Buffer 3 from BufferQueue empty. 0098: Producer 1 got Buffer 4 from BufferQueue empty. 0110: Producer 2 got Buffer 4 from BufferQueue empty. 0114: Producer 2 removed Buffer 4 from BufferQueue empty. 0120: Producer 1 removed Buffer 5 from BufferQueue empty. 0146: Producer 1 got Buffer 6 from BufferQueue empty. 0161: Producer 1 removed Buffer 6 from BufferQueue empty. 0167: Producer 1 got Buffer 7 from BufferQueue empty. 0174: Producer 1 removed Buffer 7 from BufferQueue empty. 0177: Producer 2 got Buffer 8 from BufferQueue empty. 0184: Producer 1 got Buffer 8 from BufferQueue empty. 0200: Producer 1 removed Buffer 8 from BufferQueue empty. 0202: Producer 2 removed Buffer 9 from BufferQueue empty.If you look at the "got Buffer" reports, you can see that some of the buffers were gotten twice (Buffer 0, for example) and others weren't gotten at all (Buffer 2, for example).
The first solution has some obvious problems in a multithreaded environment. First of all, it makes no provision for making a thread wait. To get the BufferQueue class tested, we needed to get rid of the consumers, who need to wait for a full buffer. Second, multiple threads that both read and modify shared data can make a mess of the data.
Part of the problem is the interface for the BufferQueue class.
The fact that the functionality of the getBuffer() and
removeBuffer() methods are separated causes trouble whenever
one process does a getBuffer() in between another process's
getBuffer() and removeBuffer() - both processes
get the same buffer.
If the first process did a removeBuffer() between another
process's getBuffer() and removeBuffer() then the
second buffer to be removed will never have been gotten.
When a data structure is modified, you usually need to get local information about the structure, then modify it. You cannot do this correctly if the structure is modified in between these two steps. When you deal with multiple threads, there are usually operations that have to be executed atomically. That is, the operations have to be performed completely without the possibility of another thread trying to make modifications at the same time.
To fix our BufferQueue, we must start by modifying its methods.
We need to combine the getBuffer() and
removeBuffer() functionality.
So we combine them into a takeBuffer() method that removes
and returns a buffer.
While we're at it, we may as well get rid of the isEmpty() and
isFull() methods.
The boolean values that they return can easily change before they are used.
We thus end up with the second version of BufferQueue.java.
This version of the BufferQueue class cannot properly be tested since there
is no way of properly stopping a producer when the queue of empty buffers
is empty.
If we did come up with a way of doing that would it work?
Yes, for a while, but eventually it would fail.
Since a thread swith can occur at any time, we do not have any guarantee
that takeBuffer() and putBuffer() are executed
atomically.
As a matter of fact, these methods execute so fast relative to the time
between thread switches that failure would probably occur only once every
few hundred thousand operations.
Imagine examining logs that big to try to figure out what is going wrong.
Problems that show up rarely are much harder to troubleshoot than problems
that occur frequently.
A correct solution to a synchronization problem requires the following.
putBuffer()
method that wakes up threads that are waiting.
A monitor is a convenient mechanism that tackles addresses these needs. In order to understand the monitor concept, let's take a look at how monitors are implemented in Java.
Any object in Java is a monitor if some of its methods are declared with
the synchronized keyword.
As a monitor, there are three mechanisms for addressing the needs described
above.
synchronized keyword:
wait() method:wait() method can only be invoked by the object's lock
holder.
When the lock holder invokes the wait() method it is placed
into the object's wait set.
If a thread other than the lock holder invokes the wait()
method it will cause an IllegalMonitorStateException to be thrown.
notifyAll() method:notifyAll() method also throws an
IllegalMonitorStateException if invoked by a thread other than the lock
holder.
In order to properly synchronize a BufferQueue for the Bounded Buffers problem, the following changes need to be made.
takeBuffer() and putBuffer()
synchronized.
Put the keyword synchronized before the return types for the
methods.
takeBuffer() and
putBuffer().
This wait loop has the following form.
while (unable to proceed) {
try {
wait();
} catch(InterruptedException ex) {
}
}
The takeBuffer() method is unable to proceed when the
count variable is 0.
The putBuffer() method is unable to proceed when the
count variable is equal to buffers.length.
notifyAll() to wake up the wait set.
Normally, this method is invoked at the end of a synchronized method.
It should only be invoked when the change in state can make some thread in
the wait set able to proceed.
When the count variable is buffers.length - 1 at
the end of the putBuffer() method then a thread that is
waiting because the queue is full is able to proceed.
When the count variable is equal to 1 at the end of the
takeBuffer() method then a thread that is waiting because the
queue is empty is able to proceed.
The following table shows the monitor state for the queue of full buffers. For this example, each producer fills one buffer, puts it onto the queue of full buffers, then terminates. Each consumer gets one buffer from the queue of full buffers, consumes it, then terminates. At the start, there are two consumers and two producers, and the queue of full buffers is empty. All threads are attempting to access the queue sumultaneously.
In this table, count is the number of buffers in the queue, lh is its lock holder, lw is its lock waiters, and ws is its wait set.
| Consumer | Producer | full BufferQueue | runable | |||||
|---|---|---|---|---|---|---|---|---|
| A | B | C | D | count | lh | lw | ws | threads |
| takeBuffer | takeBuffer | putBuffer | putBuffer | 0 | - | - | - | A, B, C, D |
| runs | blocks | blocks | blocks | | | A | B, C, D | A | |
| wait | | | - | - | A | B, C, D | |||
| resumes | blocks | blocks | | | B | C, D | | | B | |
| wait | | | - | - | A, B | C, D | |||
| resumes | blocks | | | C | D | | | C | ||
| add Buffer | 1 | | | | | | | | | |||
| notifyAll | | | | | A, B, D | - | | | |||
| returns | | | - | - | A, B, D | ||||
| resumes | blocks | blocks | | | A | B, D | A | ||
| take buffer | 0 | | | | | | | ||||
| returns | | | - | - | B, D | ||||
| resumes | blocks | | | B | D | B | |||
| wait | | | - | - | B | D | |||
| resumes | | | D | | | | | ||||
| add buffer | 1 | | | | | | | ||||
| notifyAll | | | | | B | - | | | |||
| returns | | | - | - | B | ||||
| resumes | | | B | | | |||||
| take buffer | 0 | | | | | |||||
| returns | | | - | | | |||||