Java 9 Parallelism
The current programming paradigm is no longer only object-oriented; it is increasingly about parallelism [1]. From a programming perspective, concurrency is the composition of independently executing processes, whereas parallelism is the simultaneous execution of computations.
Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.
Concurrency is about structure, parallelism is about execution.
Java Futures are the way to support asynchronous operations. The Java fork/join framework and parallel streams divide a task into multiple subtasks and run those subtasks in parallel on different cores of a single machine. This avoids blocking a thread and wasting CPU resources.
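As a rough illustration of the parallel-stream style, the following sketch sums a range of integers across several cores. The class and method names are hypothetical and chosen only for this example.

```java
import java.util.stream.IntStream;

class ParallelStreamSketch {
    // Sums the integers in [from, to) using a parallel stream.
    static int parallelSum(int from, int to) {
        return IntStream.range(from, to)
                .parallel() // let the runtime split the range across worker threads
                .sum();
    }

    public static void main(String[] args) {
        System.out.println("total: " + parallelSum(0, 20)); // prints "total: 190"
    }
}
```

The stream decides how to split the range; the caller only states that the work may run in parallel.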
Introduction
The key goal of parallelism is to efficiently partition a task into subtasks (fork) and combine their results (join). This improves performance in terms of higher throughput, better scalability, and lower latency. Parallelism is best used when:
- tasks are independent
- there is a high volume of data to process
- threads neither block nor share mutable state
- more processors and cores need to be utilized
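The fork and join steps described above can be sketched with RecursiveTask. This is a minimal example under stated assumptions: the class name, the THRESHOLD value, and the array-sum task are all hypothetical and only illustrate the pattern.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSumSketch extends RecursiveTask<Long> {
    // Hypothetical cut-off below which the work is done sequentially.
    private static final int THRESHOLD = 1_000;
    private final int[] data;
    private final int from;
    private final int to;

    ForkJoinSumSketch(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ForkJoinSumSketch left = new ForkJoinSumSketch(data, from, mid);
        ForkJoinSumSketch right = new ForkJoinSumSketch(data, mid, to);
        left.fork();                     // fork: run the left half asynchronously
        long rightSum = right.compute(); // compute the right half in this thread
        return left.join() + rightSum;   // join: combine both partial results
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSumSketch(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        System.out.println("total: " + sum(data));
    }
}
```

The subtasks are independent and share no mutable state, which matches the conditions listed above.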
Since its earliest releases, Java has offered synchronized, Runnable, and Thread as lock-based support for concurrent programming on a single core. The monitor [2] mechanism using wait and notify was also available at that time. Java 5 introduced the java.util.concurrent package, including the ExecutorService interface, which decoupled task submission from thread execution. An ExecutorService can execute both Runnable and Callable tasks. Java 7 introduced the fork/join framework [3], adding java.util.concurrent.RecursiveTask to support fork/join implementations. Java 8 added parallel streams together with lambda expressions. It also introduced CompletableFuture, which builds on Future and follows the asynchronous model: the caller doesn't block waiting for the callee.
Java threads are OS threads, which are expensive and limited: creating more threads than the system allows fails, typically with an OutOfMemoryError.
Here is an example in the old style, using plain threads:
class OldSingleThreadEx {
    static int l = 0;
    static int r = 0;
    public static void main(String[] args) throws InterruptedException {
        Thread lt = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                l = l + i;
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "lt");
        Thread rt = new Thread(() -> {
            for (int i = 10; i < 20; i++) {
                r = r + i;
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "rt");
        lt.start();
        rt.start();
        // wait for both threads before reading their results
        lt.join();
        rt.join();
        System.out.println("total: " + (l + r));
    }
}
The above code (Java 1 style) can be refactored to use ExecutorService.
In Java, daemon threads are terminated when the JVM exits, but the JVM does not exit until every non-daemon thread, including main, has terminated.
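The difference between daemon and non-daemon threads can be seen in a small sketch. The class name and the helper newDaemon are hypothetical and exist only for this illustration.

```java
class DaemonSketch {
    // Creates (but does not start) a thread marked as a daemon.
    static Thread newDaemon(Runnable task) {
        Thread t = new Thread(task, "background");
        t.setDaemon(true); // must be set before start()
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        // This loop never finishes on its own.
        Thread daemon = newDaemon(() -> {
            while (true) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        daemon.start();

        // A non-daemon thread: the JVM waits for it to finish.
        Thread worker = new Thread(() -> System.out.println("worker done"), "worker");
        worker.start();
        worker.join();

        System.out.println("main exits; daemon=" + daemon.isDaemon());
        // The JVM exits here even though the daemon loop is still running.
    }
}
```

If setDaemon(true) were removed, the endless loop would keep the JVM alive after main returns.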
Executor
The following code creates a pool of threads that pick up tasks on a first-come, first-served basis and return to the pool when finished. This makes the pool's size, thread priority, and accept/reject policies easy to control.
ExecutorService executorService = Executors.newFixedThreadPool(nThreads); // nThreads: number of threads in the pool
But there are problems:
- The thread pool has a fixed size, so tasks have to wait in a queue until a thread is free.
- Worst of all is blocking I/O, where a thread is occupied unnecessarily while it waits.
- It is important to shut down the thread pool after all tasks have completed.
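The shutdown point in the list above can be sketched as follows. This is one possible pattern, not the only one; the class name, the helper shutdownQuietly, and the one-second timeout are assumptions made for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {
    // Runs one task on a small pool and always shuts the pool down afterwards.
    static int runAndShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return pool.submit(() -> 6 * 7).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            shutdownQuietly(pool);
        }
    }

    static void shutdownQuietly(ExecutorService pool) {
        pool.shutdown(); // stop accepting new tasks; already queued tasks still run
        try {
            // The one-second timeout is an arbitrary value for this sketch.
            if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt tasks that are still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + runAndShutdown()); // prints "result: 42"
    }
}
```

Putting the shutdown in a finally block means the pool's non-daemon threads cannot keep the JVM alive when a task fails.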
To work with the results of submitted tasks, Java offers two important types:
- Future (an interface)
- CompletableFuture (a class that implements Future)
Here is the thread example above, modified to use ExecutorService:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
class Scratch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Integer> lt = executorService.submit(() -> {
            int t = 0;
            for (int i = 0; i < 10; i++) {
                t = t + i;
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return t;
        });
        Future<Integer> rt = executorService.submit(() -> {
            int t = 0;
            for (int i = 10; i < 20; i++) {
                t = t + i;
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return t;
        });
        // each get() blocks until the corresponding task has finished
        System.out.println("total: " + (lt.get() + rt.get()));
        executorService.shutdown();
    }
}
The output is similar to the following.
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
As you can see, the Executor uses a pool of threads. In the above code, the line that prints the total calls get() on each Future in turn, and each get() blocks until its task has completed.
Synchronous APIs are blocking [4] APIs, whereas asynchronous APIs are non-blocking.
Java 8 added CompletableFuture, an implementation of the Future interface. An ordinary Future is typically created by submitting a Callable.
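The blocking get() calls in the ExecutorService example can be avoided by composing results with callbacks. The following is a minimal sketch, assuming the two partial sums are rewritten as CompletableFuture tasks; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

class CombineSketch {
    // Starts both partial sums asynchronously and combines them without blocking.
    static CompletableFuture<Integer> totalAsync() {
        CompletableFuture<Integer> left =
                CompletableFuture.supplyAsync(() -> IntStream.range(0, 10).sum());
        CompletableFuture<Integer> right =
                CompletableFuture.supplyAsync(() -> IntStream.range(10, 20).sum());
        // thenCombine runs the function once both futures have completed
        return left.thenCombine(right, Integer::sum);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> printed =
                totalAsync().thenAccept(total -> System.out.println("total: " + total));
        // join() is used here only so this demo does not exit before the callback runs
        printed.join();
    }
}
```

Neither partial result is waited on explicitly; the caller registers what should happen when both are available.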
CompletableFuture
Here is a simple example to show how CompletableFuture works:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
class CFEx1 {
    public Future<Integer> getLength(String name) {
        CompletableFuture<Integer> futureLength = new CompletableFuture<>();
        new Thread(() -> {
            try {
                // complete normally with the computed length
                futureLength.complete(calculateLengthOfName(name));
            } catch (Exception e) {
                // propagate the failure to whoever calls get()
                futureLength.completeExceptionally(e);
            }
        }).start();
        return futureLength;
    }
    public int calculateLengthOfName(String name) throws Exception {
        char firstChar = name.charAt(0);
        if (firstChar != Character.toUpperCase(firstChar))
            throw new Exception("First letter is not a capital letter");
        return name.length();
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CFEx1 ex1 = new CFEx1();
        // the first letter is lowercase, so the future completes exceptionally
        // and f.get() below throws an ExecutionException
        String name = "ojitha";
        Future<Integer> f = ex1.getLength(name);
        while (!f.isDone()) System.out.print("."); // busy-wait, for demonstration only
        System.out.println("length of the name `" + name + "' is :" + f.get());
    }
}
You can replace the above getLength method as follows:
public Future<Integer> getLength(String name) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return calculateLengthOfName(name);
        } catch (Exception e) {
            e.printStackTrace();
            return null; // the caller receives null when the calculation fails
        }
    });
}
By default, the Supplier is run by one of the worker threads of the common ForkJoinPool (ForkJoinPool.commonPool()).
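Returning null on failure hides the error from the caller. As an alternative sketch, the failure can be handled in the pipeline with exceptionally. This assumes an unchecked variant of calculateLengthOfName and a fallback value of -1, both of which are choices made for this example rather than part of the original code.

```java
import java.util.concurrent.CompletableFuture;

class CFExceptionSketch {
    // Unchecked variant of calculateLengthOfName, assumed for this sketch.
    static int calculateLengthOfName(String name) {
        char firstChar = name.charAt(0);
        if (firstChar != Character.toUpperCase(firstChar)) {
            throw new IllegalArgumentException("First letter is not a capital letter");
        }
        return name.length();
    }

    static CompletableFuture<Integer> getLength(String name) {
        return CompletableFuture
                .supplyAsync(() -> calculateLengthOfName(name))
                .exceptionally(ex -> -1); // hypothetical fallback value on failure
    }

    public static void main(String[] args) {
        System.out.println(getLength("Ojitha").join()); // prints 6
        System.out.println(getLength("ojitha").join()); // prints -1
    }
}
```

Whether a fallback value or a propagated exception is more appropriate depends on the caller; this sketch only shows where such a decision can be placed.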
\[
N^{threads} = N^{CPU} \times U^{CPU} \times \left( 1 + W/C \right)
\]
If you are using a custom Executor, you have to decide the right number of threads in the pool using the above equation [5]. Here, \(N^{CPU}\) is the number of cores, obtained from Runtime.getRuntime().availableProcessors(). \(U^{CPU}\) is the target CPU utilization, between 0 and 1. \(W/C\) is the ratio of wait time to compute time. You can rewrite the above with an Executor as follows:
// requires java.util.concurrent.Executor and java.util.concurrent.Executors
public Future<Integer> getLength(String name) {
    int n = 2; // can be calculated based on the above equation
    // daemon threads, so the pool does not prevent the JVM from exiting
    Executor executor = Executors.newFixedThreadPool(n, (Runnable r) -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });
    return CompletableFuture.supplyAsync(() -> {
        try {
            return calculateLengthOfName(name);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }, executor);
}
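The sizing equation above can be turned into a small helper. This is a sketch only: the class name, the method name, and the sample utilization and wait/compute values are assumptions, and real values would have to be measured for the workload.

```java
class PoolSizeSketch {
    // N_threads = N_cpu * U_cpu * (1 + W/C), rounded and never less than 1.
    static int threads(int cores, double targetUtilization, double waitToComputeRatio) {
        double raw = cores * targetUtilization * (1 + waitToComputeRatio);
        return (int) Math.max(1, Math.round(raw));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumed sample values: 50% target utilization, tasks wait 3x longer than they compute.
        System.out.println("pool size: " + threads(cores, 0.5, 3.0));
    }
}
```

For example, with 4 cores, a target utilization of 0.5, and W/C = 3, the helper gives 4 × 0.5 × (1 + 3) = 8 threads. I/O-heavy tasks have a large W/C and therefore justify pools much larger than the core count.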
If the routine has heavy I/O operations, then CompletableFutures are the way to go; otherwise, the Stream interface is the simplest to use.
REFERENCES