chymyst-core

The four levels of concurrency

Written by Sergei Winitzki

The words “concurrent programming”, “asynchronous programming”, and “parallel programming” are used in many ways and sometimes interchangeably. However, there do in fact exist different, inequivalent levels of complexity that we encounter when programming applications that run multiple tasks simultaneously. I will call these levels “parallel data”, “acyclic dataflow”, “cyclic dataflow”, and “general concurrency”.

As we will see, each level is a strict subset of the next.

Level 1: Parallel data

The main task here is to process a large volume of data more quickly, by splitting the data in smaller subsets and processing all subsets in parallel (on different CPU cores and/or on different machines).

The computation is sequential, and we could have computed the same results on a single processing thread, without any splitting. Typically, the fully sequential, single-thread computation would be too slow, and thus we are trying to speed it up by using multiple threads.

A typical data-parallel task is to produce a table of word counts in 10,000 different text files. This class of problems is solved by such frameworks as Scala’s parallel collections, Hadoop, and Spark. The main difficulty for the implementers of these frameworks is to “parallelize” the task, that is, to split the task correctly into subtasks that are as independent as possible, and to run these subtasks in parallel as efficiently as possible.

Parallel data = applicative stream

From the type-theoretic point of view, the splitting of the large data set into small chunks is equivalent to replacing the whole data set by a parameterized container type Stream[T], where T is the type of one chunk of data. For example, in Spark this parameterized container type is the RDD[T] type.

A parallel data framework provides a number of operations on the Stream[T] type, such as map or filter. These operations make this type into a functor. Usually, there is also an operation such as map2 that applies a function of several arguments to several containers at once. This operation makes Stream[T] into an applicative functor.

However, importantly, there is no flatMap operation that would make Stream[T] into a monad. A data-parallel framework limits its expressive power to that of an applicative functor.

It is this limitation that makes data-parallel frameworks so effective in their domain of applicability. As is well known in the functional programming folklore, applicative functor operations are easily and efficiently parallelized, but the monadic flatMap operation is not easily parallelized.

Level 2: Acyclic dataflow

The main task here is to process a large volume of data that is organized as one or more data streams. Each element of a stream is a chunk of data that can be processed independently from other chunks. The streams form an acyclic graph that starts “upstream” at “source” vertices and flows “downstream”, finally ending at “sink” vertices. Other vertices of the graph are processing steps that transform the chunks of data in some way. Usually, programmers want to achieve the maximum throughput (number of chunks consumed at “sources” and delivered to “sinks” per unit time).

The dataflow is asynchronous: each vertex of the graph can perform its computation and deliver a result to a next vertex, even though that vertex might be still busy with its own processing. Since the computations could take different amounts of time at different vertices, certain processing steps will have lower throughput and incur longer wait times for subsequent steps. To compensate for this, we could run the slower processing steps in parallel on several data chunks at once, while other steps may run sequentially on each chunk. Therefore, acyclic dataflow systems can be also called “asynchronous parallel streaming systems”.

A typical use case of acyclic dataflow is to implement a high-throughput asynchronous Web server that can start responding to the next request long before the previous request is answered. This class of problems can be solved by using Future[T], by asynchronous streaming frameworks such as Akka Streaming, scala/async, or FS2, and by various functional reactive programming (FRP) frameworks. The main difficulty for the implementers of these frameworks is to interleave the wait times on each thread as much as possible and to avoid wasting CPU cycles when some threads are blocked.

As in the case of data-parallel computations, the acyclic dataflow computation is still equivalent a sequential computation, in the following sense: We could have computed the same results on a single thread and without using any asynchronous computations. However, this would be too slow, and thus we are trying to speed it up by interleaving the wait times and optimizing thread usage.

Acyclic dataflow = monadic stream

A stream that carries values of type T is naturally represented by a parameterized container type Stream[T]. Arbitrary acyclic dataflow can be implemented only if Stream[T] is a monadic functor: in particular, processing a single chunk of type T could yield another Stream[T] as a result. Accordingly, most streaming frameworks provide a flatMap operation for streams.

A monadic functor is strictly more powerful than an applicative functor, and accordingly the user has more power in implementing the processing pipeline, in which the next steps can depend in arbitrary ways on other steps and on previous data chunks in the stream. However, a monadic computation is difficult to parallelize automatically. Therefore, it is the user who now needs to specify which steps of the pipeline should be parallelized, and which steps should be separated by an asynchronous “boundary” from other steps.

Level 3: Cyclic dataflow

The class of problems I call “general dataflow” is very similar to “acyclic dataflow”, except for removing the limitation that the dataflow graph should be acyclic.

The main task of general dataflow remains the same — to process data that comes as a stream, chunk after chunk. However, now we allow any step of the processing pipeline to use a “downstream” step as input, thus creating a loop in the dataflow graph. This loop, of course, must cross an asynchronous boundary somewhere, or else we will have an actual, synchronous infinite loop in the program. An “asynchronous infinite loop” means that the output of some downstream processing step will be later (asynchronously) fed into the input of some upstream step.

A typical program that requires a dataflow graph with an asynchronous loop is an event-driven graphical user interface (GUI) in the FRP paradigm. Consider, for example, an interactive Excel table with auto-updating cells will have to recompute a number of cells depending on user input events. User input events will depend on what is shown on the screen at a given time; and the contents of the screen depends on data in the cells.

This mutual dependency creates a loop in the dataflow graph: First, the user creates an input event, sending a chunk of data to the Excel computation engine. Second, the engine consults the table cells stored in memory and updates the values in the cells. Third, the updated values are shown on the screen, which allows the user to create further input events that will depend on the new contents of the cells.

