The Tascalate Concurrent library provides an implementation of the CompletionStage interface and related classes that are designed to support long-running blocking tasks (typically I/O-bound). This functionality augments the sole Java 8 built-in implementation, CompletableFuture, which primarily supports computational tasks. The library also helps with numerous asynchronous programming challenges such as handling timeouts, retry/poll functionality, and orchestrating the results of multiple concurrent computations.
The library is shipped as a multi-release JAR and may be used either with Java 8 as a classpath library or with Java 9+ as a module.
Why is CompletableFuture not enough?
There are several shortcomings in the CompletableFuture implementation that complicate its usage for real-life asynchronous programming, especially when you have to work with I/O-bound interruptible tasks:
- The CompletableFuture.cancel() method does not interrupt the underlying thread; it merely puts the future into an exceptionally completed state. So even if you use blocking calls inside functions passed to thenApplyAsync / thenAcceptAsync / etc., these functions will run to the end and will never be interrupted. Please see “CompletableFuture can’t be interrupted” by Tomasz Nurkiewicz.
- By default, all *Async composition methods of CompletableFuture use ForkJoinPool.commonPool() (see here) unless an explicit Executor is specified. This thread pool is shared between all CompletableFuture-s and all parallel streams across all applications deployed on the same JVM. This hard-coded, unconfigurable thread pool is completely outside of application developers’ control and is hard to monitor and scale. Therefore, in robust real-life applications you should always specify your own Executor. With the API enhancements in Java 9+ you can fix this drawback, but it requires some custom coding.
- Additionally, the built-in Java 8 concurrency classes provide a rather inconvenient API for combining several CompletionStage-s. The CompletableFuture.allOf / CompletableFuture.anyOf methods accept only CompletableFuture as arguments; there is no mechanism to combine arbitrary CompletionStage-s without converting them to CompletableFuture first. Also, the return type of CompletableFuture.allOf is declared as CompletableFuture<Void>, so you cannot conveniently extract the individual results of each future supplied. CompletableFuture.anyOf is even worse in this regard; for more details, see “CompletableFuture in Action” (the Shortcomings section) by Tomasz Nurkiewicz.
- Support for timeouts/delays was introduced to CompletableFuture only in Java 9, so applications running on the still widely used Java 8 are left without this important functionality. In addition, some design decisions, such as using delayed executors instead of a “delay” operator, are somewhat questionable.
There are numerous free open-source libraries that address some of the aforementioned shortcomings. However, none of them provides an implementation of an interruptible CompletionStage, and none solves all of the issues coherently.
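The first shortcoming in the list can be observed with the JDK alone. The following is a minimal sketch, not taken from the library: the class name CancelDoesNotInterrupt and the 300 ms sleep that stands in for a blocking call are illustrative assumptions. It cancels a running CompletableFuture and reports whether the worker thread ever saw an interrupt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; only JDK types are used.
class CancelDoesNotInterrupt {

    /** Returns true if the worker thread observed an interrupt after cancel(true). */
    static boolean workerWasInterrupted() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                started.countDown();
                try {
                    Thread.sleep(300); // stands in for a blocking I/O call
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            }, pool);
            started.await();                      // make sure the task is running
            future.cancel(true);                  // the boolean flag has no effect here
            finished.await(5, TimeUnit.SECONDS);  // the task still runs to its end
            return interrupted.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted = " + workerWasInterrupted());
    }
}
```

The future itself reports that it is cancelled, yet the task body keeps sleeping until it finishes on its own, which is exactly the gap an interruptible CompletionStage is meant to close.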
How to use?
To use the library, add a single Maven dependency:
<dependency>
<groupId>net.tascalate.concurrent</groupId>
<artifactId>net.tascalate.concurrent.lib</artifactId>
<version>0.7.1</version>
</dependency>
What is inside?
1. Promise interface
This is the core interface of the Tascalate Concurrent library. It may be best described by the formula:
Promise == CompletionStage + Future
I.e., it combines both the blocking Future API, including the cancel(boolean mayInterruptIfRunning) method, and the composition capabilities of the CompletionStage API. Importantly, all composition methods of the CompletionStage API (thenAccept, thenCombine, whenComplete etc.) are re-declared to return Promise as well.
The decision to introduce an interface that merges CompletionStage and Future is aligned with the design of the CompletableFuture API. In addition, several useful methods of the CompletableFuture API are added as well:
T getNow(T valueIfAbsent) throws CancellationException, CompletionException;
T getNow(Supplier<? extends T> valueIfAbsent) throws CancellationException, CompletionException;
T join() throws CancellationException, CompletionException;
So it should be pretty straightforward to use Promise as a drop-in replacement for CompletableFuture in many cases.
Besides this, there are numerous operators in the Promise API to work with timeouts and delays, to override the default asynchronous executor, and similar. All of them are discussed later.
When discussing the Promise interface, it’s mandatory to mention the accompanying class Promises, which provides several useful methods to adapt third-party CompletionStage implementations (including the standard CompletableFuture) to the Promise API. First, there are two unit operations to create Promise-s that are already settled, either successfully or with a failure:
static <T> Promise<T> success(T value)
static <T> Promise<T> failure(Throwable exception)
Second, there is an adapter method, from:
static <T> Promise<T> from(CompletionStage<T> stage)
It behaves as follows:
- If the supplied stage is already a Promise, then it is returned unchanged.
- If stage is a CompletableFuture, then a specially tailored wrapper is returned.
- If stage additionally implements Future, then a specialized wrapper is returned that delegates all the blocking methods defined in the Future API.
- Otherwise, a generic wrapper is created with a good-enough implementation of the blocking Future API atop the asynchronous CompletionStage API.
To summarize, the returned wrapper delegates as much functionality as possible to the supplied stage and never resorts to CompletionStage.toCompletableFuture, because in the Java 8 API it’s an optional method. From the documentation: “A CompletionStage implementation that does not choose to interoperate with others may throw UnsupportedOperationException.” (This text was dropped in Java 9+.) In general, the Tascalate Concurrent library does not depend on this method and should be interoperable with any minimal (but valid) CompletionStage implementation.
It’s important to emphasize that Promise-s returned from the Promises.success, Promises.failure and Promises.from methods are cancellable in the same way as CompletableFuture, but are not interruptible in general; whether interruption works depends on the concrete implementation. Next we discuss the concrete implementation of an interruptible Promise provided by the Tascalate Concurrent library: the CompletableTask class.
2. CompletableTask
This is the reason the project was started. CompletableTask is the implementation of the Promise API for long-running blocking tasks.
Typically, to create a CompletableTask, you submit a Supplier / Runnable to the Executor right away, in a similar way as with CompletableFuture:
Promise<SomeValue> p1 = CompletableTask.supplyAsync(() -> {
return blockingCalculationOfSomeValue();
}, myExecutor);
Promise<Void> p2 = CompletableTask.runAsync(this::someIoBoundMethod, myExecutor);
blockingCalculationOfSomeValue and someIoBoundMethod in the example above can contain I/O code, work with blocking queues, do a blocking get on regular Java Future-s, and the like. If you later decide to cancel either of the returned promises, then the corresponding blockingCalculationOfSomeValue or someIoBoundMethod will be interrupted (if not completed yet).
In the realm of I/O-related functionality, failures like connection time-outs and missing or locked files are pretty common, and the checked exception mechanism is frequently used to signal failures. Therefore the library provides an entry point to the API that accepts a Callable instead of a Supplier:
// Notice the checked exception in the method signature
byte[] loadFile(File file) throws IOException {
byte[] result = ... //load file content;
return result;
}
...
ExecutorService executorService = Executors.newFixedThreadPool(6);
Promise<byte[]> contentPromise = CompletableTask.submit(
() -> loadFile(new File("./myfile.dat")),
executorService
);
Additionally, there are two unit operations to create a CompletableTask:
a. CompletableTask.asyncOn(Executor executor)
Returns an already-completed null-valued Promise that is “bound” to the specified executor. I.e., any function passed to asynchronous composition methods of Promise (like thenApplyAsync / thenAcceptAsync / whenCompleteAsync etc.) will be executed using this executor unless the executor is overridden via an explicit composition method parameter. Moreover, any nested composition calls will use the same executor, if it’s not redefined via an explicit composition method parameter:
CompletableTask
.asyncOn(myExecutor)
.thenApplyAsync(myValueGenerator)
.thenAcceptAsync(myConsumer)
.thenRunAsync(myAction);
All of myValueGenerator, myConsumer and myAction will be executed using myExecutor.
b. CompletableTask.complete(T value, Executor executor)
Same as above, but the starting point is a Promise completed with the specified value:
CompletableTask
.complete("Hello!", myExecutor)
.thenApplyAsync(myMapper)
.thenApplyAsync(myTransformer)
.thenAcceptAsync(myConsumer)
.thenRunAsync(myAction);
All of myMapper, myTransformer, myConsumer and myAction will be executed using myExecutor.
Crucially, all composed promises support true cancellation (incl. interrupting thread) for the functions supplied as arguments:
Promise<?> p1 = CompletableTask.asyncOn(myExecutor);
Promise<?> p2 = p1.thenApplyAsync(myValueGenerator);
Promise<?> p3 = p2.thenRunAsync(myAction);
...
p2.cancel(true);
In the example above myValueGenerator will be interrupted if already in progress. Both p2 and p3 will be settled with failure: p2 with a CancellationException and p3 with a CompletionException.
You may notice that the term “asynchronous composition methods” is used above, as well as *Async calls in the examples (like thenApplyAsync, thenRunAsync). This is not accidental: non-asynchronous methods of the CompletionStage API are not interruptible. The reasoning behind this design decision is that invoking asynchronous methods involves the inevitable overhead of putting a command on the executor’s queue, starting new threads implicitly, etc. For simple, non-blocking methods, like small calculations and trivial transformations, this overhead might outweigh the method’s execution time itself. So the guideline is: use asynchronous composition methods for heavy I/O-bound blocking tasks, and use non-asynchronous composition methods for (typically lightweight) calculations.
It is worth mentioning that CompletableTask-s and Promise-s composed out of them can be interruptible only if the Executor used is interruptible by nature. For example, ThreadPoolExecutor supports interruptible tasks, but ForkJoinPool does not!
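The ThreadPoolExecutor half of this statement can be checked without the library. The sketch below uses JDK classes only; the class name PoolCancelInterrupts is hypothetical. It cancels a plain Future obtained from a fixed thread pool (which is a ThreadPoolExecutor) and records whether the blocked worker received an InterruptedException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; only JDK types are used.
class PoolCancelInterrupts {

    /** Returns true if Future.cancel(true) interrupted the blocked worker thread. */
    static boolean workerWasInterrupted() {
        ExecutorService pool = Executors.newFixedThreadPool(1); // a ThreadPoolExecutor
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(5_000); // stands in for a blocking I/O call
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });
            started.await();                      // make sure the task is running
            future.cancel(true);                  // FutureTask interrupts its runner thread
            finished.await(5, TimeUnit.SECONDS);
            return interrupted.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted = " + workerWasInterrupted());
    }
}
```

This is the kind of executor behavior that an interruptible promise has to rely on: the interrupt can only reach the task if the executor's own Future implementation delivers it.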
3. Overriding default asynchronous executor
One of the pitfalls of the CompletableFuture implementation is how it works with the default asynchronous executor. Consider the following example:
CompletionStage<String> p1 = CompletableFuture.supplyAsync(this::produceValue, executorInitial);
CompletionStage<String> p2 = p1.thenApplyAsync(this::transformValueA);
CompletionStage<String> p3 = p2.thenApplyAsync(this::transformValueB, executorNext);
CompletionStage<String> p4 = p3.thenApplyAsync(this::transformValueC);
The call to produceValue will be executed on executorInitial, since it is passed explicitly. However, the call to transformValueA will be executed on… ForkJoinPool.commonPool()! Hmmmm… Probably this makes sense, but how do you force an alternative executor to be used by default? No way! Maybe this is possible with deeper calls? The answer is “NO” again! The invocation of transformValueB runs on the explicitly supplied executorNext. But the next call, transformValueC, will be executed on… you guessed it… ForkJoinPool.commonPool()!
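This behavior is easy to confirm with the JDK alone. The sketch below is illustrative only; the class name DefaultExecutorPitfall and the thread name "my-pool-worker" are assumptions made for the demo. It records which thread runs each stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; only JDK types are used.
class DefaultExecutorPitfall {

    /** Returns the names of the threads that ran the first and the second stage. */
    static String[] threadNames() {
        // Every thread of this pool carries a recognizable name.
        ExecutorService myPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "my-pool-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            // Stage 1: the executor is passed explicitly.
            CompletableFuture<String> p1 = CompletableFuture.supplyAsync(
                () -> Thread.currentThread().getName(), myPool);
            // Stage 2: no executor, so the CompletableFuture default is used.
            CompletableFuture<String[]> p2 = p1.thenApplyAsync(
                first -> new String[] { first, Thread.currentThread().getName() });
            return p2.join();
        } finally {
            myPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("stage 1 ran on " + names[0]);
        System.out.println("stage 2 ran on " + names[1]);
    }
}
```

The first stage reports the custom pool's thread, while the second reports a thread that does not belong to it (typically a ForkJoinPool.commonPool worker).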
So, once you use CompletableFuture in a JEE environment you must pass an explicit instance of ManagedExecutorService to each and every method call. Not very convenient! To be fair, with the Java 9+ API you can redefine this behavior by sub-classing CompletableFuture and overriding two methods: defaultExecutor and newIncompleteFuture. Plus, you will have to define your own “entry points” instead of the standard CompletableFuture.runAsync and CompletableFuture.supplyAsync.
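A rough sketch of that Java 9+ workaround could look as follows. BoundFuture and its startAsync entry point are hypothetical names introduced for illustration; this is the plain JDK subclassing route, not how Tascalate Concurrent is implemented, and it requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical subclass that pins the default asynchronous executor (Java 9+).
class BoundFuture<T> extends CompletableFuture<T> {
    private final Executor executor;

    BoundFuture(Executor executor) {
        this.executor = executor;
    }

    // Used by all *Async methods that are called without an explicit executor.
    @Override
    public Executor defaultExecutor() {
        return executor;
    }

    // Makes every derived stage a BoundFuture with the same default executor.
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new BoundFuture<>(executor);
    }

    // Custom entry point that replaces CompletableFuture.supplyAsync.
    static <T> BoundFuture<T> startAsync(Supplier<T> supplier, Executor executor) {
        BoundFuture<T> result = new BoundFuture<>(executor);
        executor.execute(() -> {
            try {
                result.complete(supplier.get());
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    /** Returns the name of the thread that ran a derived stage without an explicit executor. */
    static String secondStageThread() {
        ExecutorService myPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "my-pool-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            return startAsync(() -> "value", myPool)
                .thenApplyAsync(v -> Thread.currentThread().getName())
                .join();
        } finally {
            myPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("stage 2 ran on " + secondStageThread());
    }
}
```

With both overrides in place the derived stage stays on the custom pool, but note how much boilerplate is needed, including a hand-written entry point.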
With CompletableTask the situation is just the opposite. Let us rewrite the example above:
CompletionStage<String> p1 = CompletableTask.supplyAsync(this::produceValue, executorInitial);
CompletionStage<String> p2 = p1.thenApplyAsync(this::transformValueA);
CompletionStage<String> p3 = p2.thenApplyAsync(this::transformValueB, executorNext);
CompletionStage<String> p4 = p3.thenApplyAsync(this::transformValueC);
The call to produceValue will be executed on executorInitial, obviously. But now the call to transformValueA will also be executed on executorInitial! What about deeper calls? The invocation of transformValueB runs on the explicitly supplied executorNext. And the next call, transformValueC, will be executed on… check your intuition… executorNext. The logic behind this is the following: the latest explicitly specified Executor is what will be used for all nested asynchronous composition methods without an explicit Executor parameter.
Obviously, it’s rarely the case that one size fits all. Therefore two additional options exist to specify the default asynchronous executor:
A. CompletableTask has an overloaded method:
public static Promise<Void> asyncOn(Executor executor, boolean enforceDefaultAsync)
When enforceDefaultAsync is true, all nested asynchronous composition methods without an explicit Executor parameter will use the provided executor, even if previous composition methods use an alternative Executor. This is somewhat similar to CompletableFuture but with the ability to explicitly set the default asynchronous executor initially.
B. The Promise interface has the following operation:
Promise<T> defaultAsyncOn(Executor executor)
The returned decorator will use the specified executor for all nested asynchronous composition methods without an explicit Executor parameter. So, at any point, you are able to switch to the desired default asynchronous executor and keep using it for all nested composition calls.
To summarize, with Tascalate Concurrent you have the following options to control the default asynchronous executor:
- The latest explicit Executor passed to an *Async method is used for derived Promise-s. This is the default option.
- A single default Executor passed to the root CompletableTask.asyncOn(Executor executor, true) call is propagated through the whole chain. This is the only variant supported with CompletableFuture in Java 9+, though it requires custom coding.
- Redefine the Executor with defaultAsyncOn(Executor executor) for all derived Promise-s.
Having the best of three(!) worlds, the only responsibility of the library user is to use these options consistently!
The last thing that should be mentioned is the typical task of starting an interruptible blocking method after the completion of a standard CompletableFuture. The following utility method is defined in CompletableTask:
public static <T> Promise<T> waitFor(CompletionStage<T> stage, Executor executor)
Roughly, this is a shortcut for the following:
CompletableTask.asyncOn(executor).thenCombine(stage, (u, v) -> v);
A typical usage of this method is:
TaskExecutorService executorService = TaskExecutors.newFixedThreadPool(3);
CompletableFuture<String> replyUrlPromise = sendRequestAsync();
Promise<byte[]> dataPromise = CompletableTask.waitFor(replyUrlPromise, executorService)
.thenApplyAsync(url -> loadDataInterruptibly(url));
The dataPromise returned may be cancelled later, and loadDataInterruptibly will be interrupted if not completed by that time.
4. Timeouts
Any robust application must handle situations when things go wrong. The ability to cancel an operation that takes too long has existed in the library from day one. But the very definition of “too long” was initially left to the application code. However, practice shows that the lack of proven, thoroughly tested timeout-related functionality in the library leads to complex, repetitive and, unfortunately, error-prone application code. Hence Tascalate Concurrent was extended to address this omission.
The library offers the following operations to control the execution time of a Promise (declared in the Promise interface):
<T> Promise<T> orTimeout(long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> orTimeout(Duration duration[, boolean cancelOnTimeout = true])
These methods create a new Promise that is either settled successfully/exceptionally when the original promise completes within the given timeout, or settled exceptionally with a TimeoutException when the time expires. In any case, the handling code is executed on the default asynchronous Executor of the original Promise.
Executor myExecutor = ...; // Get an executor
Promise<String> callPromise = CompletableTask
.supplyAsync( () -> someLongRunningIoBoundMehtod(), myExecutor )
.orTimeout( Duration.ofSeconds(3) );
Promise<?> nextPromiseSync = callPromise.whenComplete((v, e) -> processResultSync(v, e));
Promise<?> nextPromiseAsync = callPromise.whenCompleteAsync((v,e) -> processResultAsync(v, e));
In the example above callPromise will be settled within 3 seconds, either successfully/exceptionally as a result of the someLongRunningIoBoundMehtod execution, or exceptionally with a TimeoutException.
It’s worth mentioning that both processResultSync and processResultAsync will be executed with myExecutor if the timeout is triggered; this rule holds for all timeout-related methods.
The optional cancelOnTimeout parameter defines whether or not to cancel the original Promise when the time expires; it is implicitly true when omitted. So in the example above someLongRunningIoBoundMehtod will be interrupted if it takes more than 3 seconds to complete. Pay attention: any Promise is cancellable on timeout, even wrappers created via Promises.from(stage), but only CompletableTask is interruptible!
Cancelling the original promise on timeout is the desired behavior in most cases, but not always. In reality, “Warn-first-Cancel-next” scenarios are not rare, where “warn” may be logging, sending notification emails, showing messages to the user in the UI etc. The library provides an option to set several non-cancelling timeouts, as in the example below:
Executor executor = ...; // Get an executor
Promise<String> resultPromise = CompletableTask
.supplyAsync( () -> someLongRunningIoBoundMehtod(), executor );
// Show UI message to user to let him/her know that everything is under control
Promise<?> t1 = resultPromise
.orTimeout( Duration.ofSeconds(2), false )
.exceptionally( e -> {
if (e instanceof TimeoutException) {
UI.showMessage("The operation takes longer than expected, please wait...");
}
return null;
}, false);
// Show UI confirmation to user to let him/her cancel operation explicitly
Promise<?> t2 = resultPromise
.orTimeout( Duration.ofSeconds(5), false )
.exceptionally( e -> {
if (e instanceof TimeoutException) {
UI.clearMessages();
UI.showConfirmation("Service does not respond. Do you want to cancel (Y/N)?");
}
return null;
}, false);
// Cancel in 10 seconds
resultPromise.orTimeout( Duration.ofSeconds(10), true );
Please note that the timeout starts from the call to the orTimeout method. Hence, if you have a chain of unresolved promises ending with an orTimeout call, then the whole chain must complete within the given time:
Executor executor = ...; // Get an executor
Promise<String> parallelPromise = CompletableTask
.supplyAsync( () -> someLongRunningDbCall(), executor );
Promise<List<String>> resultPromise = CompletableTask
.supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
.thenApplyAsync( v -> converterMethod(v) )
.thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v))
.orTimeout( Duration.ofSeconds(5) );
In the last example resultPromise will be resolved successfully if and only if all of someLongRunningIoBoundMehtod, converterMethod and even someLongRunningDbCall are completed within 5 seconds. If it’s necessary to restrict the execution time of a single step, please use the standard CompletionStage.thenCompose method. Say that in the previous example we have to restrict the execution time of converterMethod only. Then the modified chain will look like:
Promise<List<String>> resultPromise = CompletableTask
.supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
// Restrict only the execution time of converterMethod
// -- start of changes
.thenCompose( v ->
CompletableTask.complete(v, executor)
.thenApplyAsync(vv -> converterMethod(vv))
.orTimeout( Duration.ofSeconds(5) )
)
// -- end of changes
.thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v))
;
Moreover, in the original example only the call to thenCombineAsync (the last in the chain) will be cancelled on timeout; to cancel the whole chain it’s necessary to use the functionality of the DependentPromise interface (to be discussed in the next post).
Other useful timeout-related methods declared in the Promise interface are:
<T> Promise<T> onTimeout(T value, long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(T value, Duration duration[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(Supplier<? extends T> supplier, long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration[, boolean cancelOnTimeout = true])
The onTimeout family of methods is similar in all regards to the orTimeout methods, with a single obvious difference: instead of completing the resulting Promise exceptionally with a TimeoutException when the time expires, they settle it successfully with the alternative value supplied (either directly or via a Supplier):
Executor executor = ...; // Get an executor
Promise<String> callPromise = CompletableTask
.supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
.onTimeout( "Timed-out!", Duration.ofSeconds(3) );
The example shows that callPromise will be settled within 3 seconds, either successfully/exceptionally as a result of the someLongRunningIoBoundMehtod execution, or with the default value "Timed-out!" when the time is exceeded.
It’s important to mention the crucial difference between Promise.orTimeout / onTimeout and CompletableFuture.orTimeout / completeOnTimeout in Java 9+. In Tascalate Concurrent both operations return a new Promise that may be cancelled individually, without cancelling the original Promise. Moreover, the original Promise will not be completed with a TimeoutException when the time expires but rather with a CancellationException (in the case of orTimeout([duration], true) or orTimeout([duration])). The behavior of CompletableFuture in Java 9+ is radically different: timeout-related operations are just “side effects”, and the returned value is the original CompletableFuture itself. So the call completableFuture.orTimeout(100, TimeUnit.MILLISECONDS).cancel(true) will cancel the completableFuture itself, and there is no way to revert the timeout once it’s set. Correspondingly, when the time expires the original completableFuture will be completed exceptionally with a TimeoutException.
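The CompletableFuture side of this comparison can be verified directly on Java 9+. The sketch below uses JDK classes only, and the class name OrTimeoutSideEffect is hypothetical. It checks that orTimeout hands back the very same instance and that this instance is the one that fails with a TimeoutException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; requires Java 9+ for CompletableFuture.orTimeout.
class OrTimeoutSideEffect {

    /** Returns true if orTimeout returns the future it was called on. */
    static boolean returnsSameInstance() {
        CompletableFuture<String> original = new CompletableFuture<>();
        CompletableFuture<String> withTimeout = original.orTimeout(100, TimeUnit.MILLISECONDS);
        return withTimeout == original;
    }

    /** Returns true if the original future itself is failed with a TimeoutException. */
    static boolean originalFailsWithTimeout() {
        CompletableFuture<String> original = new CompletableFuture<>();
        original.orTimeout(100, TimeUnit.MILLISECONDS); // return value deliberately ignored
        try {
            original.get(5, TimeUnit.SECONDS);
            return false; // never completed normally in this demo
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("same instance: " + returnsSameInstance());
        System.out.println("original timed out: " + originalFailsWithTimeout());
    }
}
```

Because there is only one object involved, any caller that holds the original future is affected by the timeout, whether it asked for one or not.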
Finally, the Promise interface provides an option to insert delays into the call chain:
<T> Promise<T> delay(long timeout, TimeUnit unit[, boolean delayOnError = true])
<T> Promise<T> delay(Duration duration[, boolean delayOnError = true])
The delay starts only after the original Promise is completed, either successfully or exceptionally (unlike the orTimeout / onTimeout methods, where the timeout starts immediately). The resulting delay Promise is resolved after the specified timeout with the same result as the original Promise. The last argument of these methods, delayOnError, specifies whether or not to delay if the original Promise is resolved exceptionally; by default this argument is true. If false, then the delay Promise is completed immediately after the failed original Promise.
Executor executor = ...; // Get an executor
Promise<String> callPromise1 = CompletableTask
.supplyAsync( () -> someLongRunningIoBoundMehtod(), executor )
.delay( Duration.ofSeconds(1) ) // Give a second for CPU to calm down :)
.thenApply(v -> convertValue(v));
Promise<String> callPromise2 = CompletableTask
.supplyAsync( () -> aletrnativeLongRunningIoBoundMehtod(), executor )
.delay( Duration.ofSeconds(1), false ) // Give a second for CPU to calm down ONLY on success :)
.thenApply(v -> convertValue(v));
As with other timeout-related methods, convertValue is invoked on the default asynchronous Executor of the original Promise.
You may notice that a delay may be introduced only in the middle of the chain, but what do you do if you’d like to back off the whole chain execution? Just start with a resolved promise!
// Option 1
// Interruptible tasks chain on the executor supplied
CompletableTask.asyncOn(executor)
.delay( Duration.ofSeconds(5) )
.thenApplyAsync(ignore -> produceValue());
// Option 2
// Computational tasks on ForkJoinPool.commonPool()
Promises.from(CompletableFuture.completedFuture(""))
.delay( Duration.ofSeconds(5) )
.thenApplyAsync(ignore -> produceValue());
Since backing off execution is not a rare case, the library provides the following convenient shortcuts in the CompletableTask class:
static Promise<Duration> delay(long timeout, TimeUnit unit, Executor executor);
static Promise<Duration> delay(Duration duration, Executor executor);
Notice that in Java 9+ a different approach is chosen to implement delays: there is no corresponding operation defined for the CompletableFuture object, and you should use a delayed Executor instead. Please read the documentation of the CompletableFuture.delayedExecutor method for details.
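For comparison, a minimal JDK-only sketch of the delayed-executor approach is shown below. The class name DelayedStart and the 200 ms delay are assumptions chosen for illustration, and Java 9+ is required.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; requires Java 9+ for CompletableFuture.delayedExecutor.
class DelayedStart {

    /** Returns the time in milliseconds until a task submitted via a delayed executor completed. */
    static long elapsedMillis() {
        // Tasks handed to this executor are submitted to the default executor after 200 ms.
        Executor delayed = CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS);
        long start = System.nanoTime();
        CompletableFuture.supplyAsync(() -> "value", delayed).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("completed after ~" + elapsedMillis() + " ms");
    }
}
```

Here the delay is a property of the executor rather than a step in the chain, so it has to be decided at the point where a task is submitted.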
5. Combining several CompletionStage-s.
The utility class Promises provides a rich set of methods to combine several CompletionStage-s, which leaves the limited functionality of CompletableFuture.allOf / anyOf far behind:
- The library works with any CompletionStage implementation without converting arguments to CompletableFuture first (and CompletionStage.toCompletableFuture is an optional operation, at least it’s documented so in Java 8).
- It’s possible to pass either an array or a List of CompletionStage-s as arguments.
- The resulting Promise lets you access the individual results of the settled CompletionStage-s passed.
- There is an option to cancel the non-settled CompletionStage-s passed once the result of the operation is known.
- Optionally, you can specify whether or not to tolerate individual failures as long as they don’t affect the final result.
- The general “M completed successfully out of N passed promises” scenario is possible.
Let us review the relevant methods, from the simplest to the most advanced.
static <T> Promise<List<T>> all([boolean cancelRemaining=true,]
CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> all([boolean cancelRemaining=true,]
List<? extends CompletionStage<? extends T>> promises)
Returns a promise that is completed normally when all CompletionStage-s passed as parameters are completed normally; if any promise completes exceptionally, then the resulting promise is completed exceptionally as well.
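To make these semantics concrete, here is a simplified JDK-only sketch of an all-style combinator. It is not the library’s implementation: the AllSketch class and its demo method are hypothetical, cancellation of the remaining stages is left out, and a failure is propagated as-is. It relies only on whenComplete, so it never calls toCompletableFuture.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch of an "all" combinator; only JDK types are used.
class AllSketch {

    /** Completes with the results in argument order, or fails as soon as any stage fails. */
    static <T> CompletableFuture<List<T>> all(List<? extends CompletionStage<? extends T>> stages) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        int count = stages.size();
        if (count == 0) {
            result.complete(new ArrayList<>());
            return result;
        }
        AtomicReferenceArray<T> values = new AtomicReferenceArray<>(count);
        AtomicInteger remaining = new AtomicInteger(count);
        for (int i = 0; i < count; i++) {
            final int index = i;
            CompletionStage<? extends T> stage = stages.get(i);
            stage.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error); // first failure wins
                } else {
                    values.set(index, value);            // keep the indexed position
                    if (remaining.decrementAndGet() == 0) {
                        List<T> list = new ArrayList<>(count);
                        for (int k = 0; k < count; k++) {
                            list.add(values.get(k));
                        }
                        result.complete(list);
                    }
                }
            });
        }
        return result;
    }

    /** Combines one asynchronous and one already-completed stage. */
    static List<Integer> demo() {
        List<CompletableFuture<Integer>> stages = Arrays.asList(
            CompletableFuture.supplyAsync(() -> 1),
            CompletableFuture.completedFuture(2));
        return all(stages).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Unlike CompletableFuture.allOf, the combined future carries the individual results, in the order in which the stages were passed.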
static <T> Promise<T> any([boolean cancelRemaining=true,]
CompletionStage<? extends T>... promises)
static <T> Promise<T> any([boolean cancelRemaining=true,]
List<? extends CompletionStage<? extends T>> promises)
Returns a promise that is completed normally when any CompletionStage passed as a parameter is completed normally (a race is possible); if all promises complete exceptionally, then the resulting promise is completed exceptionally as well.
static <T> Promise<T> anyStrict([boolean cancelRemaining=true,] CompletionStage<? extends T>... promises)
static <T> Promise<T> anyStrict([boolean cancelRemaining=true,]
List<? extends CompletionStage<? extends T>> promises)
Returns a promise that is completed normally when any CompletionStage passed as a parameter is completed normally (a race is possible); if any promise completes exceptionally before the first result is available, then the resulting promise is completed exceptionally as well (unlike the non-strict variant, where exceptions are ignored if any result is available at all).
static <T> Promise<List<T>> atLeast(int minResultsCount, [boolean cancelRemaining=true,]
CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> atLeast(int minResultsCount, [boolean cancelRemaining=true,]
List<? extends CompletionStage<? extends T>> promises)
Generalization of the any method. Returns a promise that is completed normally when at least minResultsCount of the CompletionStage-s passed as parameters are completed normally (a race is possible); if fewer than minResultsCount of the promises complete normally, then the resulting promise is completed exceptionally.
static <T> Promise<List<T>> atLeastStrict(int minResultsCount, [boolean cancelRemaining=true,]
CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> atLeastStrict(int minResultsCount, [boolean cancelRemaining=true,]
List<? extends CompletionStage<? extends T>> promises)
Generalization of the anyStrict method. Returns a promise that is completed normally when at least minResultsCount of the CompletionStage-s passed as parameters are completed normally (a race is possible); if any promise completes exceptionally before minResultsCount results are available, then the resulting promise is completed exceptionally as well (unlike the non-strict variant, where exceptions are ignored if minResultsCount successful results are available).
All methods above have an optional parameter cancelRemaining. When omitted, it is implicitly cancelRemaining = true. The cancelRemaining parameter defines whether or not to eagerly cancel the remaining promises once the result of the operation is known, i.e. enough of the promises passed have settled successfully, or some CompletionStage completed exceptionally in the strict version.
Each operation to combine CompletionStage-s has overloaded versions that accept either a List of CompletionStage-s or a varargs array of CompletionStage-s.
Besides the any / anyStrict methods, which return a single-valued promise, all other combining methods return a list of values, one per successfully completed promise, at the same indexed position. If the promise at a given position was not settled at all, or failed (in the non-strict version), then the corresponding item in the result list is null. If the necessary number of promises did not complete successfully, or any one completed exceptionally in the strict version, then the resulting Promise is settled with a failure of the type MultitargetException. The application developer may examine MultitargetException.getExceptions() to check the exact failure for each CompletionStage passed.
The Promise returned has the following characteristics:
- Cancelling the resulting Promise will cancel all the CompletionStage-s passed as arguments.
- The default asynchronous executor of the resulting Promise is undefined, i.e. it could be either ForkJoinPool.commonPool() or whatever Executor is used by any of the CompletionStage-s passed as arguments. To ensure that the necessary default Executor is used for subsequent asynchronous operations, please apply defaultAsyncOn(myExecutor) to the result.
The list of features provided by the Tascalate Concurrent library doesn’t stop here. There is more interesting stuff like Retry / Poll functionality, controlling cancellation of a chain of Promises, extensions to ExecutorService etc. But this post is already getting too long, so the remainder is left for next time. In the meantime, you can check the home page of the Tascalate Concurrent library for the most up-to-date documentation.
vsilaev / tascalate-concurrent
Implementation of blocking (IO-Bound) cancellable java.util.concurrent.CompletionStage and related extensions to java.util.concurrent.ExecutorService-s
tascalate-concurrent
Since the version 0.7.0 the library is shipped as a multi-release JAR and may be used both with Java 8 as a classpath library or with Java 9+ as a module.
IMPORTANT!
In version 0.8.0 the artifact was renamed. New name:
<dependency> <groupId>net.tascalate</groupId> <artifactId>net.tascalate.concurrent</artifactId> <version>0.9.8</version> <!-- Any version above 0.8.0, the latest one is recommended --> </dependency>
Old Name
<dependency> <groupId>net.tascalate.concurrent</groupId> <artifactId>net.tascalate.concurrent.lib</
…
Acknowledgements
The internal implementation details of the CompletableTask class hierarchy are greatly inspired by the work done by Lukáš Křečan. A description of his library is available as a two-part article on DZone: Part 1 and Part II. It’s worth reading for those who’d like to have a better understanding of the CompletableTask internals.
Original article: Tascalate Concurrent – Filling the Gaps in CompletableFuture API (Part 1)