Coroutines in Java

A few months ago I attended the Devoxx’18 conference in Antwerp, Belgium, and one of the topics I was most looking forward to was Project Loom’s “Fibers”. Fibers are lightweight user-mode threads scheduled by the JVM runtime, not by the operating system, so they have a very low footprint. This means no potentially expensive context switches by the operating system, or at least that context switches stay under the control of the Java VM. The Fiber effort is driven by Project Loom within OpenJDK. A couple of months after the conference, I finally got the chance to play around with Fibers. The feature is planned for an upcoming Java release (the source code I recently checked out was a JDK 13 fork, jdk-13-ea, so I presume the current target is Java 13 or later).

The first concept we will visit is Continuations. Fiber tasks are wrapped in Continuations and yield execution as soon as the yield() method is called. A Continuation represents a program state that can be suspended and resumed, and it can even return a value. The concept already exists in languages such as Python, Go, Scheme and Haskell, sometimes under a different name, “coroutines”, which are basically Fibers without a scheduler.

Fibers = Continuations + Scheduler
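The run/yield/resume cycle and the equation above can be imitated in plain Java, without the Loom prototype. The following is only a minimal sketch and not how Loom implements it: ResumableTask and runRoundRobin are hypothetical names, the "continuation" is a hand-written state machine that remembers its position between two run() calls, and a small round-robin loop plays the role of the scheduler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ContinuationSketch {

  /** Hypothetical stand-in for a continuation: run() executes one step, then "yields". */
  static final class ResumableTask {
    private final String name;
    private final List<String> letters;
    private int position = 0; // the saved program state between two run() calls

    ResumableTask(String name, List<String> letters) {
      this.name = name;
      this.letters = letters;
    }

    boolean isDone() {
      return position >= letters.size();
    }

    /** Resumes at the saved position, handles one letter and returns to the caller. */
    String run() {
      if (isDone()) {
        throw new IllegalStateException(name + " is already done");
      }
      return name + ":" + letters.get(position++);
    }
  }

  /** Hypothetical scheduler: resumes every unfinished task in turn on the calling thread. */
  static List<String> runRoundRobin(List<ResumableTask> tasks) {
    Deque<ResumableTask> ready = new ArrayDeque<>();
    for (ResumableTask task : tasks) {
      if (!task.isDone()) {
        ready.add(task);
      }
    }
    List<String> trace = new ArrayList<>();
    while (!ready.isEmpty()) {
      ResumableTask task = ready.poll();
      trace.add(task.run());
      if (!task.isDone()) {
        ready.add(task); // not finished yet: queue it again so it is resumed later
      }
    }
    return trace;
  }

  public static void main(String[] args) {
    List<ResumableTask> tasks = List.of(
        new ResumableTask("first", List.of("a", "b", "c")),
        new ResumableTask("second", List.of("x", "y")));
    // prints [first:a, second:x, first:b, second:y, first:c]
    System.out.println(runRoundRobin(tasks));
  }
}
```

The real Continuation saves the whole call stack rather than a single index, so the task body can stay an ordinary loop. The sketch only shows who decides when a suspended task runs again.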

The Java Fiber tasks wrapped in continuations are run by carrier threads. There is no guarantee that all Fibers within a program, or even all steps of a single Fiber, will be run by the same carrier thread.
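To get a feeling for what running on carrier threads means, here is a rough sketch in plain Java. It assumes an ordinary fixed thread pool as a stand-in for the carrier threads, and the names CarrierThreadSketch, runOnCarriers and submitStep are made up for illustration. Each step of a task is handed back to the pool when it finishes, so two consecutive steps may well end up on different threads.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CarrierThreadSketch {

  /** Runs a task of `steps` steps (at least one); every step is submitted to the pool separately. */
  static List<String> runOnCarriers(int carriers, int steps) {
    ExecutorService pool = Executors.newFixedThreadPool(carriers);
    List<String> trace = new CopyOnWriteArrayList<>();
    CountDownLatch finished = new CountDownLatch(1);
    try {
      submitStep(pool, 1, steps, trace, finished);
      finished.await(); // wait until the last step has run
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the task", e);
    } finally {
      pool.shutdown();
    }
    return trace;
  }

  private static void submitStep(ExecutorService pool, int step, int steps,
                                 List<String> trace, CountDownLatch finished) {
    pool.execute(() -> {
      // Record which pool thread ("carrier") happens to run this step.
      trace.add("step " + step + " on " + Thread.currentThread().getName());
      if (step < steps) {
        // "Yield": give the rest of the task back to the pool instead of continuing here.
        submitStep(pool, step + 1, steps, trace, finished);
      } else {
        finished.countDown();
      }
    });
  }

  public static void main(String[] args) {
    // The thread names in the output depend on the pool's scheduling and vary between runs.
    runOnCarriers(2, 4).forEach(System.out::println);
  }
}
```

The steps always run in order, because the next step is only submitted once the previous one has finished, but which thread runs each step is up to the pool.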

To test Fibers on your machine, you first need to check out the source code from here. After selecting the JDK in your project, add the “--enable-preview” VM flag to both your javac and java commands to enable preview features before you compile and run the program.

Let’s have a quick look at our first example for Continuations. In the simple Java program below, the for loop iterates over a list of letters and yields execution on every iteration by calling Continuation.yield(scope). Each time it yields, the main thread carries on with the while loop, which first checks whether the continuation is done. If it is not, cont.run() resumes execution right after the Continuation.yield(scope) call:

package io.ryos.demos.continuations;

import static java.lang.System.out;
import java.util.Arrays;
import java.util.List;

public class ContinuationDemo {

  public static void main(String[] args) {
    var scope = new ContinuationScope("my-scope");
    var cont = new Continuation(scope, () -> {
      final List<String> letters = Arrays.asList("a", "b", "c");
      for (var letter : letters) {
        out.println("Your letter is: " + letter);
        out.println("In Cont the thread is: " + Thread.currentThread().getName());
        out.println("Yielding ...");
        Continuation.yield(scope);
      }
    });

    while (!cont.isDone()) {
      cont.run();
      out.println("After run, the thread is " +  Thread.currentThread().getName());
    }
  }
}

Below, you see the program output. Note that the continuation is run by the main thread. The program state is left at the Continuation.yield(scope) call, control goes back to the while loop, and the next run() call on the continuation instance resumes execution from the yield point:

I mentioned that Fibers are wrapped in continuations. The following example is similar to the continuation demo: a producer, scheduled as a Fiber, puts numbers into a queue from which another Fiber takes them. The take() and put() operations on the queue are blocking, and the queue has no capacity. Both Fiber instances synchronise on this synchronous queue instance, a SynchronousQueue:

package io.ryos.demos.fibers;

import static java.lang.System.out;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

public class FiberDemo {

  public static void main(String[] args) {
    // sync queue without any capacity.
    SynchronousQueue<Integer> syncQueue = new SynchronousQueue<>();

    // schedule a producer fiber to create numbers from 1 to n.
    Fiber.schedule(() -> {
      int counter = 0;
      while (true) {
        ++counter;
        var ct = Thread.currentThread();
        out.println(String.format("Putting the value: %d, Thread: %s", counter, ct.getName()));
        syncQueue.put(counter); // blocks
      }
    });

    out.println("Scheduled the producer fiber... - " + Thread.currentThread().getName());

    takeWhile(syncQueue, 5);

    out.println("Scheduled the consumer fiber... - " + Thread.currentThread().getName());
  }

  private static List<Integer> takeWhile(final SynchronousQueue<Integer> syncQueue, final int n) {

    // schedule another fiber, which consumes n integers.
    final Fiber<List<Integer>> fiber = Fiber.schedule(() -> {
      List<Integer> result = new ArrayList<>();
      for (var i = 0; i < n; i++) { // exactly n iterations, matching "consumes n integers"
        var take = syncQueue.take();  // blocks
        var ct = Thread.currentThread();
        out.println(String.format("Reading the value: %d, Thread: %s", take, ct.getName()));
        result.add(take);
      }
      return result;
    });

    return fiber.join();
  }
}

FiberDemo, heavily inspired by Alan Bateman’s presentation at Devoxx’18

You can see the ping-pong between the producer and the consumer on the carrier thread, which is named Fiber:

As soon as the producer blocks on syncQueue.put(counter), which is a blocking operation, the Fiber yields its execution. The carrier thread, however, is not blocked: it immediately picks up another Fiber, in this example the consumer, and runs it. The consumer, which was suspended and waiting, takes the element out of the queue, which in turn lets execution resume in the producer Fiber.
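The rendezvous behaviour of SynchronousQueue that drives this ping-pong can be tried out without the Loom prototype. The sketch below assumes plain platform threads instead of Fibers, so here a blocked put() really does block its thread. SynchronousQueueSketch and handOff are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

final class SynchronousQueueSketch {

  /** Starts a producer thread that puts 1..n and takes the values on the calling thread. */
  static List<Integer> handOff(int n) {
    SynchronousQueue<Integer> queue = new SynchronousQueue<>();
    Thread producer = new Thread(() -> {
      try {
        for (int value = 1; value <= n; value++) {
          queue.put(value); // blocks until the consumer takes the value
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "producer");
    producer.start();

    List<Integer> received = new ArrayList<>();
    try {
      for (int i = 0; i < n; i++) {
        received.add(queue.take()); // blocks until the producer hands over a value
      }
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return received;
  }

  public static void main(String[] args) {
    SynchronousQueue<Integer> empty = new SynchronousQueue<>();
    // No capacity: with no consumer waiting, a non-blocking offer is rejected.
    System.out.println(empty.offer(42));           // false
    System.out.println(empty.remainingCapacity()); // 0
    System.out.println(handOff(3));                // [1, 2, 3]
  }
}
```

Since the queue cannot store anything, every put() has to meet a take(). That is why the producer and the consumer take turns strictly one after the other.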

Wrap-up

The Fibers/Continuations effort is still in progress, so expect major changes to both the interfaces and their implementations. Moreover, the current prototype still has some limitations, which Alan mentioned in his talk. If your application is written in a blocking style and performs lots of blocking I/O operations, Fibers might be the cure for you. Still, there is a long way to go: the Thread API is used throughout the Java ecosystem, and I believe integrating Fibers without breaking existing APIs is going to be challenging.


By the way, if you want to learn more about Fibers and their limitations, I recommend the conference talk by Alan Bateman at Devoxx’18. I am also working on some benchmarks to compare the performance of Fibers against Java threads, which I plan to publish here very soon.

Stay tuned.