In 2003, Herb Sutter exposed the industry's biggest "dirty little secret" with his "The Free Lunch Is Over" article, demonstrating clearly that the era of ever-faster processors was at an end, to be replaced by a new era of parallelism via "cores" (virtual CPUs) on a single chip. The revelation sent shockwaves through the programming community because getting thread-safe code correct has always remained, in theory if not in practice, the province of high-powered software developers way too expensive for your company to hire. A privileged few, it seemed, understood enough about Java's threading model and concurrency APIs and "synchronized" keyword to write code that both provided safety and throughput ... and most of those had learned it the hard way.
It is presumed that the rest of the industry was left to fend for itself, clearly not a desirable conclusion, at least not to the IT departments paying for that software being developed.
Back-to-concurrency basicsLike Scala's sister language in the .NET space, F#, Scala stands as one of several purported solutions to the "concurrency problem." In this column, I have touched on several of Scala's properties that make it more amenable to writing thread-safe code such as immutable objects by default and a design preference for returning copies of objects rather than modifying their contents. Scala's support for concurrency reaches far deeper than just this though; it's high time to start poking around in the Scala libraries to see what lives there.
Before we can get too deep into Scala's concurrency support, it's a good idea to make sure that you have a good understanding of Java's basic concurrency model because Scala's support for concurrency builds, at some level, on top of the features and functionality provided by the JVM and supporting libraries. Toward that end, the code in Listing 1 contains a basic concurrency problem known as the Producer/Consumer problem (as described in the "Guarded Blocks" section of the Sun Java Tutorial). Note that the Java Tutorial version doesn't use the
java.util.concurrent
classes in its solution, preferring instead to use the old wait()
/notifyAll()
methods from java.lang.Object
:Listing 1. Producer/Consumer (pre-Java5)
package com.tedneward.scalaexamples.notj5; class Producer implements Runnable { private Drop drop; private String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; public Producer(Drop drop) { this.drop = drop; } public void run() { for (int i = 0; i < importantInfo.length; i++) { drop.put(importantInfo[i]); } drop.put("DONE"); } } class Consumer implements Runnable { private Drop drop; public Consumer(Drop drop) { this.drop = drop; } public void run() { for (String message = drop.take(); !message.equals("DONE"); message = drop.take()) { System.out.format("MESSAGE RECEIVED: %s%n", message); } } } class Drop { //Message sent from producer to consumer. private String message; //True if consumer should wait for producer to send message, //false if producer should wait for consumer to retrieve message. private boolean empty = true; //Object to use to synchronize against so as to not "leak" the //"this" monitor private Object lock = new Object(); public String take() { synchronized(lock) { //Wait until message is available. while (empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = true; //Notify producer that status has changed. lock.notifyAll(); return message; } } public void put(String message) { synchronized(lock) { //Wait until message has been retrieved. while (!empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = false; //Store message. this.message = message; //Notify consumer that status has changed. lock.notifyAll(); } } } public class ProdConSample { public static void main(String[] args) { Drop drop = new Drop(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } }
Note: The code I present here is slightly modified from the Sun tutorial solution; there's a small design flaw in the code they present (see The Java Tutorial "bug").
The core of the Producer/Consumer problem is a simple one to understand: one (or more) producer entities want to provide data for one (or more) consumer entities to consume and do something with (in this case it consists of printing the data to the console). The
Producer
andConsumer
classes are pretty straightforward Runnable
-implementing classes: TheProducer
takes String
s from an array and put
s them into a buffer for the Consumer
totake
as desired.
The hard part of the problem is that if the
Producer
runs too fast, data will be potentially lost as it is overwritten; if the Consumer
runs too fast, data will be potentially double-processed as the Consumer
reads the same data twice. The buffer (called the Drop
in the Java Tutorial code) must ensure that neither condition occurs. Not to mention that there is no potential for data corruption (hard in the case of String references, but still a concern) as messages are put
in and take
n out of the buffer.
A full discussion of the subject is best left to Brian Goetz's Java Concurrency in Practice or Doug Lea's earlier Concurrent Programming in Java (see Resources), but a quick rundown of how this code works is necessary before you apply Scala to it.
When the Java compiler sees the
synchronized
keyword, it generates a try
/finally
block in place of the synchronized block with a monitorenter
opcode at the top of the block and a monitorexit
opcode in the finally
block to ensure that the monitor (the Java basis for atomicity) is released regardless of how the code exits. Thus, the put
code in Drop
gets rewritten to look like Listing 2:Listing 2. Drop.put after compiler helpfulness
// This is pseudocode public void put(String message) { try { monitorenter(lock) //Wait until message has been retrieved. while (!empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = false; //Store message. this.message = message; //Notify consumer that status has changed. lock.notifyAll(); } finally { monitorexit(lock) } }
The
wait()
method tells the current thread to go into an inactive state and wait for another thread to call notifyAll()
on that object. The thread just notified must then attempt to acquire the monitor again after which point it is free to continue execution. In essence, wait()
andnotify()
/notifyAll()
act as a simple signaling mechanism, allowing the Drop
to coordinate between the Producer
and the Consumer
threads, one take
to each put
.
The code download that accompanies this article uses the Java5 concurrency enhancements (the
Lock
and Condition
interfaces and theReentrantLock
lock implementation) to provide timeout-based versions of Listing 2, but the basic code pattern remains the same. That is the problem: Developers who write code like in Listing 2 have to focus too exclusively on the details, the low-level implementation code, of the threading and locking required to make it all work correctly. What's more, developers have to reason about each and every line in the code, looking to see if it needs to be protected because too much synchronization is just as bad as too little.
Now let's look at Scala alternatives.
Good old Scala concurrency (v1)
One way to start working with concurrency in Scala is to simply translate the Java code directly over to Scala, taking advantage of Scala's syntax in places to simplify the code, a least a little:
Listing 3. ProdConSample (Scala)
object ProdConSample { class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } } class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } class Drop { var message : String = "" var empty : Boolean = true var lock : AnyRef = new Object() def put(x: String) : Unit = lock.synchronized { // Wait until message has been retrieved await (empty == true) // Toggle status empty = false // Store message message = x // Notify consumer that status has changed lock.notifyAll() } def take() : String = lock.synchronized { // Wait until message is available. await (empty == false) // Toggle status empty=true // Notify producer that staus has changed lock.notifyAll() // Return the message message } private def await(cond: => Boolean) = while (!cond) { lock.wait() } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop(); // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } }
The
Producer
and Consumer
classes are almost identical to their Java cousins, again extending (implementing) the Runnable
interface and overriding the run()
method, and — in Producer
's case — using the built-in iteration method for each to walk the contents of theimportantInfo
array. (Actually, to make it more like Scala, importantInfo
should probably be a List
instead of an Array
, but in this first pass, I want to keep things as close to the original Java code as possible.)
The
Drop
class also looks similar to the Java version except that in Scala, "synchronized" isn't a keyword, it's a method defined on the classAnyRef
, the Scala "root of all reference types." This means that to synchronize on a particular object, you simply call the synchronize method on that object; in this case, on the object held in the lock field on Drop
.
Note that we also make use of a Scala-ism in the
Drop
class in the definition of the await()
method: The cond
parameter is a block of code waiting to be evaluated rather than evaluated prior to being passed in to the method. Formally in Scala, this is known as "call-by-name"; here it serves as a useful way of capturing the conditional-waiting logic that had to be repeated twice (once in put
, once in take
) in the Java version.
Finally, in
main()
, you create the Drop
instance, instantiate two threads, kick them off with start()
, and then simply fall off of the end ofmain()
, trusting that the JVM will have started those two threads before you finish with main()
. (In production code, this probably shouldn't be taken for granted, but for a simple example like this, it's going to be OK 99.99 percent of the time. Caveat emptor.)
However, having said all that, the same basic problem remains: Programmers still have to worry way too much about the issues of signaling and coordinating the two threads. While some of the Scala-isms might make the syntax easier to live with, it's not really an incredibly compelling win so far.
Scala concurrency, v2
A quick look at the Scala Library Reference reveals an interesting package:
scala.concurrency
. This package contains a number of different concurrency constructs, including the first one we're going to make use of, the MailBox
class.
As its name implies,
MailBox
is essentially the Drop
by itself, a single-slot buffer that holds a piece of data until it has been retrieved. However, the big advantage of MailBox
is that it completely encapsulates the details of the sending and receiving behind a combination of pattern-matching and case classes, making it more flexible than the simple Drop
(or the Drop
's big multi-slot data-holding brother,java.util.concurrent.BoundedBuffer
).Listing 4. ProdConSample, v2 (Scala)
package com.tedneward.scalaexamples.scala.V2 { import concurrent.{MailBox, ops} object ProdConSample { class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } } class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } } }
The only difference here between v2 and v1 is in the implementation of
Drop
, which now makes use of the MailBox
class to handle the blocking and signaling of messages coming in and being removed from the Drop
. (We could have rewritten Producer
and Consumer
to use the MailBox
directly, but for simplicity's sake, I assume that we want to keep the Drop
API consistent across all the examples.) Using a MailBox
is a bit different from the classic BoundedBuffer
(Drop
) that we've been using, so let's walk through that code in detail.MailBox
has two basic operations: send
and receive
. The receiveWithin
method is simply a timeout-based version of receive
. MailBox
takes messages that can be of any type whatsoever. The send()
method essentially drops the message into the mailbox, notifying any pending receivers immediately if it's of a type they care about, and appending it to a linked list of messages for later retrieval. The receive()
method blocks until a message appropriate to the function block that's passed in to it is received.
Therefore, in this situation, we create two case classes, one containing nothing (
Empty
) that indicates the MailBox
is empty and one containing the data (Full
) with the message data in it.- The
put
method, because it is putting data into theDrop
, callsreceive()
on theMailBox
looking for anEmpty
instance, thus blocking untilEmpty
has been sent. At this point, it sends aFull
instance to theMailBox
containing the new data. - The
take
method, because it is removing data from theDrop
, callsreceive()
on theMailBox
looking for aFull
instance, extracts the message (again thanks to pattern-matching's ability to extract values from inside the case class and bind them to local variables), and sends anEmpty
instance to theMailBox
.
No explicit locking required, and no thinking about monitors.
Scala concurrency, v3
In fact, we can shorten the code up considerably if it turns out that
Producer
and Consumer
don't really have to be full-fledged classes at all (which is the case here) — both are essentially thin wrappers around the Runnable.run()
method, which Scala can do away with entirely by using the scala.concurrent.ops
object's spawn
method, like in Listing 5:Listing 5. ProdConSample, v3 (Scala)
package com.tedneward.scalaexamples.scala.V3 { import concurrent.MailBox import concurrent.ops._ object ProdConSample { class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer spawn { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } // Spawn Consumer spawn { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } } }
The
spawn
method (imported via the ops
object just at the top of the package block) takes a block of code (another by-name parameter example) and wraps it inside the run()
method of an anonymously-constructed thread object. In fact, it's not too difficult to understand what spawn
's definition looks like inside of the ops
class:Listing 6. scala.concurrent.ops.spawn()
def spawn(p: => Unit) = { val t = new Thread() { override def run() = p } t.start() }
... which once again highlights the power of by-name parameters.
One drawback to the
ops.spawn
method is the basic fact that it was written in 2003 before the Java 5 concurrency classes had taken effect. In particular, the java.util.concurrent.Executor
and its ilk were created to make things easier for developers to spawn threads without having to actually handle the details of creating thread objects directly. Fortunately, spawn
's definition is simple enough to recreate in a custom library of your own, making use of Executor
(or ExecutorService
or ScheduledExecutorService
) to do the actual launching of the thread.
In fact, Scala's support for concurrency goes well beyond the
MailBox
and ops
classes; Scala also supports a similar concept called "Actors," which uses a similar kind of message-passing approach that the MailBox
uses, but to a much greater degree and with much more flexibility. But that's for next time.Conclusion
Scala provides two levels of support for concurrency, much as it does for other Java-related topics:
- The first, full access to the underlying libraries (such as java.util.concurrent) and support for the "traditional" Java concurrency semantics (such as monitors and
wait()
/notifyAll()
). - The second, a layer of abstraction on top of those basic mechanics, as exemplified by the
MailBox
class discussed in this article and the Actors library that we'll discuss in the next article in the series.
The goal is the same in both cases: to make it easier for developers to focus on the meat of the problem rather than having to think about the low-level details of concurrent programming (obviously the second approach achieves that better than the first, at least to those who aren't too deeply invested in thinking at the low-level primitives level).
One clear deficiency to the current Scala libraries, however, is the obvious lack of Java 5 support; the
scala.concurrent.ops
class should have operations like spawn
that make use of the new Executor
interfaces. It should also support versions of synchronized
that make use of the new Lock
interfaces. Fortunately, these are all library improvements that can be done at any point during Scala's life cycle without breaking existing code; they can even be done by Scala developers themselves without having to wait for Scala's core development team to provide it to them (all it takes is a little time).
No comments:
Post a Comment