Asynchronous Processing in Java with Promises

The Reactive Toolbox Core library implements Promise-based asynchronous processing model somewhat similar (but not identical) to one implemented by ECMAScript 2015.

Probably main difference is that Promise implemented by the library has only two states (pending and resolved) as opposed to three states in ECMAScript (pending, resolved and rejected). This difference is caused by the need for ECMASCript to implement error handling (hence third rejected state), while Reactive Toolbox Core uses Result<T>-based error handling model and does not need separate state for error.

The need to support only two states significantly simplifies implementation. The Promise implemented in library is much simpler than, for example, CompletableFuture and much more lightweight. This approach also enables convenient handling of complex interaction scenarios with fluent and transparent syntax.

Let’s look into this implementation little bit deeper.

Promises can be created in both, pending and resolved states:

final var pendigPromise = Promise.<Integer>promise();
final var resolvedPromise = Promise.<Integer>fulfilled(123);

Enter fullscreen mode Exit fullscreen mode

Only first resolution is accepted and used as Promise instance result. All subsequent attempts to resolve promise will be ignored.

User can attach actions to promise. Those actions will be executed once promise will be resolved:

final var holder = new AtomicInteger(-1);
final var promise = Promise.<Integer>promise()
                              .onSuccess(holder::set);

Enter fullscreen mode Exit fullscreen mode

If then we call promise.ok(1) instance will be resolved and attached action executed. Attached actions are executed only once. If instance is already resolved by the moment when new action is attached, then action is executed immediately, in the context of calling thread. If instance is not yet resolved, then attached actions will be executed in the context of thread where promise is resolved.

The Promise has built-in scheduler which is tuned for large number of small and short tasks, which don’t block. This scheduler can be used to run some action on promise asynchronously, perform resolution of promise asynchronously or schedule action on promise once specified timeout is expired. Timeouts are implemented with very small overhead and without any additional threads.

More examples:

// create instance and immediately invoke code which uses this instance
final var promise = Promise.<JWT>promise(p -> p.resolve(service.createNewToken(user)))

// Set promise processing timeout
final var promise = Promise.<User>promise()
                               .when(Timeout.timeout(30).seconds(), Errors.TIMEOUT::asFailure);

// Resolve promise asynchronously, so all attached actions will be run in other thread
promise.asyncResolve(Result.success("ok"));

Enter fullscreen mode Exit fullscreen mode

Sometimes it is necessary to make a transformation of received result before passing it further. For this purpose Promise has method map():

final Promise<String> promise = service.promiseRandomInteger()
                                       .map(Objects::toString);

Enter fullscreen mode Exit fullscreen mode

Now it worth to look at more complex scenarios.

Promise.any() and Promise.anySuccess()

Sometimes it is necessary to receive at least one response from different services. If any response is fine, then static Promise.any() can be used:

final Promise<Integer> promise = Promise.any(service1.promiseInteger(),
                                             service2.promiseInteger());

Enter fullscreen mode Exit fullscreen mode

First resolved promise will be returned as result, regardless if it was success or failure. But in most cases only successful result counts, then following method will help:

final Promise<Integer> promise = 
         Promise.anySuccess(service1.promiseInteger(),
                            service2.promiseInteger());

Enter fullscreen mode Exit fullscreen mode

In this case promise will be resolved as failure only if all promises will be resolved to failure.

Another important property of this method is that it will cancel (i.e. resolve with special type of failure) all remaining pending promises once success is obtained.

Promise.all()

Quite often it is necessary to obtain several results before doing further processing. The set of static Promise.all() method serve exactly this purpose – resulting promise is resolved to success only when all passed promises are resolved to success. Successful result in this case returned as Tuple:

   final Promise<Tuple3<Integer, String, UUID>> promise = 
            Promise.all(integerService.promiseInteger(),
                        stringService.promiseString(),
                        uuidService.promiseUuid());

Enter fullscreen mode Exit fullscreen mode

Sometimes it is necessary to obtain some intermediate values and then pass them to yet another service, which will calculate result. In other words, this is the case of dependent calls which return promises. For this purpose library provided dedicated method:

Promise.chainMap()

Example below illustrates use of this method:

    private ArticleService articleService;
    private TopicService topicService;
    private UserService userService;

    public Promise<List<Article>> userFeedHandler(final User.Id userId) {
        return all(topicService.topicsByUser(userId, Order.ANY),
                   userService.followers(userId))
                .chainMap(tuple -> tuple.map((topics, users) -> articleService.userFeed(map(topics, Topic::id), map(users, User::id))))
                .when(timeout(30).seconds(), failure(Errors.TIMEOUT));
    }

Enter fullscreen mode Exit fullscreen mode

Let’s look deeper into this example. First of all, Promise.all() is called to obtain results from topicService and userService. When these services resolve their promises with success, then articleService is invoked with transformed values received from first two calls. Finally, timeout is configured for returned promise, so even if there will be any issues, final promise will be resolved with timeout error.

All this complex processing with all relevant error handling is just one line of code.

Summary

The Promise-based asynchronous processing being combined with other functional programming techniques is a very powerful and expressive tool. It allows convenient expression of complex processing while keeping code clean and easy to read and write. Overall application of FP-style to Java enables creating modern applications which are convenient to create, extend, test and maintain.

Afterword

The Reactive Toolbox Core is still work in progress. It requires a lot of effort and I realize that the whole project is way bigger than one person can accomplish in reasonable time. So, those who want to participate are welcome to join project. Comments, ideas, suggestions are welcome as well.

原文链接:Asynchronous Processing in Java with Promises

© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容