Brian Goetz lists AtomicReference in his the book Java Concurrency in Practice in the in the section advanced topics. Yet we will see that AtomicReference are, for specific use cases, easier to use than synchronized blocks. And the new JDK 8 getAndUpdate and updateAndGet methods make AtomicReference, even more, easier to use.
But let us start with the first topic, a use case which can be easier implemented by an AtomicReference than by a synchronized block: A concurrent state machine.
How to use compareAndSet: A concurrent state machine
The class CommandReader from the maven surefire plugin uses the compareAndSet method to implement a concurrent state machine:
public final class CommandReader {
private static final CommandReader READER = new CommandReader();
private final Thread commandThread =
newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
private final AtomicReference<Thread.State> state =
new AtomicReference<Thread.State>( NEW );
public static CommandReader getReader() {
final CommandReader reader = READER;
if ( reader.state.compareAndSet( NEW, RUNNABLE ) ) {
reader.commandThread.start();
}
return reader;
}
}
Enter fullscreen mode Exit fullscreen mode
The class AtomicReference wraps another class to enrich a variable with atomic update functionality. In line 5 the AtomicReference represents an atomic variable of the Enum type Thread.State. The AtomicReference gets initialized in line 6 to the value NEW.
The method getReader must start the commandThread when the current state is NEW and update its value to RUNNABLE. Since the method can be called by multiple threads in parallel, setting and checking must be done atomically. This is done by the method compareAndSet, line 9. The compareAndSet method only updates its value to the new value when the current value is the same as the expected. In the example, it only updates the variable to RUNNING when the current value is NEW. If the updated succeed the method returns true and the thread gets started, otherwise, it returns false and nothing happens. The check and update are done atomically.
Here is, as a comparison, the same functionality implemented with a synchronized block.
public final class CommandReader {
private static final CommandReader READER = new CommandReader();
private final Thread commandThread =
newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
private final Thread.State state = NEW;
private final Object LOCK = new Object();
public static CommandReader getReader() {
final CommandReader reader = READER;
synchronized(reader.LOCK) {
if(reader.state == NEW) {
reader.commandThread.start();
reader.state = RUNNABLE;
}
}
return reader;
}
}
Enter fullscreen mode Exit fullscreen mode
We use a synchronized block around the check and update of the variable state, line 10. This example shows why we need to atomically check and update. Without synchronization, two threads might read a state NEW calling the start method from the commandThread multiple times.
As we see we can replace the synchronized block, the if statement and the write to the state by one method call to compareAndSet. In the next example, we see how to use the compareAndSet method to update values.
Updating values: Retry till success
The idea behind using compareAndSet for updates is to retry till the update succeeds. The class AsyncProcessor from RXJava uses this technique to update an array of subscribers in the method add:
final AtomicReference<AsyncSubscription<T>[]> subscribers;
boolean add(AsyncSubscription<T> ps) {
for (;;) {
AsyncSubscription<T>[] a = subscribers.get();
if (a == TERMINATED) {
return false;
}
int n = a.length;
@SuppressWarnings("unchecked")
AsyncSubscription<T>[] b = new AsyncSubscription[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = ps;
if (subscribers.compareAndSet(a, b)) {
return true;
}
}
}
Enter fullscreen mode Exit fullscreen mode
The update is retried using a for loop, line 3. The loop is only terminated if either the subscriber array is in the state terminated, line 6, or the compareAndSet operation succeeds, line 14. In all other cases, the update is repeated on a copy of the array.
Starting with JDK 8 the class AtomicReference provides this functionality in the two utility methods getAndUpdate and updateAndGet The following shows the implementation of the getAndUpdate method in JDK 8:
public final V getAndUpdate(UnaryOperator<V> updateFunction) {
V prev, next;
do {
prev = get();
next = updateFunction.apply(prev);
} while (!compareAndSet(prev, next));
return prev;
}
Enter fullscreen mode Exit fullscreen mode
The method uses the same technique as the add method from the class AsyncProcessor. It retries the compareAndSet method in a loop, line 6. The updateFunction will be called multiple times when the update fails. So this function must be either side effect free or idempotent.
And here is the add method from above implemented with the new updateAndGet method:
boolean add(AsyncSubscription<T> ps) {
AsyncSubscription<T>[] result = subscribers.updateAndGet( ( a ) -> {
if (a != TERMINATED) {
int n = a.length;
@SuppressWarnings("unchecked")
AsyncSubscription<T>[] b = new AsyncSubscription[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = ps;
return b;
}
else {
return a;
}
});
return result != TERMINATED;
}
Enter fullscreen mode Exit fullscreen mode
As we see the while loop is hidden in the updateAndGet method. We only need to implement a function calculating a new value from an old one.
Conclusion and next steps
We have seen two examples of compareAndSet. If you are interested in more examples take a look at the book The Art of Multiprocessor Programming. It shows you how to implement typical concurrent data structures using compareAndSet. And this article shows how to test atomic updates.
I would be glad to hear from you about how you use AtomicReference in your application.
原文链接:AtomicReference, a sometimes easier alternative to synchronized blocks
暂无评论内容