Java 9 Parallelism

The current programming paradigm is no longer object-oriented; it is about parallelism[1]. From a programming perspective, concurrency is the composition of independently executing processes, whereas parallelism is the simultaneous execution of computations.

Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.

Concurrency is about structure, parallelism is about execution.



Fig.1: Threading vs parallel tasking

Java Futures support asynchronous operations, which avoid blocking a thread and wasting CPU resources. The Java fork/join framework and parallel streams divide a task into multiple subtasks and execute those subtasks in parallel on the different cores of a single machine.
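Here is a minimal sketch of a parallel stream. The class `ParallelStreamEx` and the method `parallelSum` are hypothetical names. It sums a range of integers: the stream framework splits the range into subtasks, runs them on the common ForkJoinPool, and combines the partial sums.

```java
import java.util.stream.IntStream;

class ParallelStreamEx {

    // Sums the integers in [from, to). parallel() lets the stream framework
    // split the range into chunks, process them on the common ForkJoinPool,
    // and add the partial sums together.
    static long parallelSum(int from, int to) {
        return IntStream.range(from, to)
                .parallel()
                .asLongStream() // widen to long so large ranges do not overflow
                .sum();
    }

    public static void main(String[] args) {
        System.out.println("total: " + parallelSum(0, 20)); // total: 190
    }
}
```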

Introduction

The key goal of parallelism is to partition a task efficiently into sub-tasks (fork) and combine their results (join). This improves performance through higher throughput, better scalability and lower latency. Parallelism is best used when:

  • tasks are independent
  • there is a high volume of data to process

Fig.2: More CPU & Volume support Parallel processing

  • threads neither block nor share mutable state
  • there are more processors and cores to utilize
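The fork/join idea above can be sketched with `RecursiveTask`. In this sketch, the class `ForkJoinSumEx` and the cutoff `THRESHOLD = 1_000` are assumptions chosen for illustration. Each task splits its range in half and forks one half. It computes the other half itself, then joins and adds the two results.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSumEx {

    // Hypothetical cutoff: ranges this small are summed sequentially.
    static final int THRESHOLD = 1_000;

    // Sums array[from, to) by splitting it (fork) and adding the halves (join).
    static class SumTask extends RecursiveTask<Long> {
        private final long[] array;
        private final int from;
        private final int to;

        SumTask(long[] array, int from, int to) {
            this.array = array;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += array[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(array, from, mid);
            SumTask right = new SumTask(array, mid, to);
            left.fork();                     // schedule the left half asynchronously
            long rightSum = right.compute(); // compute the right half in this thread
            return left.join() + rightSum;   // wait for the left half and combine
        }
    }

    static long sum(long[] array) {
        return ForkJoinPool.commonPool().invoke(new SumTask(array, 0, array.length));
    }

    public static void main(String[] args) {
        long[] numbers = new long[1_000_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(sum(numbers)); // 500000500000
    }
}
```

Forking only one half and computing the other in the current thread keeps that thread busy instead of leaving it idle while it waits on two forked subtasks.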

Since Java's early releases, synchronized, Runnable and Thread have provided lock-based support for concurrent programming, designed for single-core machines. The monitor[2] mechanism, using wait and notify, was also available from the beginning. Java 5 introduced the java.util.concurrent package with the ExecutorService interface, which decouples task submission from thread execution; an ExecutorService can execute both Runnable and Callable tasks. Java 7 introduced the fork/join framework[3] (ForkJoinPool), adding java.util.concurrent.RecursiveTask to support fork/join implementations. Java 8 added parallel streams, which work with lambda expressions. It also introduced CompletableFuture, which implements Future and models asynchronous operations: the caller doesn't block waiting for the callee.

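Java threads are mapped to OS threads, which are expensive to create and limited in number. If an application tries to create more threads than the OS allows, thread creation fails, typically with a java.lang.OutOfMemoryError.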

Here is an example of the old, single-core style of threading:

class OldSingleThreadEx {

    // partial sums, each written by exactly one worker thread
    static int l = 0;
    static int r = 0;

    public static void main(String[] args) throws InterruptedException {

        // sums 0..9
        Thread lt = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                l = l + i;
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                    return;
                }
            }
        }, "lt");

        // sums 10..19
        Thread rt = new Thread(() -> {
            for (int i = 10; i < 20; i++) {
                r = r + i;
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "rt");

        lt.start();
        rt.start();
        // join() makes main wait and guarantees it sees the final values of l and r
        lt.join();
        rt.join();

        System.out.println("total: " + (l + r));
    }

}

The above Java 1 style code can be refactored to use an ExecutorService.

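In Java, the JVM exits only after all non-daemon threads have terminated. Daemon threads that are still running at that point are stopped abruptly. The main thread does not wait for other threads by itself, which is why the example calls join() before reading l and r.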
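This small sketch shows the difference between daemon and non-daemon threads. `DaemonEx` and `startHeartbeat` are hypothetical names. The heartbeat thread loops until it is interrupted, but because it is a daemon it does not stop the JVM from exiting when `main` returns. The non-daemon `worker` is explicitly joined.

```java
class DaemonEx {

    // Starts a thread that loops until interrupted. Because it is a daemon,
    // it does not keep the JVM alive once all non-daemon threads have finished.
    static Thread startHeartbeat() {
        Thread heartbeat = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    return; // stop when interrupted
                }
            }
        }, "heartbeat");
        heartbeat.setDaemon(true); // must be called before start()
        heartbeat.start();
        return heartbeat;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread heartbeat = startHeartbeat();

        Thread worker = new Thread(() -> System.out.println("worker done"), "worker");
        worker.start();
        worker.join(); // main waits for the non-daemon worker explicitly

        System.out.println("heartbeat is daemon: " + heartbeat.isDaemon());
        // main returns here and the JVM exits, even though "heartbeat" is still looping
    }
}
```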

Executor

The following code creates a pool of threads. Submitted tasks are served on a first-come, first-served basis, and each thread returns to the pool when its task finishes. With an ExecutorService, the pool's size, the threads' priority and the task rejection policy are easy to control.

ExecutorService executorService = Executors.newFixedThreadPool(nThreads); // nThreads: the number of threads in the pool
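For finer control than newFixedThreadPool offers, you can construct a `ThreadPoolExecutor` directly. The sketch below uses hypothetical names (`PoolConfigEx`, `newBoundedPool`) and arbitrary sizes. It combines a bounded queue with a `ThreadFactory` that names the threads and lowers their priority. Its rejection policy is `CallerRunsPolicy`: when the queue is full, the submitting thread runs the task itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigEx {

    // Fixed-size pool with a bounded queue, named low-priority threads and CallerRunsPolicy.
    static ThreadPoolExecutor newBoundedPool(int nThreads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + counter.incrementAndGet());
            t.setPriority(Thread.MIN_PRIORITY); // priority is only a hint to the OS scheduler
            return t;
        };
        return new ThreadPoolExecutor(
                nThreads, nThreads,        // core and maximum size: a fixed pool
                0L, TimeUnit.MILLISECONDS, // keep-alive time, irrelevant for a fixed pool
                new ArrayBlockingQueue<>(queueCapacity),
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newBoundedPool(2, 4);
        for (int i = 0; i < 10; i++) {
            int id = i;
            // once 2 tasks run and 4 are queued, extra tasks run on the main thread
            pool.execute(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```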

But there are problems:

  • The thread pool has a fixed size, so tasks have to wait in a queue until a thread is free.
  • Worst of all, a task blocked on I/O occupies a pool thread unnecessarily.
  • The thread pool must be shut down once all tasks have completed; otherwise its non-daemon threads keep the JVM running.
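The last point is easy to get wrong. The sketch below follows the usual two-phase pattern: shutdown() first, then awaitTermination(), then shutdownNow() as a fallback. `ShutdownEx` and `shutdownAndAwait` are hypothetical names, and the timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownEx {

    // Stops accepting new tasks, waits for running ones, and interrupts
    // them if they do not finish within the timeout.
    static boolean shutdownAndAwait(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown(); // reject new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt tasks that are still running
                return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("working on " + Thread.currentThread().getName()));
        System.out.println("terminated: " + shutdownAndAwait(pool, 5));
    }
}
```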

Java offers two important types for this:

  • the Future interface
  • the CompletableFuture class, which implements Future

Here is the above thread example modified to use an ExecutorService:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceEx {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        // a pool of two threads, one for each half of the sum
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        try {
            // the lambda returns a value, so submit() treats it as a Callable
            Future<Integer> lt = executorService.submit(() -> {
                int t = 0;
                for (int i = 0; i < 10; i++) {
                    t = t + i;
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(100); // a Callable may throw checked exceptions
                }
                return t;
            });

            Future<Integer> rt = executorService.submit(() -> {
                int t = 0;
                for (int i = 10; i < 20; i++) {
                    t = t + i;
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(100);
                }
                return t;
            });

            // each get() blocks until the corresponding task has finished
            System.out.println("total: " + (lt.get() + rt.get()));
        } finally {
            executorService.shutdown(); // release the pool's threads so the JVM can exit
        }
    }

}

The output is similar to the following.

pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1

As you can see, the executor runs the tasks on a pool of threads. However, the line that prints the total calls get() on each Future, and each get() blocks the main thread until the corresponding task has completed.

Synchronous APIs are blocking[4] APIs, whereas asynchronous APIs are non-blocking.

The Future interface has been available since Java 5, and Java 8 added CompletableFuture, an implementation of that interface. An ordinary Future is typically created by submitting a Callable to an ExecutorService.
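Besides submitting a Callable to an ExecutorService, you can wrap it in a `FutureTask`, which implements both Runnable and Future, and run it on a plain Thread. The following is a minimal sketch; `FutureTaskEx` and `sumRange` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskEx {

    // Wraps a Callable in a FutureTask, runs it on its own Thread,
    // and blocks on get() until the result is available.
    static int sumRange(int from, int to) throws InterruptedException, ExecutionException {
        Callable<Integer> callable = () -> {
            int t = 0;
            for (int i = from; i < to; i++) {
                t += i;
            }
            return t;
        };
        FutureTask<Integer> future = new FutureTask<>(callable);
        new Thread(future, "future-task").start();
        return future.get(); // blocks until the Callable has returned
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("total: " + (sumRange(0, 10) + sumRange(10, 20))); // total: 190
    }
}
```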

CompletableFuture

Here is a simple example that shows how CompletableFuture works:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class CFEx1 {

    public Future<Integer> getLength(String name){
        CompletableFuture<Integer> futureLength = new CompletableFuture<>();
        // compute on a separate thread and complete the future from there
        new Thread(() -> {
            try {
                futureLength.complete(calculateLengthOfName(name));
            } catch (Exception e) {
                futureLength.completeExceptionally(e); // get() will throw an ExecutionException
            }
        }).start();

        return futureLength; // returned immediately, before the result is known
    }

    public int calculateLengthOfName(String name) throws Exception {
        char firstChar = name.charAt(0);
        if (firstChar != Character.toUpperCase(firstChar) )
            throw new Exception("First letter is not a capital letter");
        return name.length();
    }

    public static void main(String[] args) throws InterruptedException {
        CFEx1 ex1 = new CFEx1();
        String name = "ojitha";

        Future<Integer> f = ex1.getLength(name);
        // busy-wait until the future is done (for demonstration only; it wastes CPU)
        while (!f.isDone()) System.out.print(".");

        try {
            System.out.println("length of the name `"+name+"' is: "+f.get());
        } catch (ExecutionException e) {
            // "ojitha" starts with a lowercase letter, so this branch runs
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }

}
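Instead of polling isDone() and then blocking on get(), a caller can attach callbacks to a CompletableFuture. In this sketch, the class `CFPipelineEx` and the method `describe` are hypothetical names, and `calculateLengthOfName` is copied from CFEx1. It chains `thenApply` to format the length and `exceptionally` to turn the validation failure into a fallback message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CFPipelineEx {

    // Same validation rule as calculateLengthOfName in CFEx1.
    static int calculateLengthOfName(String name) throws Exception {
        char firstChar = name.charAt(0);
        if (firstChar != Character.toUpperCase(firstChar))
            throw new Exception("First letter is not a capital letter");
        return name.length();
    }

    // Computes the length asynchronously, formats it with thenApply, and
    // maps any failure to a fallback message with exceptionally.
    static CompletableFuture<String> describe(String name) {
        return CompletableFuture.supplyAsync(() -> {
                    try {
                        return calculateLengthOfName(name);
                    } catch (Exception e) {
                        throw new CompletionException(e); // completes the stage exceptionally
                    }
                })
                .thenApply(length -> name + " has length " + length)
                // ex is a CompletionException whose cause is the original exception
                .exceptionally(ex -> name + " is invalid: " + ex.getCause().getMessage());
    }

    public static void main(String[] args) {
        describe("Ojitha").thenAccept(System.out::println).join(); // Ojitha has length 6
        describe("ojitha").thenAccept(System.out::println).join();
    }
}
```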

You can replace the getLength method of CFEx1 with a shorter version based on CompletableFuture.supplyAsync:

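public Future<Integer> getLength(String name) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return calculateLengthOfName(name);
        } catch (Exception e) {
            // rethrow unchecked so the future completes exceptionally instead of with null
            // (requires import java.util.concurrent.CompletionException)
            throw new CompletionException(e);
        }
    });
}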

By default, the Supplier is run by one of the worker threads of the common ForkJoinPool (ForkJoinPool.commonPool()).
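The same common pool can replace the blocking get() calls of the ExecutorService example. In this sketch, `CombineEx`, `sumAsync` and `total` are hypothetical names. The two partial sums are combined with `thenCombine`, and main registers a callback instead of blocking on each result. It only calls join() at the very end, because common-pool threads are daemons and would otherwise be stopped when main returns.

```java
import java.util.concurrent.CompletableFuture;

class CombineEx {

    // Sums [from, to) asynchronously; without an Executor argument,
    // supplyAsync uses the common ForkJoinPool.
    static CompletableFuture<Integer> sumAsync(int from, int to) {
        return CompletableFuture.supplyAsync(() -> {
            int t = 0;
            for (int i = from; i < to; i++) {
                t += i;
            }
            return t;
        });
    }

    // Adds the two partial sums once both are ready, without blocking any thread.
    static CompletableFuture<Integer> total() {
        return sumAsync(0, 10).thenCombine(sumAsync(10, 20), Integer::sum);
    }

    public static void main(String[] args) {
        total().thenAccept(t -> System.out.println("total: " + t)) // callback on completion
               .join(); // wait once, at the end, so main does not exit first
    }
}
```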
If you use a custom Executor instead, you have to choose the right number of threads in the pool, for example with the following equation[5]:
\[
N_{threads} = N_{CPU} \times U_{CPU} \times \left(1 + \frac{W}{C}\right)
\]
Here, \(N_{CPU}\) is the number of cores, which can be obtained with Runtime.getRuntime().availableProcessors(); \(U_{CPU}\) is the target CPU utilization, between 0 and 1; and \(W/C\) is the ratio of wait time to compute time. You can rewrite getLength with a custom Executor as follows:

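// create the pool once and reuse it, instead of creating a new pool on every call;
// n = 2 is a placeholder that can be calculated with the above equation
// (requires imports of java.util.concurrent.Executor, Executors and CompletionException)
private static final Executor executor = Executors.newFixedThreadPool(2, (Runnable r) -> {
    Thread t = new Thread(r);
    t.setDaemon(true); // daemon threads do not keep the JVM alive
    return t;
});

public Future<Integer> getLength(String name) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return calculateLengthOfName(name);
        } catch (Exception e) {
            throw new CompletionException(e); // complete the future exceptionally
        }
    }, executor);
}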
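The snippet above hard-codes the pool size to 2. As a sketch of applying the sizing equation in code, consider the following. `PoolSizeEx` and `poolSize` are hypothetical names. Rounding up and enforcing a minimum of one thread are my own choices, not part of the equation. The 90 ms / 10 ms workload in main is an invented example.

```java
class PoolSizeEx {

    // N_threads = N_CPU * U_CPU * (1 + W/C), rounded up and never below one thread.
    static int poolSize(int nCpu, double targetUtilization, double waitTime, double computeTime) {
        if (targetUtilization <= 0 || targetUtilization > 1) {
            throw new IllegalArgumentException("target utilization must be in (0, 1]");
        }
        if (computeTime <= 0) {
            throw new IllegalArgumentException("compute time must be positive");
        }
        double n = nCpu * targetUtilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) Math.ceil(n));
    }

    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        // Hypothetical I/O-heavy task: 90 ms waiting for every 10 ms of computation,
        // which gives 10 threads per core at full target utilization.
        int n = poolSize(nCpu, 1.0, 90, 10);
        System.out.println(n + " threads for " + nCpu + " cores");
    }
}
```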

If the routine performs heavy I/O, CompletableFuture with a custom Executor is the way to go; otherwise, for CPU-bound work, the parallel Stream API is the simplest to use.
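To illustrate the I/O case, here is a sketch in which `slowLookup` is a hypothetical stand-in for an I/O call. It simply sleeps for 100 ms, so it spends its time waiting rather than computing. `IoBoundEx` and `lookupAll` are also hypothetical names. All lookups are started on a pool sized for waiting work before any of them is joined, so they overlap instead of running one after another.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IoBoundEx {

    // Hypothetical stand-in for an I/O call (e.g. a remote lookup): mostly waiting.
    static int slowLookup(int id) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id * id;
    }

    // Starts every lookup on the pool first, then joins them all.
    static List<Integer> lookupAll(List<Integer> ids, int nThreads) {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try {
            List<CompletableFuture<Integer>> futures = ids.stream()
                    .map(id -> CompletableFuture.supplyAsync(() -> slowLookup(id), executor))
                    .collect(Collectors.toList()); // collect first so all lookups start before any join
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.range(0, 20).boxed().collect(Collectors.toList());
        long start = System.nanoTime();
        List<Integer> results = lookupAll(ids, 20);
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results + " in about " + millis + " ms");
    }
}
```

Using two separate stream pipelines matters here. A single lazy pipeline that maps to supplyAsync and then to join would start and wait for each lookup in turn.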

REFERENCES