The loop in the graph is asynchronous because the user creates new input events at a later time than the cells are updated on the screen.

This class of problems can be solved by functional reactive programming (FRP) frameworks, Akka Streams, and some other asynchronous streaming systems.

Despite the fact that the general dataflow is strictly more powerful than the acyclic dataflow, the entire computation is still possible to perform synchronously on a single thread without any concurrency.

Cyclic dataflow = recursive monadic stream

To formalize the difference between general and acyclic dataflow, we note that a loop in the dataflow graph is equivalent to a recursive definition of an asynchronous stream. In other words, we need the Stream[T] type to be a monad with a monadFix operation.

For instance, in a typical GUI application implemented in the FRP paradigm, one defines three streams: Stream[Model], Stream[View], and Stream[Input]. The following table illustrates the meaning of these types and the way the three streams depend on each other.

Stream[T] A value of type T represents: Stream depends on:
Stream[Model] the data model of the application at a given time Stream[Input]
Stream[View] all windows and UI elements shown on the screen at a given time Stream[Model]
Stream[Input] any of the possible input events that the user might create Stream[View]

Note that the last dependency is asynchronous because the user can create input events only after the view is shown. Therefore, the three streams are defined mutually recursively, and the stream graph contains an asynchronous loop.

The streaming frameworks that do not support a recursive definition of streams fall into the acyclic dataflow class.

Level 4: General concurrency

Finally, we consider the most general concurrency problem, where we need to manage many computation threads running concurrently in unknown order and interacting in arbitrary ways. For instance, one thread may start another, then at some point stop and wait until the other thread computes a certain result, and then examine that result to decide whether to continue its own computation, to wait further, or to create new computation threads.

The main difficulty here is to ensure that different threads are synchronized in the desired manner.

Frameworks such as Akka Actors, Go coroutines/channels, and the Java concurrency primitives (Thread, wait/notify, synchronized) are all Level 4 concurrency frameworks. The chemical machine (known in the academic world as “join calculus”) is also a Level 4 framework.

A typical program that requires this level of concurrency is an operating system where many running processes can synchronize and communicate with each other in arbitrary ways.

(It seems to me that the “dining philosophers” problem is also an example of a concurrency task that cannot be implemented by any concurrency framework other than a Level 4 framework. However, I do not know how to prove that this is so.)

Why is Level 4 higher than Level 3

The following argument helps establish that Level 4 is strictly more powerful than Level 3.

The key observation is that concurrency at Level 3 (and below) can be run on a single execution thread. In other words, any program at Level 3 or below will give the same results when run on multiple threads and on a single thread (although it may run slower).

If we find an example of a program that cannot be implemented on a single thread, it will follow that Level 4 is strictly more powerful than Level 3. To obtain such an example, consider the following situation:

We are given two functions, a() and b(), that either eventually produce a result or go into an infinite loop, never returning a result. It is known that, when we call a() and b() concurrently, at most one of them may go into an infinite loop (but it could be a different process every time). We need to implement a function firstResult(a, b) that will run processes a() and b() concurrently and wait for the value returned by whichever process finishes first. By assumption, at least one of them will certainly produce a result value. The function firstResult(a, b) needs to return that value.

Now, we claim that this task cannot be run on a single thread, because no program running on a single thread can decide correctly which of the two processes returns first. Here is why: Regardless of how we implement firstResult(), a single-threaded program will have to call either a() or b() on that single thread. Sometimes, that call will go into an infinite loop, and then the single thread will be infinitely blocked. In that case, our program will never finish computing firstResult().

So, if implemented on a single thread, firstResult() will sometimes fail to return a value; in other words, it is a partial function. However, if we are allowed to use many threads, we can implement firstResult() as a total function, always returning a value. To do that, we simply run the processes a() and b() simultaneously on two different threads, and we are guaranteed that at least one of the processes will return a result.

Are there other levels?

How can we be sure that there are no other levels of expressive power in concurrency?

Some assurance comes from the mathematical description of these levels:

It is possible to take a level 3 framework and add a single feature that goes beyond the expressive power of Level 3. For instance, we could add firstResult(a, b) as a primitive to a streaming framework. However, this single feature might not be sufficient to implement other Level 4 tasks.

Similarly, adding a primitive for starting a new thread to a Level 1 framework will enable users to perform certain tasks but not others.

I conjecture that the result of adding a single high-level feature to a lower-level framework will be a new framework that is hard to use because some concurrency tasks cannot be naturally expressed in it while others can. Users will have to work around these deficiencies or constantly ask for new features to be added to the framework, and the design of concurrent programs will become difficult to understand.

Which level to use?

It is best to use the level of concurrency that is no higher than what is required to solve the task at hand.

For a data-parallel computation, Spark (Level 1) is the right choice while Akka Streaming (Level 3) is an overkill.

For a streaming pipeline for data processing, Scala standard streams (Level 2) are already adequate, although FS2 or Akka Streaming (Level 3) will offer more flexibility while not over-complicating the user code. Akka Actors will be an overkill, not adding much value to the application, although you will certainly be able to implement a streaming pipeline using raw actors.

For implementing a GUI, a good recursive FRP (Level 3) framework is a must. Raw Akka actors are not necessary but could be used instead of FRP.

If your task is to implement an alternative to Spark (Level 1) or to Scala streams (Level 2), you need a higher-powered framework, and possibly a Level 4 if you need fine-grained control over threads.