Go for Java developers (or is the Java concurrency that bad?!)

I am by no means an expert in Go, indeed quite the opposite. I am currently trying to get familiar with it. I started getting familiar with the syntax, the memory and the concurrency model. As usual for me, I am trying to contrast it with something I already know, like Java.
So I stumbled in this interesting talk in which the great Sajma introduced the Go concurrency model with some examples. The slides for the talk and the examples are here. Not far in the talk, a question popped up: think about what it would take to implement the same thing in other languages like Java.
Is it really that hard? I was not that sure, I mean, Java does not have ‘select’ statement, neither it has built-in channels, but it should not be difficult to replicate the examples in Java or is it?
So I though I could have some fun implementing the examples in Java.

Go concurrency

Before getting to the example, this is a streamlined recap of the talk (by the way, it’s a cool talk, so I really suggest you to watch it).

  1. Go concurrency
    • The concurrency model is based on Communication sequential Processes (Hoare, 1978)
    • Concurrent programs are structured as independent processes that execute sequentially and communicate by passing messages.
    • “Don’t communicate by sharing memory, share memory by communicating”
    • Go primitives: go routines, channels and the select statement
  2. Go routines
    • It’s a lightweight thread (it’s not a thread)
    • Channel provide communication between go routines (analougous to synchronized queue in Java)
    • Select multiplex communication among go routines

The example

In the examples we have to build an hypothetical client which queries google services (web, image and video services). Ideally, we would like to query those services in parallel and aggregate the answers. All the code for the examples is in github.com/napicella/go-for-java-programmers.
So let’s get started.

First example: querying Google search in parallel

This is how the go code looks like:

        func Google(query string) (results []Result) {
            c := make(chan Result)
            go func() { c <- Web(query) }()
            go func() { c <- Image(query) }()
            go func() { c <- Video(query) }()

            for i := 0; i < 3; i++ {
                result := <-c
                results = append(results, result)
            }
            return
        }

Enter fullscreen mode Exit fullscreen mode

What about Java? My solution involves using CompletableFuture like the following.

    public void google(String query) throws ExecutionException, InterruptedException {
        CompletableFuture<String>[] futures = new CompletableFuture[] {
            supplyAsync(() -> web(query)),
            supplyAsync(() -> image(query)),
            supplyAsync(() -> video(query))
        };

        List<String> result = new ArrayList<>();

        allOf(futures)
            .thenAccept((ignore) -> Arrays.stream(futures)
                                          .map(this::safeGet)
                                          .forEach(result::add)).get();
        // Note: calling get is necessary only because I want to print the result 
        // before returning from the function
        System.out.println(result);
    }

    protected String safeGet(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return "";
    }

Enter fullscreen mode Exit fullscreen mode

The web, image and video services are just mocks with random sleeps.
So, what’s the difference between the java code and go one? The java code is a bit more verbose and the code does not use message passing between threads like in Go, besides that they look really similar.

Let’s move to the second example.

Second example: timeout

What if we don’t want to wait for slow servers? We can use a timeout!
The idea is to wait until either all the servers replies to our request or the timeout goes off.


