Structured concurrency
Programmer-friendly structured concurrency scopes, building upon the lower-level API available as a preview in Java 21, JEP 453.
Requires JDK 21, an LTS release of Java (won’t work with newer versions).
Javadocs: https://javadoc.io.
Dependency
Maven:
<dependency>
    <groupId>com.softwaremill.jox</groupId>
    <artifactId>structured</artifactId>
    <version>0.4.0</version>
</dependency>
Gradle:
implementation 'com.softwaremill.jox:structured:0.4.0'
Creating scopes and forking computations
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Scopes.supervised;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var result = supervised(scope -> {
            var f1 = scope.fork(() -> {
                Thread.sleep(500);
                return 5;
            });
            var f2 = scope.fork(() -> {
                Thread.sleep(1000);
                return 6;
            });
            return f1.join() + f2.join();
        });
        System.out.println("result = " + result);
    }
}
- the supervised scope will only complete once any forks started within complete as well
- in other words, it’s guaranteed that no forks will remain running after a supervised block completes
- fork starts a concurrently running computation, which can be joined in a blocking way. These computations are backed by virtual threads
Error handling in scopes
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Scopes.supervised;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var result = supervised(scope -> {
            var f1 = scope.fork(() -> {
                Thread.sleep(1000);
                return 6;
            });
            var f2 = scope.<Integer>fork(() -> {
                Thread.sleep(500);
                throw new RuntimeException("I can’t count to 5!");
            });
            return f1.join() + f2.join();
        });
        System.out.println("result = " + result);
    }
}
- an exception thrown from the scope’s body, or from any of the forks, causes the scope to end
- any forks that are still running are then interrupted
- once all forks complete, a JoxScopeExecutionException is thrown by the supervised method
- the cause of the JoxScopeExecutionException is the original exception
- any other exceptions (e.g. InterruptedExceptions) that have been thrown while ending the scope are added as suppressed
Jox implements the “let it crash” model. When an error occurs, the entire scope ends, propagating the exception higher, so that it can be properly handled. Moreover, no detail is lost: all exceptions are preserved, either as causes, or suppressed exceptions.
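How the cause and the suppressed exceptions travel together can be seen with the plain JDK Throwable API alone. The sketch below does not use Jox: ScopeFailure is a hypothetical stand-in for JoxScopeExecutionException, assembled by hand to show what a caller would find when inspecting such an exception.

```java
import java.util.ArrayList;
import java.util.List;

public class CauseAndSuppressedDemo {
    // Hypothetical stand-in for JoxScopeExecutionException; NOT part of Jox.
    static class ScopeFailure extends RuntimeException {
        ScopeFailure(Throwable cause) {
            super(cause);
        }
    }

    // Builds a failure shaped as the text describes: the original exception is
    // the cause, anything thrown while the scope was ending is suppressed.
    static ScopeFailure failureOf(Throwable original, List<Throwable> thrownWhileEnding) {
        var failure = new ScopeFailure(original);
        for (Throwable t : thrownWhileEnding) {
            failure.addSuppressed(t);
        }
        return failure;
    }

    // Lists the cause's message, then the type of every suppressed exception.
    static List<String> describe(Throwable failure) {
        var lines = new ArrayList<String>();
        lines.add("cause: " + failure.getCause().getMessage());
        for (Throwable s : failure.getSuppressed()) {
            lines.add("suppressed: " + s.getClass().getSimpleName());
        }
        return lines;
    }

    public static void main(String[] args) {
        var failure = failureOf(
                new RuntimeException("I can't count to 5!"),
                List.of(new InterruptedException()));
        describe(failure).forEach(System.out::println);
    }
}
```

A handler for a real scope failure can walk getCause() and getSuppressed() in the same way, so nothing that went wrong during shutdown is silently dropped.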
As JoxScopeExecutionException is unchecked, we introduced a utility method called JoxScopeExecutionException#unwrapAndThrow. If the wrapped exception is an instance of any of the passed classes, this method unwraps the original exception and throws it as a checked exception; the throws signature then forces exception handling. If the wrapped exception is not an instance of any of the passed classes, nothing happens. All suppressed exceptions from the JoxScopeExecutionException are added as suppressed to the unwrapped one.
Note that the throws signature points to the closest superclass of the passed arguments.
The method does not rethrow the JoxScopeExecutionException by default, so it is advised to rethrow it manually after calling unwrapAndThrow, e.g.:
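import java.util.concurrent.ExecutionException;

import com.softwaremill.jox.structured.JoxScopeExecutionException;
import com.softwaremill.jox.structured.Scopes;

public class Demo {
    // TestException, OtherException and YetAnotherException are application-defined;
    // the enclosing method must declare or handle their closest common superclass.
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /* ... */
        try {
            Scopes.supervised(scope -> {
                throw new TestException("x");
            });
        } catch (JoxScopeExecutionException e) {
            // throws the unwrapped cause if it matches one of the passed classes
            e.unwrapAndThrow(OtherException.class, TestException.class, YetAnotherException.class);
            // otherwise nothing was thrown, so rethrow the wrapper
            throw e;
        }
        /* ... */
    }
}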
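To make the unwrap-then-rethrow contract concrete, here is a minimal re-creation of the described behaviour using only the JDK. It is a sketch under assumptions, not Jox's implementation: WrappedFailure and its unwrapAndThrow method are hypothetical stand-ins written for this illustration. The generic throws clause is what lets the compiler infer the closest common superclass of the passed classes.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class UnwrapSketch {
    // Hypothetical stand-in for JoxScopeExecutionException; NOT the library class.
    static class WrappedFailure extends RuntimeException {
        WrappedFailure(Throwable cause) {
            super(cause);
        }

        // If the cause is an instance of one of the given classes, copy the
        // suppressed exceptions onto it and throw it; otherwise do nothing.
        @SafeVarargs
        final <T extends Throwable> void unwrapAndThrow(Class<? extends T>... allowed) throws T {
            for (Class<? extends T> type : allowed) {
                if (type.isInstance(getCause())) {
                    T unwrapped = type.cast(getCause());
                    for (Throwable s : getSuppressed()) {
                        unwrapped.addSuppressed(s);
                    }
                    throw unwrapped;
                }
            }
        }
    }

    // Same pattern as in the text: unwrap the expected types, else rethrow the wrapper.
    static String handle(WrappedFailure failure) {
        try {
            // T is inferred as Exception, the closest common superclass of both classes
            failure.unwrapAndThrow(IOException.class, TimeoutException.class);
            throw failure;
        } catch (WrappedFailure e) {
            return "still wrapped: " + e.getCause().getMessage();
        } catch (Exception e) {
            return "unwrapped " + e.getClass().getSimpleName() + ": " + e.getMessage()
                    + ", suppressed=" + e.getSuppressed().length;
        }
    }

    public static void main(String[] args) {
        var io = new WrappedFailure(new IOException("disk"));
        io.addSuppressed(new InterruptedException());
        System.out.println(handle(io));
        System.out.println(handle(new WrappedFailure(new IllegalStateException("bug"))));
    }
}
```

The first call prints "unwrapped IOException: disk, suppressed=1"; the second prints "still wrapped: bug", because IllegalStateException matches none of the passed classes and the explicit rethrow takes over.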
Other types of scopes & forks
There are 4 types of forks:
- fork: daemon fork, supervised; when the scope’s body ends, such forks are interrupted
- forkUser: user fork, supervised; when the scope’s body ends, the scope’s method waits until such a fork completes normally
- forkUnsupervised: daemon fork, unsupervised; any thrown exceptions don’t cause the scope to end, but instead can be discovered when the fork is joined
- forkCancellable: daemon fork, unsupervised, which can be manually cancelled (interrupted)
There are also 2 types of scopes:
- supervised: the default scope, which ends when all user forks complete successfully, or when there’s any exception in a supervised fork
- unsupervised: a scope where only unsupervised forks can be started
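The supervised/unsupervised split is largely about when a failure surfaces. As a rough analogy only, using the plain JDK rather than Jox: a java.util.concurrent.Future behaves somewhat like an unsupervised, cancellable fork, in that a failure stays inside the Future until get() is called, and cancel(true) interrupts the task. The class and method names below are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class UnsupervisedAnalogy {
    // "Joins" a future and reports how it ended, without letting a failure escape.
    static String joinOutcome(Future<Integer> f) throws InterruptedException {
        try {
            return "value " + f.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (CancellationException e) {
            return "cancelled";
        }
    }

    static List<String> run() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Callable<Integer> failingTask = () -> {
                throw new IllegalStateException("boom");
            };
            Callable<Integer> slowTask = () -> {
                Thread.sleep(10_000);
                return 2;
            };
            Callable<Integer> okTask = () -> 3;

            Future<Integer> failing = executor.submit(failingTask);
            Future<Integer> slow = executor.submit(slowTask);
            Future<Integer> ok = executor.submit(okTask);

            // Manual cancellation: interrupts the sleeping task if it already started.
            slow.cancel(true);

            // The failure of one task did not stop the others; it only shows up here.
            return List.of(joinOutcome(failing), joinOutcome(slow), joinOutcome(ok));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

A supervised fork differs precisely on this point: per the description above, its exception ends the whole scope rather than waiting to be discovered at join time.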
Running computations in parallel
import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Par.par;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var result = par(List.of(() -> {
            Thread.sleep(500);
            return 5;
        }, () -> {
            Thread.sleep(1000);
            return 6;
        }));
        System.out.println("result = " + result);
    }
}
// result = [5, 6]
Uses supervised scopes underneath.
Racing computations
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Race.race;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var result = race(() -> {
            Thread.sleep(1000);
            return 10;
        }, () -> {
            Thread.sleep(500);
            return 5;
        });
        // result will be 5, the other computation will be interrupted on the Thread.sleep
        System.out.println("result = " + result);
    }
}
// result = 5
Timing out a computation
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static com.softwaremill.jox.structured.Race.timeout;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        var result = timeout(1000, () -> {
            Thread.sleep(500);
            return 5;
        });
        System.out.println("result = " + result);
    }
}
// result = 5