Go's Concurrency Examples in Java 19

by: Ethan McCue

Preface

Threads are usually expensive.

There is no way for your operating system to know exactly how much stack space a thread will need, so it allocates something on the order of a kilobyte initially and then around a megabyte once the thread starts to be used. You only have around a baker's dozen gigabytes of RAM, so you can only have, give or take, 10,000 active threads.

The way around this is to implement some mechanism that takes a limited number of operating system threads and juggles a much larger number of "logical threads" on top of them.

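For most languages, this means adding some form of async/await syntax. Where you put an await, the language knows it can switch to handling another task. You can only put awaits inside code marked async. This has problems, not least that the marking spreads: a function that needs to await has to be async itself, and so, typically, do its callers.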

The Go programming language is different from most in that it implemented this juggling "non-cooperatively". You don't explicitly mark your code with async and await; the runtime slices it up for you. They call these cheap threads "goroutines."

The Java Virtual Machine is going to get an analogous feature called "Virtual Threads."

This won't just benefit Java, but every language on the JVM, including Clojure, Groovy, Kotlin, and Scala.

Virtual Threads are slated to appear as a "Preview" feature in Java 19 on September 20, 2022. This means that the implementation of the underlying feature is complete and tested, but the public API is subject to breaking changes and must be opted into explicitly.

Many of Go's patterns around concurrency arise from the conceit that you can create threads with abandon.

Since Java is about to join that club, it seems a good time to go through some of the Go concurrency examples and see what they might look like translated over.
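To get a feel for what creating threads "with abandon" means, here is a rough sketch that starts 10,000 virtual threads, each of which sleeps and then bumps a shared counter. It assumes a JDK with virtual threads enabled (the early access build and jshell flags below, or a later JDK where they are a final feature); the ManyThreads name and the numbers are arbitrary choices.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class ManyThreads {
    private ManyThreads() {}

    // Starts `count` virtual threads that each sleep for `pause` and then
    // increment a shared counter; returns how many of them finished.
    static int run(int count, Duration pause) {
        var finished = new AtomicInteger();
        // Leaving the try block calls close(), which waits for every task.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < count; i++) {
                executor.submit(() -> {
                    Thread.sleep(pause);
                    finished.incrementAndGet();
                    return null;
                });
            }
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(run(10_000, Duration.ofSeconds(1))); // 10000
    }
}
```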

If you want to follow along, you can get an early access build here. Unzip the files and add the bin/ directory to your path.

All the examples can be followed in sequence by using jshell.

$ java --version
openjdk 19-loom 2022-09-20
OpenJDK Runtime Environment (build 19-loom+6-625)
OpenJDK 64-Bit Server VM (build 19-loom+6-625, mixed mode, sharing)

$ jshell --enable-preview --add-modules=jdk.incubator.concurrent

Example 1. Goroutines

https://go.dev/tour/concurrency/1

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

This is a pretty classic example, and frankly can be done with operating system threads just as well.

import java.time.Duration;
import java.util.concurrent.Executors;

public final class VirtualThreads {
    private VirtualThreads() {}

    static void say(String s) {
        try {
            for (int i = 0; i < 5; i++) {
                Thread.sleep(Duration.ofMillis(100));
                System.out.println(s);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        try (var executor = 
                Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> say("world"));
            say("hello");
        }
    }
}
VirtualThreads.main(new String[]{});

A few key things to notice.

  1. There is some noise in the say method around handling what will happen if the thread is interrupted.

In this case we choose to throw a RuntimeException to indicate that we just want to crash.

In Go there is less noise, but there is also no way to interrupt Go's time.Sleep.

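It is also an option to propagate the InterruptedException up. If we add a return null; to the lambda, it targets the Callable overload of submit, and Callable#call, unlike Runnable#run, is allowed to throw checked exceptions.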

public final class VirtualThreads {
    private VirtualThreads() {}

    static void say(String s) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(Duration.ofMillis(100));
            System.out.println(s);
        }
    }

    public static void main(String[] args) 
            throws InterruptedException {
        try (var executor = 
                Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                say("world");
                return null;
            });
            say("hello");
        }
    }
}
  2. You need more than go say("world")

Executors.newVirtualThreadPerTaskExecutor() creates an ExecutorService. This is something you can submit tasks to, and it will run them "somehow".

Today most ExecutorServices are backed by some pool of threads. The purpose of the interface is to be able to write code without needing to know about the underlying strategy for maintaining that pool.

Virtual Threads are cheap, so you don't need to pool them. The interface is still useful, though. ExecutorServices will extend AutoCloseable, so when one is used with the "try-with-resources" syntax, you get a block of code that waits until all submitted tasks have completed before moving on.

If you wanted to do the same by creating threads directly, it would look like this.

public final class VirtualThreads {
    private VirtualThreads() {}

    static void say(String s) {
        try {
            for (int i = 0; i < 5; i++) {
                Thread.sleep(Duration.ofMillis(100));
                System.out.println(s);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) 
            throws InterruptedException {
        var worldThread = Thread.startVirtualThread(
            () -> say("world")
        );
        
        say("hello");
        
        // Explicitly join to wait for the other thread.
        worldThread.join();
    }
}
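To make the point about interruption concrete: Java's Thread.sleep ends early with an InterruptedException when another thread calls interrupt() on the sleeper (or if the interrupt arrived before the sleep began), which is exactly the case the catch blocks above are handling. A minimal sketch (SleepInterrupt is an illustrative name):

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

final class SleepInterrupt {
    private SleepInterrupt() {}

    // Starts a virtual thread that tries to sleep for a minute,
    // interrupts it, and reports whether the sleep was cut short.
    static boolean sleepWasInterrupted() throws InterruptedException {
        var interrupted = new AtomicBoolean(false);
        var sleeper = Thread.startVirtualThread(() -> {
            try {
                Thread.sleep(Duration.ofMinutes(1));
            } catch (InterruptedException e) {
                // Thread.sleep throws once the thread is interrupted.
                interrupted.set(true);
            }
        });
        sleeper.interrupt();
        sleeper.join(); // returns promptly because the sleep was aborted
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sleepWasInterrupted()); // true
    }
}
```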

Example 2. Channels

https://go.dev/tour/concurrency/2

package main

import "fmt"

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // send sum to c
}
    
func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c // receive from c

    fmt.Println(x, y, x+y)
}

Go has the concept of a "channel." This is a lightweight pipe along which values can be sent between "Communicating Sequential Processes".

Java does not have this concept in its standard library. There are similar constructs in libraries, and something like it may come to the JDK in the future, but for now, no dice.

A somewhat close analogue is a BlockingQueue, so that is what I am going to use for the purposes of these examples.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;

public final class Queues {
    private Queues() {}

    static void sum(
        int[] s, 
        int start, 
        int end, 
        BlockingQueue<Integer> queue
    ) throws InterruptedException {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += s[i];
        }
        queue.put(sum);
    }

    public static void main(String[] args) 
        throws InterruptedException {
        int[] s = { 7, 2, 8, -9, 4, 0 };
        try (var executor = 
                Executors.newVirtualThreadPerTaskExecutor()) {
            var queue = new ArrayBlockingQueue<Integer>(1);
            executor.submit(() -> {
                sum(s, 0, s.length / 2, queue);
                return null;
            });
            executor.submit(() -> {
                sum(s, s.length / 2, s.length, queue);
                return null;
            });

            int x = queue.take();
            int y = queue.take();

            System.out.printf("%d %d %d\n", x, y, x + y);
        }
    }
}
Queues.main(new String[]{});

Instead of using Go's syntax for slicing arrays, I opted to pass the start and end indexes that each sum call was expected to work on.

It is only safe to share the array like this because no other threads are changing its contents. If any were, we would have a data race and would have summoned Gorslax. This would be true in both Go and Java.

In both cases, each worker sends the result of its computation to a shared queue. Once we have read two values off that queue, we implicitly know that the two tasks we started have finished their work.
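An ArrayBlockingQueue with capacity 1 is a little looser than Go's unbuffered channel: a put can complete as soon as there is space, before anyone has received the value, while a send on an unbuffered Go channel waits for a receiver. The JDK's SynchronousQueue has no capacity at all, so every put waits for a matching take. A sketch of the same sum using it (the ChannelSum name is just for illustration):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

final class ChannelSum {
    private ChannelSum() {}

    static void sum(int[] s, int start, int end, BlockingQueue<Integer> queue)
            throws InterruptedException {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += s[i];
        }
        // With a SynchronousQueue this blocks until a receiver calls take().
        queue.put(sum);
    }

    // Returns {x, y} in the order the two partial sums were received.
    static int[] run(int[] s) throws InterruptedException {
        var queue = new SynchronousQueue<Integer>();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                sum(s, 0, s.length / 2, queue);
                return null;
            });
            executor.submit(() -> {
                sum(s, s.length / 2, s.length, queue);
                return null;
            });
            int x = queue.take();
            int y = queue.take();
            return new int[] { x, y };
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(new int[] { 7, 2, 8, -9, 4, 0 });
        System.out.printf("%d %d %d%n", r[0], r[1], r[0] + r[1]);
    }
}
```

As in Go, which half arrives first is not fixed, so the first two numbers may come out in either order.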

For "one shot" use cases such as this, you could also use Java's CompletableFuture for the same purpose.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

public final class Queues {
    private Queues() {}

    static void sum(
        int[] s, 
        int start, 
        int end, 
        CompletableFuture<Integer> future
    ) {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += s[i];
        }
        future.complete(sum);
    }

    public static void main(String[] args)
        throws InterruptedException, ExecutionException {
        int[] s = { 7, 2, 8, -9, 4, 0 };
        try (var executor = 
                Executors.newVirtualThreadPerTaskExecutor()) {
            var futureOne = new CompletableFuture<Integer>();
            var futureTwo = new CompletableFuture<Integer>();
            
            executor.submit(() -> {
                sum(s, 0, s.length / 2, futureOne);
                return null;
            });
            executor.submit(() -> {
                sum(s, s.length / 2, s.length, futureTwo);
                return null;
            });

            int x = futureOne.get();
            int y = futureTwo.get();

            System.out.printf("%d %d %d\n", x, y, x + y);
        }
    }
}

This adds ExecutionException to the explicit list of things that can go wrong, but it is a more direct API for a task that will run and produce a single value as its result.

In fact, if we were to change sum to return its result directly then we could eliminate its awareness that it is being run asynchronously.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

public final class Queues {
    private Queues() {}

    static int sum(int[] s, int start, int end)  {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += s[i];
        }
        return sum;
    }

    public static void main(String[] args) 
        throws InterruptedException, ExecutionException {
        int[] s = { 7, 2, 8, -9, 4, 0 };
        try (var executor = 
                Executors.newVirtualThreadPerTaskExecutor()) {
            var futureOne = CompletableFuture
                    .supplyAsync(
                        () ->  sum(s, 0, s.length / 2),
                        executor
                    );
            var futureTwo = CompletableFuture
                    .supplyAsync(
                        () ->  sum(s, s.length / 2, s.length),
                        executor
                    );
                    
            int x = futureOne.get();
            int y = futureTwo.get();

            System.out.printf("%d %d %d\n", x, y, x + y);
        }
    }
}

And if we don't need any of the fancier capabilities of CompletableFuture, then the plain Future objects returned by submitting directly to the ExecutorService are also an option.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

public final class Queues {
    private Queues() {}

    static int sum(int[] s, int start, int end)  {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += s[i];
        }
        return sum;
    }

    public static void main(String[] args) 
        throws InterruptedException, ExecutionException {
        int[] s = { 7, 2, 8, -9, 4, 0 };
        try (var executor = 
                Executors.newVirtualThreadPerTaskExecutor()) {
            var futureOne = executor.submit(
                () ->  sum(s, 0, s.length / 2)
            );
            var futureTwo = executor.submit(
                () ->  sum(s, s.length / 2, s.length)
            );

            int x = futureOne.get();
            int y = futureTwo.get();

            System.out.printf("%d %d %d\n", x, y, x + y);
        }
    }
}
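The Future-based version also stretches past two halves. As a sketch, assuming you want to split the work into an arbitrary number of chunks (the chunks parameter and the ChunkedSum name are illustrative, not part of the Go tour), you can build one Callable per chunk and hand the whole list to ExecutorService#invokeAll, which returns once every task has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ChunkedSum {
    private ChunkedSum() {}

    static int sum(int[] s, int start, int end) {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += s[i];
        }
        return sum;
    }

    // Splits s into `chunks` contiguous ranges, sums each range on its
    // own virtual thread, and adds the partial sums together.
    static int parallelSum(int[] s, int chunks)
            throws InterruptedException, ExecutionException {
        var tasks = new ArrayList<Callable<Integer>>();
        for (int c = 0; c < chunks; c++) {
            int start = (int) ((long) s.length * c / chunks);
            int end = (int) ((long) s.length * (c + 1) / chunks);
            tasks.add(() -> sum(s, start, end));
        }
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAll waits for every task before returning.
            List<Future<Integer>> futures = executor.invokeAll(tasks);
            int total = 0;
            for (var future : futures) {
                total += future.get();
            }
            return total;
        }
    }

    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        System.out.println(parallelSum(new int[] { 7, 2, 8, -9, 4, 0 }, 3)); // 12
    }
}
```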

Example 3. Buffered Channels

https://go.dev/tour/concurrency/3

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

There isn't much to this one. Go's channels can be "buffered", meaning they can accept multiple values before they will be "full". If a channel is full then any thread that wants to put a value onto that channel will have to wait until another thread takes a value off.

The ArrayBlockingQueue class we've been using works the same way.

import java.util.concurrent.ArrayBlockingQueue;

public final class BufferedQueue {
    private BufferedQueue() {}

    public static void main(String[] args) 
        throws InterruptedException {
        var queue = new ArrayBlockingQueue<Integer>(2);
        queue.put(1);
        queue.put(2);
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}
BufferedQueue.main(new String[]{});
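To see the "full" behavior described above, here is a sketch: once both slots are used, the non-blocking offer is refused, while a blocking put waits until another thread takes a value off. The FullQueue name and the 50 ms pause are arbitrary.

```java
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;

final class FullQueue {
    private FullQueue() {}

    // Reports what offer() returned on a full queue, then the order in
    // which values came out after a blocked put() was released.
    static String demo() throws InterruptedException {
        var queue = new ArrayBlockingQueue<Integer>(2);
        queue.put(1);
        queue.put(2);

        // Non-blocking: the queue is full, so the value is rejected.
        boolean accepted = queue.offer(3);

        // Blocking: this put cannot finish until someone calls take().
        var producer = Thread.startVirtualThread(() -> {
            try {
                queue.put(3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Give the producer a moment to block (not needed for correctness).
        Thread.sleep(Duration.ofMillis(50));
        int first = queue.take(); // frees a slot, unblocking the producer
        producer.join();
        int second = queue.take();
        int third = queue.take();
        return accepted + " " + first + " " + second + " " + third;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // false 1 2 3
    }
}
```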

Example 4. Range and Close

https://go.dev/tour/concurrency/4

package main

import (
    "fmt"
)

func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    }
    close(c)
}

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    for i := range c {
        fmt.Println(i)
    }
}

Here is where the differences between a Java BlockingQueue and a Go chan start to manifest themselves.

There is no ability to "close" a BlockingQueue. One way around this is to send a special "sentinel" value over the queue to indicate that a reader should stop reading. This only works cleanly when we have a single reader though.

There is also no equivalent to the range operator. We need to write a normal while loop.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;

sealed interface TakeResult<T> {
    record GotValue<T>(T value) implements TakeResult<T> {}
    record NoValue<T>() implements TakeResult<T> {}
}

public final class Fibonacci {
    private Fibonacci() {}

    static void fibonacci(
        int n, 
        BlockingQueue<TakeResult<Integer>> queue
    ) throws InterruptedException {
        int x = 0;
        int y = 1;

        for (int i = 0; i < n; i++) {
            queue.put(new TakeResult.GotValue<>(x));
            int temp = x;
            x = y;
            y = temp + x;
        }
        queue.put(new TakeResult.NoValue<>());
    }

    public static void main(String[] args) 
        throws InterruptedException {
        try (var executor = 
                Executors.newVirtualThreadPerTaskExecutor()) {
            var queue = 
                new ArrayBlockingQueue<TakeResult<Integer>>(10);
            executor.submit(() -> {
                fibonacci(queue.remainingCapacity(), queue);
                return null;
            });

            while (queue.take() instanceof 
                    TakeResult.GotValue<Integer> gotValue) {
                System.out.println(gotValue.value());
            }
        }
    }
}
Fibonacci.main(new String[]{});

This snippet uses sealed interfaces, a relatively recent addition to Java (finalized in Java 17), to model receiving either a legitimate value over the queue or a signal to stop consuming.

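The other options for the same result would be to drop the generic types from the BlockingQueue and use a special sentinel instance of Object, or to wrap each value in an Optional and treat Optional.empty() as the signal that the queue is closed. Using null itself as that signal is not possible, since BlockingQueue implementations reject null elements with a NullPointerException.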
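The sentinel approach gets trickier with more than one reader, since only one of them will take the sentinel off the queue. One common workaround, sketched here with a hypothetical DONE object playing the role of close, is for each reader that sees the sentinel to put it back before exiting, so that the next reader sees it too.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;

final class SharedSentinel {
    private SharedSentinel() {}

    // A unique object meaning "closed"; compared by identity.
    private static final Object DONE = new Object();

    static void reader(BlockingQueue<Object> queue, List<Integer> seen)
            throws InterruptedException {
        while (true) {
            Object item = queue.take();
            if (item == DONE) {
                // Put the sentinel back so the other readers also stop.
                queue.put(DONE);
                return;
            }
            seen.add((Integer) item);
        }
    }

    // Sends 0..n-1 to `readers` concurrent readers and returns every
    // value they received, in no particular order.
    static List<Integer> run(int n, int readers) throws InterruptedException {
        var queue = new ArrayBlockingQueue<Object>(10);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int r = 0; r < readers; r++) {
                executor.submit(() -> {
                    reader(queue, seen);
                    return null;
                });
            }
            for (int i = 0; i < n; i++) {
                queue.put(i);
            }
            queue.put(DONE);
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        var seen = new ArrayList<>(run(20, 3));
        Collections.sort(seen);
        System.out.println(seen); // 0 through 19, in order
    }
}
```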

Example 5. Select

https://go.dev/tour/concurrency/5

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

There is also no equivalent to select for BlockingQueues. We have to implement that logic in a hand-rolled loop.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;

public final class SelectQueues {
    private SelectQueues() {}

    static void fibonacci(BlockingQueue<Integer> queue,
                          BlockingQueue<Integer> quit) {
        int x = 0;
        int y = 1;

        while (true) {
            if (queue.offer(x)) {
                int temp = x;
                x = y;
                y = temp + x;
            }
            if (quit.poll() != null) {
                System.out.println("quit");
                break;
            }
        }
    }

    public static void main(String[] args) {
        var queue = new ArrayBlockingQueue<Integer>(1);
        var quit = new ArrayBlockingQueue<Integer>(1);

        try (var executor = 
                Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                for (int i = 0; i < 10; i++) {
                    System.out.println(queue.take());
                }
                quit.put(0);
                return null;
            });

            fibonacci(queue, quit);
        }
    }
}
SelectQueues.main(new String[]{});

I'm unsure why the Go version uses a channel of integers as its quit mechanism, beyond the fact that select can only wait on channel operations. In Java it is more natural to use something like a shared AtomicBoolean as a signal for shutdown.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public final class SelectQueues {
    private SelectQueues() {}

    static void fibonacci(BlockingQueue<Integer> queue,
                          AtomicBoolean quit) {
        int x = 0;
        int y = 1;

        while (!quit.get()) {
            if (queue.offer(x)) {
                int temp = x;
                x = y;
                y = temp + x;
            }
        }

        System.out.println("quit");
    }

    public static void main(String[] args) throws InterruptedException {
        var queue = new ArrayBlockingQueue<Integer>(1);
        var quit = new AtomicBoolean(false);

        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                for (int i = 0; i < 10; i++) {
                    System.out.println(queue.take());
                }
                quit.set(true);
                return null;
            });

            fibonacci(queue, quit);
        }
    }
}
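Both loops above spin: when the consumer is slow, offer (and, in the first version, poll) fails immediately and the loop goes straight around again, keeping a CPU busy. A sketch of one mitigation, assuming a short wait is acceptable, is the timed overload offer(e, timeout, unit), which parks for up to the timeout waiting for space before the loop re-checks the quit flag. The 10 ms timeout and the TimedSelect name are arbitrary choices.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class TimedSelect {
    private TimedSelect() {}

    static void fibonacci(BlockingQueue<Integer> queue, AtomicBoolean quit)
            throws InterruptedException {
        int x = 0;
        int y = 1;
        while (!quit.get()) {
            // Wait up to 10 ms for space instead of failing immediately.
            if (queue.offer(x, 10, TimeUnit.MILLISECONDS)) {
                int temp = x;
                x = y;
                y = temp + x;
            }
        }
    }

    // Returns the first `count` values the consumer received.
    static List<Integer> run(int count) throws InterruptedException {
        var queue = new ArrayBlockingQueue<Integer>(1);
        var quit = new AtomicBoolean(false);
        var received = new CopyOnWriteArrayList<Integer>();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());
                }
                quit.set(true);
                return null;
            });
            fibonacci(queue, quit);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(10)); // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
    }
}
```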

If it were a situation with multiple "one shot" queues then CompletableFuture#anyOf and similar methods might suffice.
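For the "one shot" case, CompletableFuture#anyOf returns a future that completes with the result of whichever input finishes first; because the inputs can have different types, that result is typed as Object. A sketch with two made-up tasks of different lengths (FirstOf and delayed are illustrative names), using the same shutdownNow-then-close pattern as the next example so the slower task does not hold up the scope:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

final class FirstOf {
    private FirstOf() {}

    // Sleeps for `delay` and then returns `label`.
    static String delayed(String label, Duration delay) {
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return label;
    }

    static String race() throws InterruptedException, ExecutionException {
        var executor = Executors.newVirtualThreadPerTaskExecutor();
        try {
            var fast = CompletableFuture.supplyAsync(
                    () -> delayed("fast", Duration.ofMillis(10)), executor);
            var slow = CompletableFuture.supplyAsync(
                    () -> delayed("slow", Duration.ofSeconds(5)), executor);
            // anyOf completes with the first result; its type is Object.
            return (String) CompletableFuture.anyOf(fast, slow).get();
        } finally {
            // Interrupt the loser so that close() does not wait for it.
            executor.shutdownNow();
            executor.close();
        }
    }

    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        System.out.println(race());
    }
}
```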

Example 6. Default Selection

https://go.dev/tour/concurrency/6

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)
    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}

The default case doesn't need a novel translation (it becomes the final else branch after polling each queue), but it is worth noting how Go's time library directly returns channels as its mechanism for handling delays.

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

public final class GreenThreadDress {
    private GreenThreadDress() {}

    public static void main(String[] args) 
            throws InterruptedException {
        var executor = 
                Executors.newVirtualThreadPerTaskExecutor();
        try {
            var tick = new ArrayBlockingQueue<Instant>(1);
            var boom = new ArrayBlockingQueue<Instant>(1);

            executor.submit(() -> {
                while (true) {
                    Thread.sleep(Duration.ofMillis(100));
                    tick.put(Instant.now());
                }
            });

            executor.submit(() -> {
                Thread.sleep(Duration.ofMillis(500));
                boom.put(Instant.now());
                return null;
            });

            while (true) {
                if (tick.poll() != null) {
                    System.out.println("tick.");
                }
                else if (boom.poll() != null) {
                    System.out.println("BOOM!");
                    break;
                }
                else {
                    System.out.println("    .");
                    Thread.sleep(Duration.ofMillis(50));
                }
            }
        } finally {
            executor.shutdownNow();
            executor.close();
        }
    }
}
GreenThreadDress.main(new String[]{});

Here we rely on the behavior of ExecutorService#shutdownNow to interrupt the task pushing to the tick queue. This is unlike Go's built-in time.Tick, where the underlying ticker is never stopped and so "leaks."
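If you wanted something closer to Go's time API, where a call hands back a channel, one option is a pair of small helpers that start a task and return the queue it feeds. The names tick and after mirror Go's functions but are hypothetical helpers, not JDK APIs. Passing in the ExecutorService keeps the ticking task owned by a scope that can interrupt it, which avoids the leak described above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TimeQueues {
    private TimeQueues() {}

    // Hypothetical analogue of Go's time.Tick: a value every `period`
    // until the executor's threads are interrupted by shutdownNow().
    static BlockingQueue<Instant> tick(ExecutorService executor, Duration period) {
        var queue = new ArrayBlockingQueue<Instant>(1);
        executor.submit(() -> {
            while (true) {
                Thread.sleep(period);
                queue.put(Instant.now());
            }
        });
        return queue;
    }

    // Hypothetical analogue of Go's time.After: one value after `delay`.
    static BlockingQueue<Instant> after(ExecutorService executor, Duration delay) {
        var queue = new ArrayBlockingQueue<Instant>(1);
        executor.submit(() -> {
            Thread.sleep(delay);
            queue.put(Instant.now());
            return null;
        });
        return queue;
    }

    // The tick/boom loop from above, returning how many ticks were seen.
    static int countTicks() throws InterruptedException {
        var executor = Executors.newVirtualThreadPerTaskExecutor();
        try {
            var tickQueue = tick(executor, Duration.ofMillis(100));
            var boomQueue = after(executor, Duration.ofMillis(500));
            int count = 0;
            while (true) {
                if (tickQueue.poll() != null) {
                    count++;
                } else if (boomQueue.poll() != null) {
                    return count;
                } else {
                    Thread.sleep(Duration.ofMillis(50));
                }
            }
        } finally {
            executor.shutdownNow();
            executor.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ticks before boom: " + countTicks());
    }
}
```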

Example 7: Equivalent Binary Trees

https://go.dev/tour/concurrency/7 https://go.dev/tour/concurrency/8

package main

import "golang.org/x/tour/tree"

// Walk walks the tree t sending all values
// from the tree to the channel ch.
func Walk(t *tree.Tree, ch chan int)

// Same determines whether the trees
// t1 and t2 contain the same values.
func Same(t1, t2 *tree.Tree) bool

func main() {
}

This one is a little bit different, since it's not a straight example but instead a challenge you are meant to complete.

A full solution can be found on this StackOverflow question.

package main

import "fmt"
import "golang.org/x/tour/tree"

// Walk walks the tree t sending all values
// from the tree to the channel ch.
func Walk(t *tree.Tree, ch chan int) {
    var walker func(t *tree.Tree)
    walker = func (t *tree.Tree) {
        if (t == nil) {
            return
        }
        walker(t.Left)
        ch <- t.Value
        walker(t.Right)
    }
    walker(t)
    close(ch)
}

// Same determines whether the trees
// t1 and t2 contain the same values.
func Same(t1, t2 *tree.Tree) bool {
    ch1, ch2 := make(chan int), make(chan int)

    go Walk(t1, ch1)
    go Walk(t2, ch2)

    for {
        v1,ok1 := <- ch1
        v2,ok2 := <- ch2

        if v1 != v2 || ok1 != ok2 {
            return false
        }

        if !ok1 {
            break
        }
    }

    return true
}

func main() {
    fmt.Println("1 and 1 same: ", Same(tree.New(1), tree.New(1)))
    fmt.Println("1 and 2 same: ", Same(tree.New(1), tree.New(2)))

}

The Tree type it uses is defined separately, here.

// Copyright 2011 The Go Authors.  All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package tree // import "golang.org/x/tour/tree"

import (
    "fmt"
    "math/rand"
)

// A Tree is a binary tree with integer values.
type Tree struct {
    Left  *Tree
    Value int
    Right *Tree
}

// New returns a new, random binary tree holding the values k, 2k, ..., 10k.
func New(k int) *Tree {
    var t *Tree
    for _, v := range rand.Perm(10) {
        t = insert(t, (1+v)*k)
    }
    return t
}

func insert(t *Tree, v int) *Tree {
    if t == nil {
        return &Tree{nil, v, nil}
    }
    if v < t.Value {
        t.Left = insert(t.Left, v)
    } else {
        t.Right = insert(t.Right, v)
    }
    return t
}

func (t *Tree) String() string {
    if t == nil {
        return "()"
    }
    s := ""
    if t.Left != nil {
        s += t.Left.String() + " "
    }
    s += fmt.Sprint(t.Value)
    if t.Right != nil {
        s += " " + t.Right.String()
    }
    return "(" + s + ")"
}

So before touching the concurrency bits we need to translate this Tree type.

import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public sealed interface Tree {
    Tree insert(int v);

    record NotEmpty(
            Tree left,
            int value,
            Tree right
    ) implements Tree {
        @Override
        public Tree insert(int v) {
            if (v < this.value) {
                return new NotEmpty(
                        this.left.insert(v),
                        this.value,
                        this.right
                );
            }
            else {
                return new NotEmpty(
                        this.left,
                        this.value,
                        this.right.insert(v)
                );
            }
        }

        @Override
        public String toString() {
            return "( " +
                this.left +
                this.value +
                this.right +
                " )";
        }
    }

    record Empty() implements Tree {
        @Override
        public Tree insert(int v) {
            return new NotEmpty(new Empty(), v, new Empty());
        }

        @Override
        public String toString() {
            return "";
        }
    }

    static Tree random(int k) {
        var vs = IntStream.range(0, 10)
                .boxed()
                .collect(Collectors.toList());
        Collections.shuffle(vs);
        
        Tree t = new Empty();
        for (int v : vs) {
            t = t.insert((1 + v) * k);
        }
        return t;
    }
}

A one-to-one translation of the Go wouldn't be fun Java, so I opted to translate it to an immutable sum type instead. This won't affect the concurrent part, other than giving a stronger conceptual guarantee that we can safely share the tree across multiple threads.

With this version, the Go solution maps pretty straightforwardly to this.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;

sealed interface TakeResult<T> {
    record GotValue<T>(T value) implements TakeResult<T> {}
    record NoValue<T>() implements TakeResult<T> {}
}

public final class TreeWalker {
    private TreeWalker() {}

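    private static void walkHelper(
            Tree tree,
            BlockingQueue<TakeResult<Integer>> queue
    ) throws InterruptedException {
        // Tree is sealed: an Empty tree has nothing to send, and
        // NotEmpty is the only other case. Record components are
        // read through their accessor methods.
        if (tree instanceof Tree.NotEmpty notEmpty) {
            walkHelper(notEmpty.left(), queue);
            queue.put(new TakeResult.GotValue<>(
                    notEmpty.value()
            ));
            walkHelper(notEmpty.right(), queue);
        }
    }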

    static void walk(
            Tree tree,
            BlockingQueue<TakeResult<Integer>> queue
    ) throws InterruptedException {
        walkHelper(tree, queue);
        queue.put(new TakeResult.NoValue<>());
    }

    static boolean same(Tree t1, Tree t2) 
            throws InterruptedException {
        var queue1 = new ArrayBlockingQueue<TakeResult<Integer>>(1);
        var queue2 = new ArrayBlockingQueue<TakeResult<Integer>>(1);

        var executor =
                Executors.newVirtualThreadPerTaskExecutor();
        try {
            executor.submit(() -> {
                walk(t1, queue1);
                return null;
            });

            executor.submit(() -> {
                walk(t2, queue2);
                return null;
            });

            while (true) {
                var result1 = queue1.take();
                var result2 = queue2.take();
                if (!result1.equals(result2)) {
                    return false;
                }

                if (result1 instanceof TakeResult.NoValue<Integer>) {
                    break;
                }
            }
            return true;
        } finally {
            executor.shutdownNow();
            executor.close();
        }
    }

    public static void main(String[] args) 
            throws InterruptedException {
        System.out.println(
            "1 and 1 same: " + 
            same(Tree.random(1), Tree.random(1))
        );
        System.out.println(
            "1 and 2 same: " + 
            same(Tree.random(1), Tree.random(2))
        );
    }
}
TreeWalker.main(new String[]{});

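We use the same trick as before to emulate a closable queue with TakeResult. Then we translate the Go loop that receives from both channels into a loop that calls take on both queues and compares the results. Because records get a structural equals for free, a single equals call covers both the v1 != v2 and the ok1 != ok2 checks.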

The example Go solution used a recursive local closure for walk. While that is technically possible in Java via some wizardry, it's more straightforward to make a helper method.

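There is also a reliance on the walk tasks responding correctly to shutdownNow. When same returns false early, a walker can still be blocked in put, waiting for a reader that will never come. If the walkers did not respond to the interrupt, executor.close() would hang and the scope wouldn't exit.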
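The walk tasks can respond to shutdownNow because BlockingQueue#put, like Thread.sleep, throws InterruptedException when its thread is interrupted while waiting for space. A small sketch of that in isolation (BlockedPut is an illustrative name):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BlockedPut {
    private BlockedPut() {}

    // Starts a task that would block forever in put() on a full queue,
    // then interrupts it with shutdownNow(). Returns true if the task
    // ended with InterruptedException, letting close() return.
    static boolean interruptedByShutdownNow() throws InterruptedException {
        var queue = new ArrayBlockingQueue<Integer>(1);
        queue.put(0); // the queue is now full
        var executor = Executors.newVirtualThreadPerTaskExecutor();
        Future<Object> task = executor.submit(() -> {
            queue.put(1); // blocks: nobody ever takes
            return null;
        });
        executor.shutdownNow(); // interrupts the blocked put
        executor.close();       // returns once the task has ended
        try {
            task.get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof InterruptedException;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptedByShutdownNow()); // true
    }
}
```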

Example 8: sync.Mutex

https://go.dev/tour/concurrency/9

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeCounter is safe to use concurrently.
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    c.v[key]++
    c.mu.Unlock()
}

// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    defer c.mu.Unlock()
    return c.v[key]
}

func main() {
    c := SafeCounter{v: make(map[string]int)}
    for i := 0; i < 1000; i++ {
        go c.Inc("somekey")
    }

    time.Sleep(time.Second)
    fmt.Println(c.Value("somekey"))
}

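Java has a close analogue to sync.Mutex in ReentrantLock. The main difference is in the name: a thread that already holds a ReentrantLock can lock it again, whereas locking a Go Mutex twice from the same goroutine deadlocks. We can make this same program without much issue.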

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

final class SafeCounter {
    private final Map<String, Integer> v;
    private final ReentrantLock lock;

    public SafeCounter() {
        this.v = new HashMap<>();
        this.lock = new ReentrantLock();
    }
    void inc(String key) {
        lock.lock();
        try {
            v.put(key, v.getOrDefault(key, 0) + 1);
        } finally {
            lock.unlock();
        }
    }

    int value(String key) {
        lock.lock();
        try {
            return v.getOrDefault(key, 0);
        } finally {
            lock.unlock();
        }
    }
}

public final class Mutex {
    private Mutex() {}

    public static void main(String[] args) 
            throws InterruptedException {
        var c = new SafeCounter();
        for (int i = 0; i < 1000; i++) {
            Thread.startVirtualThread(
                    () -> c.inc("somekey")
            );
        }

        Thread.sleep(Duration.ofSeconds(1));
        System.out.println(c.value("somekey"));
    }
}
Mutex.main(new String[]{});

The only thing of note is that, unlike Go, where you can defer an arbitrary action such as releasing your hold on a lock, in Java the general mechanism for "cleanup that must happen" is the finally clause of a try block.

In Java, ReentrantLock is special in that its locks can "escape" a lexical scope. You can lock before entering a method and unlock in a totally unrelated one.
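To make the "escape" concrete, here is a sketch of a hypothetical SharedResource whose acquire method takes a ReentrantLock and returns while still holding it; a separate release method gives it back. A synchronized block could not express this, since its lock is always released when the block or method exits.

```java
import java.util.concurrent.locks.ReentrantLock;

final class SharedResource {
    private final ReentrantLock lock = new ReentrantLock();

    // Takes the lock and returns while still holding it.
    void acquire() {
        lock.lock();
    }

    // Gives the lock back. Must be called by the thread that called
    // acquire(); otherwise unlock() throws IllegalMonitorStateException.
    void release() {
        lock.unlock();
    }

    // Only valid between acquire() and release().
    void use() {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalStateException("call acquire() first");
        }
        System.out.println("using the resource");
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    public static void main(String[] args) {
        var resource = new SharedResource();
        resource.acquire();          // the lock is taken in one method...
        try {
            resource.use();
        } finally {
            resource.release();      // ...and released in another
        }
        System.out.println(resource.isLocked()); // false
    }
}
```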

If you don't need this ability, then you can use the fact that every object with "identity" in Java can be used as a lock with synchronized.

final class SafeCounter {
    private final Map<String, Integer> v;

    public SafeCounter() {
        this.v = new HashMap<>();
    }
    void inc(String key) {
        synchronized (this) {
            v.put(key, v.getOrDefault(key, 0) + 1);
        }
    }

    int value(String key) {
        synchronized (this) {
            return v.getOrDefault(key, 0);
        }
    }
}

If the thing being synchronized on is this and the synchronization spans the entire method body, we can mark the method itself as synchronized for the same effect.

final class SafeCounter {
    private final Map<String, Integer> v;

    public SafeCounter() {
        this.v = new HashMap<>();
    }

    synchronized void inc(String key) {
        v.put(key, v.getOrDefault(key, 0) + 1);
    }

    synchronized int value(String key) {
        return v.getOrDefault(key, 0);
    }
}
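A different approach, which swaps the lock for a concurrent collection rather than translating the Go code directly, is to let ConcurrentHashMap handle the synchronization. Its merge method runs atomically for a given key, so no explicit lock is needed. This is a sketch; the class name is illustrative, and the demo uses a parallel stream rather than virtual threads so it runs without preview flags.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public final class ConcurrentCounter {
    private final ConcurrentHashMap<String, Integer> v = new ConcurrentHashMap<>();

    void inc(String key) {
        // merge is performed atomically per key: no lost updates
        v.merge(key, 1, Integer::sum);
    }

    int value(String key) {
        return v.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        var c = new ConcurrentCounter();
        IntStream.range(0, 1000).parallel().forEach(i -> c.inc("somekey"));
        System.out.println(c.value("somekey")); // prints 1000
    }
}
```

The trade-off is that atomicity now covers a single map operation only. Anything that must update several keys together still needs a lock.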

Example 9: Web Crawler

https://go.dev/tour/concurrency/10

package main

import (
    "fmt"
)

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
    // TODO: Fetch URLs in parallel.
    // TODO: Don't fetch the same URL twice.
    // This implementation doesn't do either:
    if depth <= 0 {
        return
    }
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q\n", url, body)
    for _, u := range urls {
        Crawl(u, depth-1, fetcher)
    }
    return
}

func main() {
    Crawl("https://golang.org/", 4, fetcher)
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
    if res, ok := f[url]; ok {
        return res.body, res.urls, nil
    }
    return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
    "https://golang.org/": &fakeResult{
        "The Go Programming Language",
        []string{
            "https://golang.org/pkg/",
            "https://golang.org/cmd/",
        },
    },
    "https://golang.org/pkg/": &fakeResult{
        "Packages",
        []string{
            "https://golang.org/",
            "https://golang.org/cmd/",
            "https://golang.org/pkg/fmt/",
            "https://golang.org/pkg/os/",
        },
    },
    "https://golang.org/pkg/fmt/": &fakeResult{
        "Package fmt",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
    "https://golang.org/pkg/os/": &fakeResult{
        "Package os",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
}

Another challenge problem. Before going to a reference solution, I am going to translate the synchronous example.

Go has a pattern of returning an error as a conditionally filled in extra return value. This isn't super idiomatic in Java, so instead I am going to model the error case as an Exception.

The Go version also returns both a body and a slice of URLs from a single call. In Java we can do the same by returning a single aggregate, here a record, that holds both values.

final class FetcherException extends Exception {
    FetcherException(String message) {
        super(message);
    }
}

interface Fetcher {
    record Result(String body, String[] urls) {
    }

    Result fetch(String url) throws FetcherException;
}

We can make a "fake" implementation of fetcher using the same technique as Go by backing it with an in-memory map.

import java.util.Map;

final class FakeFetcher implements Fetcher {
    private final Map<String, Result> results;

    public FakeFetcher(Map<String, Result> results) {
        this.results = results;
    }

    @Override
    public Result fetch(String url) throws FetcherException {
        var result = this.results.get(url);
        if (result == null) {
            throw new FetcherException("Not Found: " + url);
        } else {
            return result;
        }
    }

    public static Fetcher example() {
        return new FakeFetcher(Map.of(
                "https://golang.org/", new Fetcher.Result(
                        "The Go Programming Language",
                        new String[]{
                                "https://golang.org/pkg/",
                                "https://golang.org/cmd/"
                        }
                ),
                "https://golang.org/pkg/", new Fetcher.Result(
                        "Packages",
                        new String[]{
                                "https://golang.org/",
                                "https://golang.org/cmd/",
                                "https://golang.org/pkg/fmt/",
                                "https://golang.org/pkg/os/",
                        }
                ),
                "https://golang.org/pkg/fmt/", new Fetcher.Result(
                        "Package fmt",
                        new String[]{
                                "https://golang.org/",
                                "https://golang.org/pkg/",
                        }
                ),
                "https://golang.org/pkg/os/", new Fetcher.Result(
                        "Package os",
                        new String[]{
                                "https://golang.org/",
                                "https://golang.org/pkg/",
                        }
                )
        ));
    }
}

Then the synchronous crawler is just a regular recursive function, as it was in Go.

public final class WebCrawler {
    private WebCrawler() {
    }

    static void crawl(
        String url, 
        int depth, 
        Fetcher fetcher
    ) {
        if (depth <= 0) {
            return;
        }

        Fetcher.Result result;
        try {
            result = fetcher.fetch(url);
        } catch (FetcherException e) {
            System.out.println(e.getMessage());
            return;
        }

        var body = result.body();
        var urls = result.urls();

        System.out.printf(
            "Found: %s %s\n", 
            body, 
            Arrays.toString(urls)
        );

        for (var u : urls) {
            crawl(u, depth - 1, fetcher);
        }
    }

    public static void main(String[] args) {
        var fetcher = FakeFetcher.example();

        crawl("https://golang.org/", 4, fetcher);
    }
}
WebCrawler.main(new String[]{});

Now I am going to pull an answer to the exercise from this StackOverflow post.

// SafeUrlMap is safe to use concurrently.
type SafeUrlMap struct {
    v   map[string]string
    mux sync.Mutex
}

func (c *SafeUrlMap) Set(key string, body string) {
    c.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    c.v[key] = body
    c.mux.Unlock()
}

// Value returns mapped value for the given key.
func (c *SafeUrlMap) Value(key string) (string, bool) {
    c.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    defer c.mux.Unlock()
    val, ok := c.v[key]
    return val, ok
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, urlMap *SafeUrlMap) {
    defer wg.Done()

    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    // Record the page now that its body is known.
    urlMap.Set(url, body)

    for _, u := range urls {
        if _, ok := urlMap.Value(u); !ok {
            wg.Add(1)
            go Crawl(u, depth-1, fetcher, urlMap)
        }
    }
}

var wg sync.WaitGroup

func main() {
    // Share a single SafeUrlMap through a pointer so that every
    // goroutine locks the same mutex.
    urlMap := &SafeUrlMap{v: make(map[string]string)}

    wg.Add(1)
    go Crawl("https://golang.org/", 4, fetcher, urlMap)
    wg.Wait()

    for url := range urlMap.v {
        body, _ := urlMap.Value(url)
        fmt.Printf("found: %s %q\n", url, body)
    }
}

This solution makes use of a sync.WaitGroup. Java has no class with exactly that API, but we can pretty easily make something with similar semantics.

I am taking the implementation from this StackOverflow question.

final class WaitGroup {
    private int jobs = 0;

    public synchronized void add(int i) {
        jobs += i;
    }

    public synchronized void done() {
        if (--jobs == 0) {
            notifyAll();
        }
    }

    public synchronized void await() 
            throws InterruptedException {
        while (jobs > 0) {
            wait();
        }
    }
}
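For comparison, java.util.concurrent already ships a class that can play the WaitGroup role: Phaser. The sketch below uses illustrative class and method names. It registers a party before each task starts, like wg.Add(1), and deregisters it when the task finishes, like wg.Done(). The waiting thread then arrives and waits for the phase to advance, like wg.Wait(). As with WaitGroup, a parent must register its children before deregistering itself, otherwise the phase could advance early. It uses a fixed pool of platform threads, so it runs without preview flags.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public final class PhaserWaitGroup {
    // Spawns a binary tree of tasks `depth` levels deep; each task counts itself.
    static void spawn(int depth, ExecutorService executor,
                      Phaser phaser, AtomicInteger count) {
        phaser.register(); // like wg.Add(1): register before the task starts
        executor.submit(() -> {
            try {
                count.incrementAndGet();
                if (depth > 0) {
                    // Children register while this task is still a party.
                    spawn(depth - 1, executor, phaser, count);
                    spawn(depth - 1, executor, phaser, count);
                }
            } finally {
                phaser.arriveAndDeregister(); // like wg.Done()
            }
        });
    }

    public static int run(int depth) {
        var executor = Executors.newFixedThreadPool(4);
        var phaser = new Phaser(1); // the waiting thread is itself a party
        var count = new AtomicInteger();
        spawn(depth, executor, phaser, count);
        phaser.arriveAndAwaitAdvance(); // like wg.Wait()
        executor.shutdown();
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints 15 (1 + 2 + 4 + 8 tasks)
    }
}
```

One caveat: a single Phaser supports at most 65,535 registered parties, so very large fan-outs would need tiered phasers. The hand-rolled WaitGroup has no such limit.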

The SafeUrlMap type is also fairly trivial to assemble, but instead I am going to pass around a normal HashSet and manually synchronize on it.

There are many other options, including using Collections#synchronizedSet or wrapping a ConcurrentHashMap, but I think this will be the easiest to follow.
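Here is a sketch of the ConcurrentHashMap option. ConcurrentHashMap.newKeySet() returns a thread-safe Set whose add method checks and inserts in one atomic step. It returns true only for the caller that actually added the element. With such a set, the synchronized (seen) block in the crawler could shrink to if (seen.add(u)) { ... }. The class name and the simulated workers are illustrative.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public final class SeenSet {
    // Simulates `workers` crawl tasks that all try to claim the same URLs.
    // Returns how many claims succeeded, i.e. how many fetches would happen.
    public static int claimAll(int workers, List<String> urls) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        var claims = new AtomicInteger();
        IntStream.range(0, workers).parallel().forEach(w -> {
            for (var u : urls) {
                // add() checks and inserts atomically:
                // exactly one caller gets true for each distinct URL
                if (seen.add(u)) {
                    claims.incrementAndGet();
                }
            }
        });
        return claims.get();
    }

    public static void main(String[] args) {
        var urls = List.of(
                "https://golang.org/",
                "https://golang.org/pkg/",
                "https://golang.org/cmd/",
                "https://golang.org/pkg/");
        System.out.println(claimAll(8, urls)); // prints 3
    }
}
```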

public final class WebCrawler {
    private WebCrawler() {
    }

    static void crawlTask(
            String url,
            int depth,
            Fetcher fetcher,
            ExecutorService executor,
            Set<String> seen,
            WaitGroup waitGroup
    ) {
        try {
            if (depth <= 0) {
                return;
            }

            Fetcher.Result result;
            try {
                result = fetcher.fetch(url);
            } catch (FetcherException e) {
                System.out.println(e.getMessage());
                return;
            }

            var body = result.body();
            var urls = result.urls();

            System.out.printf(
                "Found: %s %s\n", 
                body, 
                Arrays.toString(urls)
            );

            for (var u : urls) {
                synchronized (seen) {
                    if (!seen.contains(u)) {
                        seen.add(u);
                        waitGroup.add(1);
                        executor.submit(() -> crawlTask(
                                u,
                                depth - 1,
                                fetcher,
                                executor,
                                seen,
                                waitGroup
                        ));
                    }
                }
            }
        } finally {
            waitGroup.done();
        }
    }

    static void crawl(String url, int depth, Fetcher fetcher)
            throws InterruptedException {
        try (var executor =
                     Executors.newVirtualThreadPerTaskExecutor()) {
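            var waitGroup = new WaitGroup();
            waitGroup.add(1);

            // Mark the starting URL as seen so pages that link back
            // to it do not cause it to be fetched a second time.
            var seen = new HashSet<String>();
            seen.add(url);

            executor.submit(() -> crawlTask(
                    url,
                    depth,
                    fetcher,
                    executor,
                    seen,
                    waitGroup
            ));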

            waitGroup.await();
        }
    }

    public static void main(String[] args) 
            throws InterruptedException {
        var fetcher = FakeFetcher.example();
        crawl("https://golang.org/", 4, fetcher);
    }
}
WebCrawler.main(new String[]{});

We could stop there, but one API, available in an incubator module in the early access builds, can replace our home-rolled WaitGroup.

This is why I had --add-modules=jdk.incubator.concurrent in the jshell command up top.

A StructuredTaskScope.ShutdownOnFailure lets us fork an arbitrary number of tasks into the scope, even recursively from inside other tasks. Its join() only returns once all of those tasks have completed or one of them has failed. There is another implementation, StructuredTaskScope.ShutdownOnSuccess, that finishes as soon as a single task succeeds.

This obviates the need to manually count up and down with a WaitGroup.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import jdk.incubator.concurrent.StructuredTaskScope;

public final class WebCrawler {
    private WebCrawler() {
    }

    static void crawlTask(
            String url,
            int depth,
            Fetcher fetcher,
            StructuredTaskScope.ShutdownOnFailure
                    structuredTaskScope,
            Set<String> seen
    ) {
        if (depth <= 0) {
            return;
        }

        Fetcher.Result result;
        try {
            result = fetcher.fetch(url);
        } catch (FetcherException e) {
            System.out.println(e.getMessage());
            return;
        }

        var body = result.body();
        var urls = result.urls();

        System.out.printf(
            "Found: %s %s\n", 
            body, 
            Arrays.toString(urls)
        );

        for (var u : urls) {
            synchronized (seen) {
                if (!seen.contains(u)) {
                    seen.add(u);
                    structuredTaskScope.fork(() -> {
                        crawlTask(
                                u,
                                depth - 1,
                                fetcher,
                                structuredTaskScope,
                                seen
                        );
                        return null;
                    });
                }
            }
        }

    }

    static void crawl(String url, int depth, Fetcher fetcher)
            throws InterruptedException {
        try (var structuredTaskScope =
                     new StructuredTaskScope.ShutdownOnFailure()) {

            // Mark the starting URL as seen so it is only fetched once.
            var seen = new HashSet<String>();
            seen.add(url);

            structuredTaskScope.fork(() -> {
                crawlTask(
                        url,
                        depth,
                        fetcher,
                        structuredTaskScope,
                        seen
                );
                return null;
            });

            structuredTaskScope.join();
        }
    }

    public static void main(String[] args) 
            throws InterruptedException {
        var fetcher = FakeFetcher.example();
        crawl("https://golang.org/", 4, fetcher);
    }
}
WebCrawler.main(new String[]{});

This combines the roles of the ExecutorService and the WaitGroup into one object. At the very least, that means one less point of coordination and slightly cleaner code.

The exact shape of the StructuredTaskScope API is very much in flux, so if you are reading this a year or so in the future, this snippet might not work.

Wrapping up

Hopefully this was informative. If not, that's fine too.

Leave questions, comments, and suggestions in the comments.

