Threads are usually expensive.
There is no way for your operating system to know exactly how much stack space a thread will need, so it allocates something on the order of a kilobyte initially and then around a megabyte once the thread starts to be used. You only have around a baker's dozen gigabytes of RAM, so you can only have, give or take, 10,000 active threads.
The way around this is to implement some mechanism that takes a limited number of operating system threads and juggles a much larger number of "logical threads" on top of them.
For most languages, this means adding some form of async/await syntax. Where you put an await, the language knows it can switch to handling another task. You can only put awaits inside of code marked async. This has problems.
The Go programming language is different from most in that it implemented this juggling "non-cooperatively". You don't explicitly mark your code with async and await; the runtime slices it up for you. They call these cheap threads "goroutines."
The Java Virtual Machine is going to get an analogous feature called "Virtual Threads."
This won't just benefit Java, but every language on the JVM including Clojure, Groovy, Kotlin, and Scala.
Virtual Threads are slated to appear as a "Preview" feature in Java 19 on September 20, 2022. This means that the implementation of the underlying feature is complete and tested, but the public API is subject to breaking changes and must be opted into explicitly.
Many of Go's patterns around concurrency arise from the conceit that you can create threads with abandon.
Since Java is about to join that club, it seems a good time to go through some of the Go concurrency examples and see what they might look like translated over.
If you want to follow along, you can get an early access build here. Unzip the files and add the bin/ directory to your path.
All the examples can be followed in sequence by using jshell.
$ java --version
openjdk 19-loom 2022-09-20
OpenJDK Runtime Environment (build 19-loom+6-625)
OpenJDK 64-Bit Server VM (build 19-loom+6-625, mixed mode, sharing)
$ jshell --enable-preview --add-modules=jdk.incubator.concurrent
https://go.dev/tour/concurrency/1
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}
This is a pretty classic example, and frankly can be done with operating system threads just as well.
import java.time.Duration;
import java.util.concurrent.Executors;
public final class VirtualThreads {
private VirtualThreads() {}
static void say(String s) {
try {
for (int i = 0; i < 5; i++) {
Thread.sleep(Duration.ofMillis(100));
System.out.println(s);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> say("world"));
say("hello");
}
}
}
VirtualThreads.main(new String[]{});
A few key things to notice.
First, the say method needs extra code around handling what will happen if the thread is interrupted. In this case we just choose to throw a RuntimeException to indicate that we want to crash.
In Go there is less noise, but there is also no way to interrupt Go's time.Sleep.
It is also an option to propagate the InterruptedException instead, if we add a return null; to the lambda so that it targets the Callable overload of submit. Runnable#run cannot throw checked exceptions, but Callable#call can.
public final class VirtualThreads {
private VirtualThreads() {}
static void say(String s) throws InterruptedException {
for (int i = 0; i < 5; i++) {
Thread.sleep(Duration.ofMillis(100));
System.out.println(s);
}
}
public static void main(String[] args)
throws InterruptedException {
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
say("world");
return null;
});
say("hello");
}
}
}
In place of go say("world"), we submit a task to an executor. Executors.newVirtualThreadPerTaskExecutor() creates an ExecutorService. This is a thing which you can submit tasks to, and it will run them "somehow".
Today most ExecutorServices are backed by some pool of threads. The purpose of the interface is to let you write code without needing to know about the underlying strategy for maintaining that pool.
Virtual Threads are cheap, so you don't need to pool them. The interface is still useful, though. ExecutorServices will extend AutoCloseable, so when used with the "try-with-resources" syntax you can make a block of code that waits until all submitted tasks have completed before moving on.
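To see that waiting behavior concretely, here is a small sketch. It assumes a JDK where Executors.newVirtualThreadPerTaskExecutor is available (19 or 20 with --enable-preview, or 21 and later), and the class and method names are made up for illustration. It starts 10,000 short sleeping tasks and only reads the completion count after the try-with-resources block has exited.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class CloseWaits {
    private CloseWaits() {}

    // Submits `tasks` short sleeping tasks and returns how many had finished
    // by the time the try-with-resources block was exited.
    static int runAndCount(int tasks) {
        var completed = new AtomicInteger();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    Thread.sleep(Duration.ofMillis(10));
                    completed.incrementAndGet();
                    return null; // Callable, so sleep's InterruptedException is allowed
                });
            }
        } // close() blocks here until every submitted task has finished
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCount(10_000)); // prints 10000
    }
}
```

Because close() waits, the printed count is always the full number of tasks; reading it inside the block could see a smaller value.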
If you wanted to do the same thing by creating threads directly, it would look like this.
public final class VirtualThreads {
private VirtualThreads() {}
static void say(String s) {
try {
for (int i = 0; i < 5; i++) {
Thread.sleep(Duration.ofMillis(100));
System.out.println(s);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args)
throws InterruptedException {
var worldThread = Thread.startVirtualThread(
() -> say("world")
);
say("hello");
// Explicitly join to wait for the other thread.
worldThread.join();
}
}
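Thread.startVirtualThread is a shortcut. The same API also offers a builder, Thread.ofVirtual(), for things like naming threads. A sketch assuming that builder as it appears in JDK 19 (as a preview API) and in 21: it starts a few named virtual threads and joins each one explicitly, with no executor involved. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class VirtualBuilder {
    private VirtualBuilder() {}

    // Starts `count` virtual threads named worker-0, worker-1, ... and joins each one.
    static List<String> runNamed(int count) throws InterruptedException {
        var seen = new ConcurrentLinkedQueue<String>();
        // name(prefix, start) gives each new thread the prefix plus an incrementing counter.
        Thread.Builder builder = Thread.ofVirtual().name("worker-", 0);
        var threads = new ArrayList<Thread>();
        for (int i = 0; i < count; i++) {
            threads.add(builder.start(() -> {
                Thread current = Thread.currentThread();
                seen.add(current.getName() + (current.isVirtual() ? " (virtual)" : " (platform)"));
            }));
        }
        for (Thread t : threads) {
            t.join(); // wait explicitly, as with worldThread.join() above
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runNamed(3));
    }
}
```

The printed order depends on which thread runs first, but every entry should be marked as virtual.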
https://go.dev/tour/concurrency/2
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y)
}
Go has the concept of a "channel." This is a lightweight pipe along which values can be sent between "Communicating Sequential Processes".
Java does not have this concept in its standard library. There are similar constructs in libraries and it may come in the future, but for now no dice.
A somewhat close analogue is a BlockingQueue, so that is what I am going to use for the purposes of these examples.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
public final class Queues {
private Queues() {}
static void sum(
int[] s,
int start,
int end,
BlockingQueue<Integer> queue
) throws InterruptedException {
int sum = 0;
for (int i = start; i < end; i++) {
sum += s[i];
}
queue.put(sum);
}
public static void main(String[] args)
throws InterruptedException {
int[] s = { 7, 2, 8, -9, 4, 0 };
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
var queue = new ArrayBlockingQueue<Integer>(1);
executor.submit(() -> {
sum(s, 0, s.length / 2, queue);
return null;
});
executor.submit(() -> {
sum(s, s.length / 2, s.length, queue);
return null;
});
int x = queue.take();
int y = queue.take();
System.out.printf("%d %d %d\n", x, y, x + y);
}
}
}
Queues.main(new String[]{});
Instead of using Go's syntax for slicing arrays, I opted to pass the indexes that each sum call is expected to work on.
It is only safe to share the array's memory like this because no other threads are changing its contents. If any were, we would have a data race and would have summoned Gorslax. This would be true in both Go and Java.
In both cases, each worker sends the result of its computation to a shared queue. Once we have read two values off that queue, we implicitly know that the two tasks we started have finished.
For "one shot" use cases such as this, you could also use Java's CompletableFuture for the same purpose.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
public final class Queues {
private Queues() {}
static void sum(
int[] s,
int start,
int end,
CompletableFuture<Integer> future
) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += s[i];
}
future.complete(sum);
}
public static void main(String[] args)
throws InterruptedException, ExecutionException {
int[] s = { 7, 2, 8, -9, 4, 0 };
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
var futureOne = new CompletableFuture<Integer>();
var futureTwo = new CompletableFuture<Integer>();
executor.submit(() -> {
sum(s, 0, s.length / 2, futureOne);
return null;
});
executor.submit(() -> {
sum(s, s.length / 2, s.length, futureTwo);
return null;
});
int x = futureOne.get();
int y = futureTwo.get();
System.out.printf("%d %d %d\n", x, y, x + y);
}
}
}
This adds ExecutionException to the explicit list of things that can go wrong, but it is a more direct API for a task that will run and produce one value as a result.
In fact, if we change sum to return its result directly, then we can eliminate its awareness that it is being run asynchronously.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
public final class Queues {
private Queues() {}
static int sum(int[] s, int start, int end) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += s[i];
}
return sum;
}
public static void main(String[] args)
throws InterruptedException, ExecutionException {
int[] s = { 7, 2, 8, -9, 4, 0 };
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
var futureOne = CompletableFuture
.supplyAsync(
() -> sum(s, 0, s.length / 2),
executor);
var futureTwo = CompletableFuture
.supplyAsync(
() -> sum(s, s.length / 2, s.length),
executor);
int x = futureOne.get();
int y = futureTwo.get();
System.out.printf("%d %d %d\n", x, y, x + y);
}
}
}
And if we don't need any of the fancier capabilities of CompletableFuture, then the plain Future objects returned by submitting directly to the ExecutorService are also an option.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
public final class Queues {
private Queues() {}
static int sum(int[] s, int start, int end) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += s[i];
}
return sum;
}
public static void main(String[] args)
throws InterruptedException, ExecutionException {
int[] s = { 7, 2, 8, -9, 4, 0 };
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
var futureOne = executor.submit(
() -> sum(s, 0, s.length / 2)
);
var futureTwo = executor.submit(
() -> sum(s, s.length / 2, s.length)
);
int x = futureOne.get();
int y = futureTwo.get();
System.out.printf("%d %d %d\n", x, y, x + y);
}
}
}
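Going past two halves, ExecutorService#invokeAll accepts a collection of Callables and returns their Futures in the same order once all of them have completed. A sketch, where parallelSum and its chunks parameter are made-up names, that splits the array into any number of ranges (again assuming a JDK with virtual threads):

```java
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ChunkedSum {
    private ChunkedSum() {}

    static int sum(int[] s, int start, int end) {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += s[i];
        }
        return sum;
    }

    // Sums `s` by splitting it into `chunks` contiguous ranges, one task per range.
    static int parallelSum(int[] s, int chunks)
            throws InterruptedException, ExecutionException {
        var tasks = new ArrayList<Callable<Integer>>();
        for (int c = 0; c < chunks; c++) {
            int start = (int) ((long) s.length * c / chunks);
            int end = (int) ((long) s.length * (c + 1) / chunks);
            tasks.add(() -> sum(s, start, end));
        }
        int total = 0;
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAll blocks until every task is done; futures keep task order.
            for (Future<Integer> f : executor.invokeAll(tasks)) {
                total += f.get();
            }
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        int[] s = { 7, 2, 8, -9, 4, 0 };
        System.out.println(parallelSum(s, 3)); // prints 12
    }
}
```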
https://go.dev/tour/concurrency/3
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}
There isn't much to this one. Go's channels can be "buffered", meaning they can accept multiple values before they are "full". If a channel is full, then any thread that wants to put a value onto that channel will have to wait until another thread takes a value off.
The ArrayBlockingQueue class we've been using works the same way.
import java.util.concurrent.ArrayBlockingQueue;
public final class BufferedQueue {
private BufferedQueue() {}
public static void main(String[] args)
throws InterruptedException {
var queue = new ArrayBlockingQueue<Integer>(2);
queue.put(1);
queue.put(2);
System.out.println(queue.take());
System.out.println(queue.take());
}
}
BufferedQueue.main(new String[]{});
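To make "full" concrete: on a full ArrayBlockingQueue, put blocks, offer returns false immediately, and the timed overload of offer waits at most the given time before giving up. A small sketch with a capacity-2 queue that nothing ever takes from, so the third offer times out (the class name is illustrative):

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

final class FullQueue {
    private FullQueue() {}

    // Returns the results of three offers against a queue with capacity 2.
    static boolean[] offerThree() throws InterruptedException {
        var queue = new ArrayBlockingQueue<Integer>(2);
        boolean first = queue.offer(1);  // room available
        boolean second = queue.offer(2); // room available, queue is now full
        // Waits up to 50 ms for space; nobody takes, so this gives up and returns false.
        boolean third = queue.offer(3, 50, TimeUnit.MILLISECONDS);
        return new boolean[] { first, second, third };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(offerThree())); // prints [true, true, false]
    }
}
```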
https://go.dev/tour/concurrency/4
package main
import (
"fmt"
)
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}
Here is where the differences between a Java BlockingQueue and a Go chan start to manifest themselves.
There is no ability to "close" a BlockingQueue. One way around this is to send a special "sentinel" value over the queue to indicate that a reader should stop reading. This only works cleanly when we have a single reader, though.
There is also no equivalent to the range operator. We need to write a normal while loop.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
sealed interface TakeResult<T> {
record GotValue<T>(T value) implements TakeResult<T> {}
record NoValue<T>() implements TakeResult<T> {}
}
public final class Fibonacci {
private Fibonacci() {}
static void fibonacci(
int n,
BlockingQueue<TakeResult<Integer>> queue
) throws InterruptedException {
int x = 0;
int y = 1;
for (int i = 0; i < n; i++) {
queue.put(new TakeResult.GotValue<>(x));
int temp = x;
x = y;
y = temp + x;
}
queue.put(new TakeResult.NoValue<>());
}
public static void main(String[] args)
throws InterruptedException {
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
var queue =
new ArrayBlockingQueue<TakeResult<Integer>>(10);
executor.submit(() -> {
fibonacci(queue.remainingCapacity(), queue);
return null;
});
while (queue.take() instanceof
TakeResult.GotValue<Integer> gotValue) {
System.out.println(gotValue.value());
}
}
}
}
Fibonacci.main(new String[]{});
This snippet makes use of sealed interfaces, a relatively recent addition to Java, to model receiving either a legitimate value over the queue or a signal to stop consuming.
The other option for the same result would be to drop the generic types from the BlockingQueue and use a special sentinel instance of Object. Using null to mean "closed" is not an option here, because BlockingQueue implementations reject null elements with a NullPointerException.
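A minimal sketch of the sentinel-object approach, for comparison: the queue is typed as Object and one private marker instance (the name DONE is made up here) stands in for close(c). As with any sentinel, this assumes a single reader, and it runs on a JDK with virtual threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;

final class SentinelFibonacci {
    private SentinelFibonacci() {}

    // Unique marker object; compared by identity, never equal to a real value.
    private static final Object DONE = new Object();

    static void fibonacci(int n, BlockingQueue<Object> queue)
            throws InterruptedException {
        int x = 0;
        int y = 1;
        for (int i = 0; i < n; i++) {
            queue.put(x);
            int temp = x;
            x = y;
            y = temp + x;
        }
        queue.put(DONE); // plays the role of close(c)
    }

    static List<Integer> collect(int n) throws InterruptedException {
        var values = new ArrayList<Integer>();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
            executor.submit(() -> {
                fibonacci(n, queue);
                return null;
            });
            Object item;
            // Keep reading until the marker shows up.
            while ((item = queue.take()) != DONE) {
                values.add((Integer) item);
            }
        }
        return values;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(10)); // prints [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
    }
}
```

The price compared to TakeResult is the cast: the compiler no longer knows that everything other than DONE is an Integer.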
https://go.dev/tour/concurrency/5
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
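There is also no equivalent to select for BlockingQueues. We have to implement that logic in a hand-rolled loop which, unlike select, keeps spinning through offer and poll rather than blocking while neither queue is ready.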
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
public final class SelectQueues {
private SelectQueues() {}
static void fibonacci(BlockingQueue<Integer> queue,
BlockingQueue<Integer> quit) {
int x = 0;
int y = 1;
while (true) {
if (queue.offer(x)) {
int temp = x;
x = y;
y = temp + x;
}
if (quit.poll() != null) {
System.out.println("quit");
break;
}
}
}
public static void main(String[] args) {
var queue = new ArrayBlockingQueue<Integer>(1);
var quit = new ArrayBlockingQueue<Integer>(1);
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(queue.take());
}
quit.put(0);
return null;
});
fibonacci(queue, quit);
}
}
}
SelectQueues.main(new String[]{});
I'm unsure why the Go version uses a channel of integers as its quit mechanism. In Java it is more natural to use something like a shared AtomicBoolean as a signal for shutdown.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
public final class SelectQueues {
private SelectQueues() {}
static void fibonacci(BlockingQueue<Integer> queue,
AtomicBoolean quit) {
int x = 0;
int y = 1;
while (!quit.get()) {
if (queue.offer(x)) {
int temp = x;
x = y;
y = temp + x;
}
}
System.out.println("quit");
}
public static void main(String[] args) throws InterruptedException {
var queue = new ArrayBlockingQueue<Integer>(1);
var quit = new AtomicBoolean(false);
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(queue.take());
}
quit.set(true);
return null;
});
fibonacci(queue, quit);
}
}
}
If it were a situation with multiple "one shot" queues, then CompletableFuture#anyOf and similar methods might suffice.
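A minimal sketch of that idea, assuming each "one shot" result is a CompletableFuture: anyOf completes as soon as any input does, which loosely resembles a select over channels that each deliver one value. Since anyOf yields a CompletableFuture<Object>, the result needs a cast. The class and method names are illustrative, and a JDK with virtual threads is assumed.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

final class FirstOf {
    private FirstOf() {}

    // Waits for whichever of two one-shot results is available first.
    static String race(Duration slowDelay) {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(slowDelay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "slow";
            }, executor);
            CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
            // anyOf completes with the result of the first input to complete;
            // its type is CompletableFuture<Object>, hence the cast.
            return (String) CompletableFuture.anyOf(slow, fast).join();
        } // close() still waits for the slow task to finish sleeping
    }

    public static void main(String[] args) {
        System.out.println(race(Duration.ofMillis(200))); // prints fast
    }
}
```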
https://go.dev/tour/concurrency/6
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
There is nothing novel about translating the default case, but it is worth noting how Go's time library directly returns channels as its mechanism for handling delays.
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
public final class GreenThreadDress {
private GreenThreadDress() {}
public static void main(String[] args)
throws InterruptedException {
var executor =
Executors.newVirtualThreadPerTaskExecutor();
try {
var tick = new ArrayBlockingQueue<Instant>(1);
var boom = new ArrayBlockingQueue<Instant>(1);
executor.submit(() -> {
while (true) {
Thread.sleep(Duration.ofMillis(100));
tick.put(Instant.now());
}
});
executor.submit(() -> {
Thread.sleep(500);
boom.put(Instant.now());
return null;
});
while (true) {
if (tick.poll() != null) {
System.out.println("tick.");
}
else if (boom.poll() != null) {
System.out.println("BOOM!");
break;
}
else {
System.out.println(" .");
Thread.sleep(Duration.ofMillis(50));
}
}
} finally {
executor.shutdownNow();
executor.close();
}
}
}
GreenThreadDress.main(new String[]{});
Here we rely on the behavior of ExecutorService#shutdownNow to interrupt the task pushing to the tick queue. This is unlike Go's built-in time.Tick, whose underlying Ticker can never be stopped and so, in the words of Go's own documentation at the time of writing, "leaks."
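To mirror Go handing back channels from its time library, one could write a small helper that returns a queue instead. The after method below is hypothetical, not a JDK API: it submits a task that sleeps and then puts one Instant on a single-slot queue, roughly like time.After. Because that task ends on its own, nothing needs to interrupt it.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class Timers {
    private Timers() {}

    // Hypothetical helper, loosely modeled on Go's time.After: the returned
    // queue receives a single Instant once `delay` has elapsed.
    static BlockingQueue<Instant> after(ExecutorService executor, Duration delay) {
        var queue = new ArrayBlockingQueue<Instant>(1);
        executor.submit(() -> {
            Thread.sleep(delay);
            queue.put(Instant.now());
            return null;
        });
        return queue;
    }

    static boolean demo() throws InterruptedException {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            var boom = after(executor, Duration.ofMillis(500));
            boolean emptyAtFirst = boom.poll() == null;              // has not fired yet
            boolean fired = boom.poll(5, TimeUnit.SECONDS) != null;  // blocks until it does
            return emptyAtFirst && fired;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

A repeating tick helper would need a loop like the one in GreenThreadDress and therefore still something like shutdownNow to stop it.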
https://go.dev/tour/concurrency/7 https://go.dev/tour/concurrency/8
package main
import "golang.org/x/tour/tree"
// Walk walks the tree t sending all values
// from the tree to the channel ch.
func Walk(t *tree.Tree, ch chan int)
// Same determines whether the trees
// t1 and t2 contain the same values.
func Same(t1, t2 *tree.Tree) bool
func main() {
}
This one is a little bit different, since it's not a straight example but a challenge you are meant to complete.
A full solution can be found on this StackOverflow question.
package main
import "fmt"
import "golang.org/x/tour/tree"
// Walk walks the tree t sending all values
// from the tree to the channel ch.
func Walk(t *tree.Tree, ch chan int) {
var walker func(t *tree.Tree)
walker = func (t *tree.Tree) {
if (t == nil) {
return
}
walker(t.Left)
ch <- t.Value
walker(t.Right)
}
walker(t)
close(ch)
}
// Same determines whether the trees
// t1 and t2 contain the same values.
func Same(t1, t2 *tree.Tree) bool {
ch1, ch2 := make(chan int), make(chan int)
go Walk(t1, ch1)
go Walk(t2, ch2)
for {
v1, ok1 := <-ch1
v2, ok2 := <-ch2
if v1 != v2 || ok1 != ok2 {
return false
}
if !ok1 {
break
}
}
return true
}
func main() {
fmt.Println("1 and 1 same: ", Same(tree.New(1), tree.New(1)))
fmt.Println("1 and 2 same: ", Same(tree.New(1), tree.New(2)))
}
The Tree type is defined separately, here.
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tree // import "golang.org/x/tour/tree"
import (
"fmt"
"math/rand"
)
// A Tree is a binary tree with integer values.
type Tree struct {
Left *Tree
Value int
Right *Tree
}
// New returns a new, random binary tree holding the values k, 2k, ..., 10k.
func New(k int) *Tree {
var t *Tree
for _, v := range rand.Perm(10) {
t = insert(t, (1+v)*k)
}
return t
}
func insert(t *Tree, v int) *Tree {
if t == nil {
return &Tree{nil, v, nil}
}
if v < t.Value {
t.Left = insert(t.Left, v)
} else {
t.Right = insert(t.Right, v)
}
return t
}
func (t *Tree) String() string {
if t == nil {
return "()"
}
s := ""
if t.Left != nil {
s += t.Left.String() + " "
}
s += fmt.Sprint(t.Value)
if t.Right != nil {
s += " " + t.Right.String()
}
return "(" + s + ")"
}
So before touching the concurrency bits, we need to translate this Tree type.
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public sealed interface Tree {
Tree insert(int v);
record NotEmpty(
Tree left,
int value,
Tree right) implements Tree {
@Override
public Tree insert(int v) {
if (v < this.value) {
return new NotEmpty(
this.left.insert(v),
this.value,
this.right
);
}
else {
return new NotEmpty(
this.left,
this.value,
this.right.insert(v)
);
}
}
@Override
public String toString() {
return "( " +
this.left +
this.value +
this.right +
" )";
}
}
record Empty() implements Tree {
@Override
public Tree insert(int v) {
return new NotEmpty(new Empty(), v, new Empty());
}
@Override
public String toString() {
return "";
}
}
static Tree random(int k) {
var vs = IntStream.range(0, 10)
.boxed()
.collect(Collectors.toList());
Collections.shuffle(vs);
Tree t = new Empty();
for (int v : vs) {
t = t.insert((1 + v) * k);
}
return t;
}
}
A 1-1 translation of the Go wouldn't be fun Java, so I opted instead to translate it to an immutable sum type. This won't affect the concurrent part other than giving a stronger conceptual guarantee that we can safely share the tree across multiple threads.
With this version, the Go solution maps pretty straightforwardly to Java.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
sealed interface TakeResult<T> {
record GotValue<T>(T value) implements TakeResult<T> {}
record NoValue<T>() implements TakeResult<T> {}
}
public final class TreeWalker {
private TreeWalker() {}
private static void walkHelper(
Tree tree,
BlockingQueue<TakeResult<Integer>> queue
) throws InterruptedException {
// Empty plays the role of Go's nil; only NotEmpty has children to visit.
if (tree instanceof Tree.NotEmpty notEmpty) {
walkHelper(notEmpty.left(), queue);
queue.put(new TakeResult.GotValue<>(notEmpty.value()));
walkHelper(notEmpty.right(), queue);
}
}
static void walk(
Tree tree,
BlockingQueue<TakeResult<Integer>> queue
) throws InterruptedException {
walkHelper(tree, queue);
queue.put(new TakeResult.NoValue<>());
}
static boolean same(Tree t1, Tree t2)
throws InterruptedException {
var queue1 = new ArrayBlockingQueue<TakeResult<Integer>>(1);
var queue2 = new ArrayBlockingQueue<TakeResult<Integer>>(1);
var executor =
Executors.newVirtualThreadPerTaskExecutor();
try {
executor.submit(() -> {
walk(t1, queue1);
return null;
});
executor.submit(() -> {
walk(t2, queue2);
return null;
});
while (true) {
var result1 = queue1.take();
var result2 = queue2.take();
if (!result1.equals(result2)) {
return false;
}
if (result1 instanceof TakeResult.NoValue<Integer>) {
break;
}
}
return true;
} finally {
executor.shutdownNow();
executor.close();
}
}
public static void main(String[] args)
throws InterruptedException {
System.out.println(
"1 and 1 same: " +
same(Tree.random(1), Tree.random(1))
);
System.out.println(
"1 and 2 same: " +
same(Tree.random(1), Tree.random(2))
);
}
}
TreeWalker.main(new String[]{});
We use the same trick as before to emulate a closable queue with TakeResult. The Go solution's paired receives from ch1 and ch2 then translate directly to paired take calls on the two queues, and comparing the TakeResult records with equals covers both the value check and the ok check.
The example Go solution had a recursive local closure for walk. While that is technically possible in Java via some wizardry, it's more straightforward to make a helper method.
There is also a reliance on the walk tasks responding correctly to shutdownNow. When same returns early, a walker may still be blocked in put; if it did not respond to the interrupt, executor.close() would hang and same would never return.
https://go.dev/tour/concurrency/9
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter is safe to use concurrently.
type SafeCounter struct {
mu sync.Mutex
v map[string]int
}
// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
// Lock so only one goroutine at a time can access the map c.v.
c.v[key]++
c.mu.Unlock()
}
// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
// Lock so only one goroutine at a time can access the map c.v.
defer c.mu.Unlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}
Java has a close analogue to sync.Mutex in ReentrantLock, though unlike Go's Mutex it is reentrant: the thread holding it may lock it again. We can make this same program without much issue.
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
final class SafeCounter {
private final Map<String, Integer> v;
private final ReentrantLock lock;
public SafeCounter() {
this.v = new HashMap<>();
this.lock = new ReentrantLock();
}
void inc(String key) {
lock.lock();
try {
v.put(key, v.getOrDefault(key, 0) + 1);
} finally {
lock.unlock();
}
}
int value(String key) {
lock.lock();
try {
return v.getOrDefault(key, 0);
} finally {
lock.unlock();
}
}
}
public final class Mutex {
private Mutex() {}
public static void main(String[] args)
throws InterruptedException {
var c = new SafeCounter();
for (int i = 0; i < 1000; i++) {
Thread.startVirtualThread(
() -> c.inc("somekey")
);
}
Thread.sleep(Duration.ofSeconds(1));
System.out.println(c.value("somekey"));
}
}
Mutex.main(new String[]{});
The only thing of note is that, unlike Go, where you can defer some arbitrary action like releasing your hold on a lock, in Java the general mechanism for "cleanup that must happen" is the finally clause of a try block.
In Java, ReentrantLock is special in that its locks can "escape" a lexical scope. You can lock before entering a method and unlock in a totally unrelated one.
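To make the "escaping" concrete, here is a hypothetical class whose lock is acquired in one method and released in another, a shape that synchronized blocks cannot express. One caveat: ReentrantLock still ties the lock to the thread that acquired it, so only that thread may unlock it (any other thread gets an IllegalMonitorStateException).

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example: the lock is taken in begin() and released in commit().
final class Transaction {
    private final ReentrantLock lock = new ReentrantLock();
    private int balance = 0;

    // Acquires the lock and returns while still holding it.
    void begin() {
        lock.lock();
    }

    // Only valid between begin() and commit() on the same thread.
    void deposit(int amount) {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalStateException("call begin() first");
        }
        balance += amount;
    }

    // Releases the lock that begin() acquired, from a different method.
    void commit() {
        lock.unlock();
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    int balance() {
        lock.lock();
        try {
            return balance;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        var t = new Transaction();
        t.begin();
        t.deposit(5);
        t.commit();
        System.out.println(t.balance()); // prints 5
    }
}
```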
If you don't need this ability, then you can use the fact that every object with "identity" in Java can be used as a lock via synchronized.
final class SafeCounter {
private final Map<String, Integer> v;
public SafeCounter() {
this.v = new HashMap<>();
}
void inc(String key) {
synchronized (this) {
v.put(key, v.getOrDefault(key, 0) + 1);
}
}
int value(String key) {
synchronized (this) {
return v.getOrDefault(key, 0);
}
}
}
If the thing being synchronized on is just this, and that synchronization lasts the entire scope of the method, we can instead mark the method as synchronized for the same effect.
final class SafeCounter {
private final Map<String, Integer> v;
public SafeCounter() {
this.v = new HashMap<>();
}
synchronized void inc(String key) {
v.put(key, v.getOrDefault(key, 0) + 1);
}
synchronized int value(String key) {
return v.getOrDefault(key, 0);
}
}
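For this particular counter, the lock can also be dropped: ConcurrentHashMap#merge performs each per-key read-modify-write atomically. The sketch below is an alternative rather than a translation of the Go, and it also waits by closing a virtual-thread executor instead of sleeping for a second, so the final count does not depend on timing. The class and method names are illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

final class MergeCounter {
    private final ConcurrentHashMap<String, Integer> v = new ConcurrentHashMap<>();

    void inc(String key) {
        // Atomically stores 1 for a new key, or adds 1 to the existing value.
        v.merge(key, 1, Integer::sum);
    }

    int value(String key) {
        return v.getOrDefault(key, 0);
    }

    // Runs `increments` concurrent inc calls and returns the final count.
    static int countConcurrently(int increments) {
        var c = new MergeCounter();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < increments; i++) {
                executor.submit(() -> c.inc("somekey"));
            }
        } // waits for every increment instead of sleeping for a second
        return c.value("somekey");
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(1000)); // prints 1000
    }
}
```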
https://go.dev/tour/concurrency/10
package main
import (
"fmt"
)
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
// TODO: Fetch URLs in parallel.
// TODO: Don't fetch the same URL twice.
// This implementation doesn't do either:
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
Crawl(u, depth-1, fetcher)
}
return
}
func main() {
Crawl("https://golang.org/", 4, fetcher)
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"https://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"https://golang.org/pkg/",
"https://golang.org/cmd/",
},
},
"https://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"https://golang.org/",
"https://golang.org/cmd/",
"https://golang.org/pkg/fmt/",
"https://golang.org/pkg/os/",
},
},
"https://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
"https://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
}
Another challenge problem. Before going to a reference solution, I am going to translate the synchronous example.
Go has a pattern of returning an error as a conditionally filled-in extra return value. This isn't very idiomatic in Java, so instead I am going to model the error case as an Exception.
The Go version also returns both a body and a slice of URLs from a single call. In Java we can accomplish this by making an aggregate, here a record, that contains both values.
final class FetcherException extends Exception {
FetcherException(String message) {
super(message);
}
}
interface Fetcher {
record Result(String body, String[] urls) {
}
Result fetch(String url) throws FetcherException;
}
We can make a "fake" implementation of Fetcher using the same technique as Go, by backing it with an in-memory map.
import java.util.Map;
final class FakeFetcher implements Fetcher {
private final Map<String, Result> results;
public FakeFetcher(Map<String, Result> results) {
this.results = results;
}
@Override
public Result fetch(String url) throws FetcherException {
var result = this.results.get(url);
if (result == null) {
throw new FetcherException("Not Found: " + url);
} else {
return result;
}
}
public static Fetcher example() {
return new FakeFetcher(Map.of(
"https://golang.org/", new Fetcher.Result(
"The Go Programming Language",
new String[]{
"https://golang.org/pkg/",
"https://golang.org/cmd/"
}
),
"https://golang.org/pkg/", new Fetcher.Result(
"Packages",
new String[]{
"https://golang.org/",
"https://golang.org/cmd/",
"https://golang.org/pkg/fmt/",
"https://golang.org/pkg/os/",
}
),
"https://golang.org/pkg/fmt/", new Fetcher.Result(
"Package fmt",
new String[]{
"https://golang.org/",
"https://golang.org/pkg/",
}
),
"https://golang.org/pkg/os/", new Fetcher.Result(
"Package os",
new String[]{
"https://golang.org/",
"https://golang.org/pkg/",
}
)
));
}
}
Then the synchronous crawler is just a regular recursive function, as it was in Go.
import java.util.Arrays;
public final class WebCrawler {
private WebCrawler() {
}
static void crawl(
String url,
int depth,
Fetcher fetcher) {
if (depth <= 0) {
return;
}
Fetcher.Result result;
try {
result = fetcher.fetch(url);
} catch (FetcherException e) {
System.out.println(e.getMessage());
return;
}
var body = result.body();
var urls = result.urls();
System.out.printf(
"Found: %s %s\n",
body,
Arrays.toString(urls)
);
for (var u : urls) {
crawl(u, depth - 1, fetcher);
}
}
public static void main(String[] args) {
var fetcher = FakeFetcher.example();
crawl("https://golang.org/", 4, fetcher);
}
}
WebCrawler.main(new String[]{});
Now I am going to pull an answer to the exercise from this StackOverflow post.
// SafeUrlMap is safe to use concurrently.
type SafeUrlMap struct {
v map[string]string
mux sync.Mutex
}
func (c *SafeUrlMap) Set(key string, body string) {
c.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
c.v[key] = body
c.mux.Unlock()
}
// Value returns mapped value for the given key.
func (c *SafeUrlMap) Value(key string) (string, bool) {
c.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
defer c.mux.Unlock()
val, ok := c.v[key]
return val, ok
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, urlMap *SafeUrlMap) {
defer wg.Done()
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
// Record the page once its body has been fetched.
urlMap.Set(url, body)
for _, u := range urls {
if _, ok := urlMap.Value(u); !ok {
wg.Add(1)
go Crawl(u, depth-1, fetcher, urlMap)
}
}
return
}
var wg sync.WaitGroup
func main() {
urlMap := SafeUrlMap{v: make(map[string]string)}
wg.Add(1)
// Pass a pointer so every goroutine shares the same mutex.
go Crawl("https://golang.org/", 4, fetcher, &urlMap)
wg.Wait()
for url := range urlMap.v {
body, _ := urlMap.Value(url)
fmt.Printf("found: %s %q\n", url, body)
}
}
This solution makes use of a sync.WaitGroup. There is no direct analogue in Java, but we can pretty easily make something with similar semantics.
I am taking the implementation from this StackOverflow question.
final class WaitGroup {
private int jobs = 0;
public synchronized void add(int i) {
jobs += i;
}
public synchronized void done() {
if (--jobs == 0) {
notifyAll();
}
}
public synchronized void await()
throws InterruptedException {
while (jobs > 0) {
wait();
}
}
}
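As an aside, java.util.concurrent.Phaser can fill a similar role, since parties can register and deregister while it is in use. Below is a sketch of the same add/done/await shape built on it, under a few assumptions: one party is pre-registered for the waiting thread, so await is single-use, and unlike the wait()-based version it cannot be interrupted. The demo mimics the crawler by having each task register its children before calling done, on a JDK with virtual threads. All names are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

final class PhaserWaitGroup {
    // One party is registered up front for the thread that will eventually await.
    private final Phaser phaser = new Phaser(1);

    void add(int n) {
        phaser.bulkRegister(n); // one party per outstanding job
    }

    void done() {
        phaser.arriveAndDeregister(); // this job is finished and leaves the count
    }

    // Single-use: arrives on behalf of the waiter and blocks until every
    // other registered party has called done(). Not interruptible.
    void await() {
        phaser.arriveAndAwaitAdvance();
    }

    // Demo: each task spawns two children until depth reaches 0.
    // Returns how many tasks ran in total.
    static int spawnTree(int depth) {
        var waitGroup = new PhaserWaitGroup();
        var count = new AtomicInteger();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            waitGroup.add(1);
            executor.submit(() -> node(depth, executor, waitGroup, count));
            waitGroup.await();
            return count.get();
        }
    }

    private static void node(int depth, ExecutorService executor,
                             PhaserWaitGroup waitGroup, AtomicInteger count) {
        try {
            count.incrementAndGet();
            if (depth > 0) {
                for (int i = 0; i < 2; i++) {
                    waitGroup.add(1); // register the child before this task finishes
                    executor.submit(() -> node(depth - 1, executor, waitGroup, count));
                }
            }
        } finally {
            waitGroup.done();
        }
    }

    public static void main(String[] args) {
        System.out.println(spawnTree(3)); // prints 15
    }
}
```

Registering each child before the parent calls done is what keeps the count from reaching zero early, the same ordering the crawler relies on with waitGroup.add(1).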
The SafeUrlMap type would also be fairly trivial to assemble, but instead I am going to pass around a normal HashSet and manually synchronize on it.
There are many other options, including using Collections#synchronizedSet or a set backed by a ConcurrentHashMap, but I think this will be the easiest to follow.
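import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public final class WebCrawler {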
private WebCrawler() {
}
static void crawlTask(
String url,
int depth,
Fetcher fetcher,
ExecutorService executor,
Set<String> seen,
WaitGroup waitGroup) {
try {
if (depth <= 0) {
return;
}
Fetcher.Result result;
try {
result = fetcher.fetch(url);
} catch (FetcherException e) {
System.out.println(e.getMessage());
return;
}
var body = result.body();
var urls = result.urls();
System.out.printf(
"Found: %s %s\n",
body,
Arrays.toString(urls)
);
for (var u : urls) {
synchronized (seen) {
if (!seen.contains(u)) {
seen.add(u);
waitGroup.add(1);
executor.submit(() -> crawlTask(
u,
depth - 1,
fetcher,
executor,
seen,
waitGroup));
}
}
}
} finally {
waitGroup.done();
}
}
static void crawl(String url, int depth, Fetcher fetcher)
throws InterruptedException {
try (var executor =
Executors.newVirtualThreadPerTaskExecutor()) {
var waitGroup = new WaitGroup();
waitGroup.add(1);
executor.submit(() -> crawlTask(
url,
depth,
fetcher,
executor,
new HashSet<>(),
waitGroup));
waitGroup.await();
}
}
public static void main(String[] args)
throws InterruptedException {
var fetcher = FakeFetcher.example();
crawl("https://golang.org/", 4, fetcher);
}
}
WebCrawler.main(new String[]{});
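For comparison, a sketch of the ConcurrentHashMap option mentioned earlier: ConcurrentHashMap.newKeySet() returns a thread-safe Set, and since Set#add reports whether the element was newly inserted, the contains-then-add pair inside synchronized (seen) can become a single atomic call. The demo, whose names are illustrative, has many virtual threads race to claim the same URL.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class SeenUrls {
    private SeenUrls() {}

    // Has `tasks` virtual threads race to claim the same URL and returns how
    // many of them won. With an atomic add, exactly one should.
    static int claims(int tasks) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        var winners = new AtomicInteger();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    // add returns true only for the caller that actually inserted the URL.
                    if (seen.add("https://golang.org/pkg/")) {
                        winners.incrementAndGet();
                    }
                });
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(claims(1000)); // prints 1
    }
}
```

In crawlTask this would amount to replacing the synchronized block with if (seen.add(u)) { ... }.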
We could stop there, but there is one API in an incubator module in the early access builds that can replace our home-rolled WaitGroup.
This is why I had --add-modules=jdk.incubator.concurrent in the jshell command up top.
A StructuredTaskScope.ShutdownOnFailure lets us fork an arbitrary number of tasks into the scope, including recursively from inside other tasks, and its join() will only return once either all of those tasks are complete or one of them has failed. There is another implementation, StructuredTaskScope.ShutdownOnSuccess, that will finish after a single task succeeds.
This obviates the need to manually count up and down with a WaitGroup.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import jdk.incubator.concurrent.StructuredTaskScope;
public final class WebCrawler {
private WebCrawler() {
}
static void crawlTask(
String url,
int depth,
Fetcher fetcher,
StructuredTaskScope.ShutdownOnFailure
structuredTaskScope,
Set<String> seen
) {
if (depth <= 0) {
return;
}
Fetcher.Result result;
try {
result = fetcher.fetch(url);
} catch (FetcherException e) {
System.out.println(e.getMessage());
return;
}
var body = result.body();
var urls = result.urls();
System.out.printf(
"Found: %s %s\n",
body,
Arrays.toString(urls)
);
for (var u : urls) {
synchronized (seen) {
if (!seen.contains(u)) {
seen.add(u);
structuredTaskScope.fork(() -> {
crawlTask(
u,
depth - 1,
fetcher,
structuredTaskScope,
seen);
return null;
});
}
}
}
}
static void crawl(String url, int depth, Fetcher fetcher)
throws InterruptedException {
try (var structuredTaskScope =
new StructuredTaskScope.ShutdownOnFailure()) {
structuredTaskScope.fork(() -> {
crawlTask(
url,
depth,
fetcher,
structuredTaskScope,
new HashSet<>()
);
return null;
});
structuredTaskScope.join();
}
}
public static void main(String[] args)
throws InterruptedException {
var fetcher = FakeFetcher.example();
crawl("https://golang.org/", 4, fetcher);
}
}
WebCrawler.main(new String[]{});
This combines the roles of the ExecutorService and the WaitGroup into one object, which at the least makes for one less point of coordination and slightly cleaner code.
The exact shape the StructuredTaskScope API will take is very much in flux, so if you are reading this a year or so in the future, this snippet might not work.
Hopefully this was informative. If not, that's fine too.
Leave questions, comments, and suggestions in the comments.