func Google(query string) (results []Result) {
    c := make(chan Result, 3)
    go func() { c <- Web(query) }()
    go func() { c <- Image(query) }()
    go func() { c <- Video(query) }()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

Enter fullscreen mode Exit fullscreen mode

Let’s see how that would look like in java:

    public void googleWithTimeout(String query) throws ExecutionException, InterruptedException {
        // This is the first difference with the go example, the result array must 
        // be a synchronized list.
        // Go channel are completely thread safe, so it's totally okay to funnel 
        // data from multiple go routines to an array.
        List<String> result = Collections.synchronizedList(new ArrayList<>());

        // this is not safe since it's possible that all the threads in the thread 
        // pool (default to ForkJoin) are busy, so the timer won't start
        CompletableFuture<Void> timeout = runAsync(() -> timeout(TIMEOUT_MILLIS));

        anyOf(
            allOf(runAsync(() -> result.add(web(query))),
                  runAsync(() -> result.add(image(query))),
                  runAsync(() -> result.add(video(query)))),
            timeout
        ).get();


        System.out.println(result);
    }

    protected Void timeout(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

Enter fullscreen mode Exit fullscreen mode

In the Java example there is a substantial difference to the go one: the tasks share the result array, so for the java code to work, we need a synchronized array. On the other hand, Go channel are completely thread safe, so it’s totally okay to funnel data from multiple go routines to an array.
As mentioned in the comment the use of timeout is not completely safe indeed it’s possible that all the threads in the thread pool (default to ForkJoin) are busy so the timer won’t start. We can obviously run a Thread with a different ExecutorService or just manually create a Thread and run it.

    protected CompletableFuture<Void> timeout(int millis) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        final CompletableFuture<Void> timeout = new CompletableFuture<>();
        executorService.schedule(() -> {
            timeout.complete(null);
        }, millis, TimeUnit.MILLISECONDS);

        return timeout;
    }

Enter fullscreen mode Exit fullscreen mode

Third example : Reduce tail latency using replicated search servers.

In go:

func Google(query string) (results []Result) {
    c := make(chan Result, 3)
    go func() { c <- First(query, Web1, Web2) }()
    go func() { c <- First(query, Image1, Image2) }()
    go func() { c <- First(query, Video1, Video2) }()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

Enter fullscreen mode Exit fullscreen mode

where the function First is defined as follow:

func First(query string, replicas ...Search) Result {
    c := make(chan Result, len(replicas))
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

Enter fullscreen mode Exit fullscreen mode

Let’s see in Java

    public void googleWithReplicatedServers(String query) throws ExecutionException, InterruptedException {
        List<String> result = Collections.synchronizedList(new ArrayList<>());

        // Unfortunately this does not work as expected because the inner anyOf 
        // won't stop the other calls, so the result might end up having 
        // duplicates, i.e [some-image, some-image, some-video]
        anyOf(
            allOf(
                anyOf(runAsync(() -> result.add(web(query))), runAsync(() -> result.add(webReplica(query)))),
                anyOf(runAsync(() -> result.add(image(query))), runAsync(() -> result.add(imageReplica(query)))),
                anyOf(runAsync(() -> result.add(video(query))), runAsync(() -> result.add(videoReplica(query))))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }

Enter fullscreen mode Exit fullscreen mode

Unfortunately the code does not quite work because when one of the future completes will add the result in the array but the execution of the other one will continue causing a duplicate in the result.
Let’s correct that:

    // replicate servers and use the first response - fixing the problem mentioned 
    // earlier by using supplyAsync + thenAccept instead of runAsync
    public void googleWithReplicatedServers2(String query) throws ExecutionException, InterruptedException {
        List<String> result = Collections.synchronizedList(new ArrayList<>());

        anyOf(
            allOf(
                anyOf(supplyAsync(() -> web(query)),
                      supplyAsync(() -> webReplica(query))).thenAccept((s) -> result.add((String) s)),
                anyOf(supplyAsync(() -> image(query)),
                      supplyAsync(() -> imageReplica(query))).thenAccept((s) -> result.add((String) s)),
                anyOf(supplyAsync(() -> video(query)),
                      supplyAsync(() -> videoReplica(query))).thenAccept((s) -> result.add((String) s))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }

    // same as above, but this time we use the function 'first', which is really 
    // just a wrapper around CompletableFuture.anyOf
    public void googleWithReplicatedServers3(String query) throws ExecutionException, InterruptedException {
        List<String> result = Collections.synchronizedList(new ArrayList<>());

        anyOf(
            allOf(
                first(query, Google::web, Google::webReplica).thenAccept((s) ->  result.add((String) s)),
                first(query, Google::image, Google::imageReplica).thenAccept((s) ->  result.add((String) s)),
                first(query, Google::video, Google::videoReplica).thenAccept((s) ->  result.add((String) s))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }

Enter fullscreen mode Exit fullscreen mode

Conclusions

Besides the fact I had some fun with CompletableFuture, the clear advantage of Go is really the fact that the concurrency model is built in the language itself which simplifies the communication among different agents.
On the other side, I am not sure why they dropped OOP support, like classes for example. I mean, what’s wrong with OOP?

原文链接:Go for Java developers (or is the Java concurrency that bad?!)

© 版权声明
THE END
喜欢就支持一下吧
点赞15 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容