Java Streaming API recipes

In Java, the elements of a parallel stream are split into subsets (which can be split further) that are processed in parallel on different CPU cores. Therefore, only stateless, non-interfering and associative operations are safe to run in parallel. The partial results of these subsets are then combined by the terminal (possibly short-circuiting) operation to produce the final result. Whether a pipeline runs sequentially (the default) or in parallel depends on the last sequential() or parallel() call in the pipeline.

Fig.1: Stream



Streams API

In the blog post Use of default and static methods, I introduced how to write a factorial lambda function using UnaryOperator. You can use the java.util.stream.IntStream.reduce method to calculate the factorial value in a stream as follows.

var r = IntStream.range(1, 10).reduce(1, (a, b) -> a * b); // range excludes 10, so r = 9! = 362880
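A minimal sketch of a reusable factorial built on the same reduce idea, assuming you want n! for small n. It uses LongStream.rangeClosed so the upper bound is included, and a long accumulator because int overflows after 12! (long overflows after 20!). The class and method names are hypothetical.

```java
import java.util.stream.LongStream;

class FactorialDemo {
    // Computes n! by multiplying 1..n; the identity 1 also makes factorial(0) == 1.
    static long factorial(int n) {
        if (n < 0 || n > 20) {
            // 21! no longer fits in a long
            throw new IllegalArgumentException("n must be in 0..20");
        }
        // rangeClosed includes n itself, unlike range(1, n)
        return LongStream.rangeClosed(1, n).reduce(1L, (a, b) -> a * b);
    }

    public static void main(String[] args) {
        System.out.println(factorial(10)); // 3628800
    }
}
```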

The stream-handling interfaces use Java generics:

  • BaseStream - defines the core stream behaviours, such as switching the stream between parallel and sequential mode.
  • Stream<T>, DoubleStream, IntStream and LongStream extend BaseStream. The primitive streams provide stream processing operations for primitives and avoid the overhead of boxing and unboxing.

A Stream<T> returns objects, while the primitive streams return primitives, as this JShell session shows:

var a = IntStream.of(1,2,3)
var i = a.iterator()
var v = i.next() // created variable v : Integer
var vp = i.nextInt() // created variable vp : int
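Converting between object and primitive streams is done with mapToInt(...) and boxed(). A minimal sketch (the class and method names are hypothetical):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BoxingDemo {
    // Stream<Integer> -> IntStream: unbox each element once, then sum primitives.
    static int sumOfBoxed(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    // IntStream -> Stream<Integer>: boxed() is needed before collecting into a List.
    static List<Integer> squares(int n) {
        return IntStream.rangeClosed(1, n)
                .map(x -> x * x)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumOfBoxed(List.of(1, 2, 3))); // 6
        System.out.println(squares(4));                   // [1, 4, 9, 16]
    }
}
```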

Fig.2: Stream API

Creating a stream from a list:

List<Integer> l1 = new ArrayList<>(){ { for(int i=0; i < 15; i++) add((int)(Math.random()*10));} };

l1.stream().parallel().mapToDouble(x -> x).forEach(System.out::println);

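In the above code, the method reference System.out::println prints the values. Note that it refers to an instance method of a particular object (System.out); it is not a static method reference.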

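List.<Character>of('a','b').stream()
    .map(String::charAt) // compile error
    .forEach(System.out::print);

The above code does not compile. String::charAt is an unbound reference to the instance method charAt(int) of String: it needs a String as the receiver and an int index as the argument, but the stream elements are Character values, so it cannot serve as the Function<Character, R> that map(...) expects.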
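For comparison, here is a sketch of the four kinds of method reference that do compile in a stream pipeline. The helper twice(...) and the class name are hypothetical, introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class MethodRefDemo {
    // Hypothetical helper, used below as a static method reference.
    static String twice(Character c) {
        return "" + c + c;
    }

    // Static method reference: calls MethodRefDemo.twice(c) for each Character c.
    static List<String> doubled(List<Character> chars) {
        return chars.stream().map(MethodRefDemo::twice).collect(Collectors.toList());
    }

    // Unbound instance method reference: calls s.length() on each String s.
    static List<Integer> lengths(List<String> words) {
        return words.stream().map(String::length).collect(Collectors.toList());
    }

    // Bound instance method reference: always calls add(...) on the same list `out`.
    static List<String> copyOf(List<String> words) {
        List<String> out = new ArrayList<>();
        words.forEach(out::add);
        return out;
    }

    // Constructor reference: ArrayList::new supplies the result container.
    static ArrayList<String> toArrayList(List<String> words) {
        return words.stream().collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        List<String> d = doubled(List.of('a', 'b'));
        // prints: [aa, bb] [2, 2] [aa, bb] [aa, bb]
        System.out.println(d + " " + lengths(d) + " " + copyOf(d) + " " + toArrayList(d));
    }
}
```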

Creating a stream from an array:

String[] strs = {"a", "z", "b", "k", "c"};
Arrays.stream(strs).parallel().forEach(System.out::println); // print order is not guaranteed in parallel

Or use the static method Stream.of(...):

Stream.of(1,2,3,4,5,6).forEach(System.out::println);

Or use a primitive stream:

IntStream.generate(()->(int)(Math.random()*10)).distinct().takeWhile(n -> n !=4).forEach(System.out::println);

In the above code, distinct() is a stateful intermediate operation that returns a new stream without duplicate elements. takeWhile(...) is a short-circuiting intermediate operation that produces a finite stream from the infinite one: it stops at the first element equal to 4. forEach(...) is a terminal operation that traverses the stream pipeline and ends the stream processing.
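Because the random source above makes the output unpredictable, here is a deterministic sketch that contrasts takeWhile(...) with filter(...) on an infinite Stream.iterate(...) source. takeWhile stops at the first element that fails its predicate, whereas filter would keep searching forever and needs limit(...) to terminate. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TakeWhileDemo {
    // Powers of two below `bound`; takeWhile ends the infinite stream at the first value >= bound.
    // Assumes bound <= 2^30 so that n * 2 never overflows before the stream stops.
    static List<Integer> powersOfTwoBelow(int bound) {
        return Stream.iterate(1, n -> n * 2)
                .takeWhile(n -> n < bound)
                .collect(Collectors.toList());
    }

    // filter alone would never finish on an infinite source, so limit(count) caps it.
    static List<Integer> firstEvens(int count) {
        return Stream.iterate(1, n -> n + 1)
                .filter(n -> n % 2 == 0)
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwoBelow(100)); // [1, 2, 4, 8, 16, 32, 64]
        System.out.println(firstEvens(3));         // [2, 4, 6]
    }
}
```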

Functional Interfaces:

  • Supplier<T> supplies elements, for example to create a stream with Stream.generate(...), via T get().
  • Predicate<T> filters the stream by applying a condition via boolean test(T t).
  • Function<T, R> converts type T to type R via R apply(T t).
  • UnaryOperator<T>, which extends Function<T, T>, applies a function that returns a new value of the same type T via T apply(T t).
  • Consumer<T> returns nothing; it consumes the stream elements via void accept(T t).

The following functional interfaces take two arguments at a time:

  • BiPredicate<T,U> uses boolean test(T t, U u)
  • BiFunction<T,U,R> uses R apply(T t, U u)
  • BinaryOperator<T> uses T apply(T t1, T t2)
  • BiConsumer<T,U> uses void accept(T t, U u)
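A small sketch that puts several of these interfaces to work in one pipeline, plus two of the two-argument variants; the names and values are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

class FunctionalInterfacesDemo {
    static List<String> run() {
        Supplier<Integer> one = () -> 1;                 // produces the seed value
        UnaryOperator<Integer> next = n -> n + 1;        // T -> T
        Predicate<Integer> isOdd = n -> n % 2 == 1;      // T -> boolean
        Function<Integer, String> label = n -> "n" + n;  // T -> R
        List<String> sink = new ArrayList<>();
        Consumer<String> addToSink = sink::add;          // T -> void

        Stream.iterate(one.get(), next) // 1, 2, 3, ...
                .limit(6)               // 1..6
                .filter(isOdd)          // 1, 3, 5
                .map(label)             // "n1", "n3", "n5"
                .forEach(addToSink);
        return sink;
    }

    // Two-argument variants: returns a + a * b.
    static int combine(int a, int b) {
        BinaryOperator<Integer> add = Integer::sum;                    // (T, T) -> T
        BiFunction<Integer, Integer, Integer> times = (x, y) -> x * y; // (T, U) -> R
        return add.apply(a, times.apply(a, b));
    }

    public static void main(String[] args) {
        System.out.println(run());          // [n1, n3, n5]
        System.out.println(combine(2, 3));  // 8
    }
}
```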

The map operation can apply composed functions as follows (l1 is the list of random digits created above):

Function<Integer, String> m = x -> x + " 0";
UnaryOperator<String> u = x -> x.replace(" ", "");
l1.stream().map(m.andThen(u))   // e.g. 2 -> "2 0" -> "20"
  .mapToInt(Integer::parseInt).forEach(System.out::println);

// possible output
// 20
// 30
// 40
// 70
// ...

Here is an example of an infinite stream:

var ints = Stream.iterate(0, i -> i+1);

ints
  //.limit(10)
  .filter(c -> c < 2)
  .sorted()
  .findFirst()
  .ifPresent(System.out::println);

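The above code never produces a result: sorted() must consume the whole infinite source created by Stream.iterate(...) before it can pass anything to findFirst(). When you uncomment the .limit(10) line, the stream becomes finite and the pipeline prints 0.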

Another example:

// List<Character> list = new ArrayList<>(){ { for(char i=65; i < 70; i++) add(i);} };
// var chars = list.stream();

// infinite
var chars = Stream.generate(() -> 'A');

chars.filter(c -> c < 'B')
  .sorted()
  .findFirst()
  .ifPresent(System.out::println);

The commented-out list source at the top of the above code is finite. However, the Stream.generate(...) source is infinite, therefore sorted() never finishes and nothing is printed.
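A sketch of two ways to make the infinite example terminate, assuming you only need a bounded prefix: put limit(...) before the stateful sorted(), or drop sorted() so that findFirst() can short-circuit. The class and method names are hypothetical.

```java
import java.util.Optional;
import java.util.stream.Stream;

class InfiniteSortDemo {
    // limit(5) makes the source finite before the stateful sorted() step runs.
    static Optional<Character> sortedThenFirst() {
        return Stream.generate(() -> 'A')
                .limit(5)
                .filter(c -> c < 'B')
                .sorted()
                .findFirst();
    }

    // Without sorted(), findFirst() short-circuits on the first element that passes the filter.
    static Optional<Character> firstMatch() {
        return Stream.generate(() -> 'A')
                .filter(c -> c < 'B')
                .findFirst();
    }

    public static void main(String[] args) {
        sortedThenFirst().ifPresent(System.out::println); // A
        firstMatch().ifPresent(System.out::println);      // A
    }
}
```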

Short-Circuit terminal operations

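findAny(), findFirst(), allMatch(), anyMatch() and noneMatch() are short-circuiting terminal operations: they can return without processing every element. For comparison, the example also shows count(), sum(), average(), max() and min(), which are ordinary terminal operations that consume the whole stream.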

Integer[] values = {2,5,8,7,9,3,4,1,3,6};

Optional<Integer> any = Arrays.stream(values).findAny();
any.get(); // typically 2 for a sequential stream, but findAny() may return any element

Arrays.stream(values).findFirst(); // Optional[2]

Arrays.stream(values).allMatch(i -> i == 2);  // false, but
Arrays.stream(values).anyMatch(i -> i == 2);  // true
Arrays.stream(values).noneMatch(i -> i == 2); // false

Arrays.stream(values).count(); // 10

Arrays.stream(values).mapToInt(Integer::intValue).sum(); // 48

Arrays.stream(values).mapToInt(Integer::intValue).average(); // OptionalDouble[4.8]
Arrays.stream(values).mapToInt(Integer::intValue).max(); // OptionalInt[9]
Arrays.stream(values).mapToInt(Integer::intValue).min(); // OptionalInt[1]
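To see short-circuiting in action, the sketch below counts how many elements anyMatch(...) and allMatch(...) actually inspect on the same array, using peek(...) with an AtomicInteger counter introduced only for this illustration (class and method names are hypothetical).

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class ShortCircuitDemo {
    static final Integer[] VALUES = {2, 5, 8, 7, 9, 3, 4, 1, 3, 6};

    // How many elements anyMatch(i -> i == target) looks at before answering.
    static int inspectedByAnyMatch(int target) {
        AtomicInteger seen = new AtomicInteger();
        Arrays.stream(VALUES)
                .peek(i -> seen.incrementAndGet())
                .anyMatch(i -> i == target);
        return seen.get();
    }

    // allMatch stops at the first element that fails the predicate.
    static int inspectedByAllMatch(int upperBound) {
        AtomicInteger seen = new AtomicInteger();
        Arrays.stream(VALUES)
                .peek(i -> seen.incrementAndGet())
                .allMatch(i -> i < upperBound);
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(inspectedByAnyMatch(2));  // 1: the first element matches
        System.out.println(inspectedByAnyMatch(10)); // 10: no match, every element is checked
        System.out.println(inspectedByAllMatch(8));  // 3: stops at 8
    }
}
```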

You can create a stream of collections of different types that hold different types of data:

Set<String> set = new HashSet<>();
set.add("A");
set.add("B");
//List<String> list = Arrays.asList("1","2");
List<Integer> list = Arrays.asList(1,2);
Deque<Boolean> queue = new ArrayDeque<>();
queue.push(true);
queue.push(false);
Stream.of(set, list, queue ).forEach(System.out::println);
// [A, B]
// [1, 2]
// [false, true]

Here is the same example using flatMap to flatten the three collections into a single stream:

// replace the Stream.of(...) line in the above code
Stream.of(set, list, queue ).flatMap(x -> x.stream()).forEach(System.out::println);
// A
// B
// 1
// 2
// false
// true
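flatMap is also the usual way to split each element into parts. A minimal sketch, assuming words are separated by single spaces (the class and method names are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapDemo {
    // Each sentence becomes a Stream<String> of words; flatMap concatenates those streams.
    static List<String> words(List<String> sentences) {
        return sentences.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(words(List.of("to be", "or not"))); // [to, be, or, not]
    }
}
```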

Stream Aggregation

The following requirements must be satisfied for collect() to perform a concurrent (parallel) reduction with a Collector:

  1. The stream is parallel.
  2. The Collector passed to collect() has the Collector.Characteristics.CONCURRENT characteristic.
  3. Either the Collector has Collector.Characteristics.UNORDERED or the stream is unordered.

For example, the collectors returned by Collectors.toConcurrentMap() and Collectors.groupingByConcurrent() have both the Collector.Characteristics.CONCURRENT and Collector.Characteristics.UNORDERED characteristics.

var fruits = Stream.of("Mango","Apple","kiwi","Banana").parallel();
ConcurrentMap<Integer, List<String>> map = fruits.collect(
  Collectors.groupingByConcurrent(String::length));
System.out.println(map); 

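The above stream is parallel and the groupingByConcurrent() collector is both CONCURRENT and UNORDERED, so all three requirements are met. As a consequence, the order of the words inside each list of the printed map can vary from run to run.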
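You can check a collector's characteristics yourself. A minimal sketch comparing groupingByConcurrent with the ordinary groupingBy (the class and method names are hypothetical):

```java
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CharacteristicsDemo {
    static Set<Collector.Characteristics> concurrentGrouping() {
        return Collectors.<String, Integer>groupingByConcurrent(String::length).characteristics();
    }

    static Set<Collector.Characteristics> plainGrouping() {
        return Collectors.<String, Integer>groupingBy(String::length).characteristics();
    }

    public static void main(String[] args) {
        // expected to include CONCURRENT and UNORDERED
        System.out.println(concurrentGrouping());
        // groupingBy is neither CONCURRENT nor UNORDERED
        System.out.println(plainGrouping());
    }
}
```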

In the following code, the map(...) step adds each element to a thread-safe list as a side effect. Because the stream is parallel, the elements are added in an unpredictable order.

var d = Collections.synchronizedList(new ArrayList<>());
var result = IntStream.range(1,6)
  .parallel()
  .map(i -> {d.add(i);return i;})
  .collect(ArrayList::new, List::add, List::addAll);
result.forEach(System.out::println); // ordered - 12345
System.out.println("---------");
d.forEach(System.out::println); // not ordered, e.g. 15342

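But the result is ordered: collect() preserves the encounter order of the ordered source IntStream.range(1, 6), even in parallel, because the combiner List::addAll appends the later partial result after the earlier one.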
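If the side effect itself has to respect encounter order, forEachOrdered(...) can replace forEach(...) on a parallel stream, at the cost of some parallelism. A minimal sketch (class and method names are hypothetical); forEachOrdered guarantees that the action for one element happens-before the action for the next, so a plain ArrayList is used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class OrderedSideEffectDemo {
    static List<Integer> orderedTimesTen(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.rangeClosed(1, n)
                .parallel()
                .map(i -> i * 10)          // may run on several threads
                .forEachOrdered(out::add); // action runs one element at a time, in encounter order
        return out;
    }

    public static void main(String[] args) {
        System.out.println(orderedTimesTen(5)); // [10, 20, 30, 40, 50]
    }
}
```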

Produce a single result from a stream of values using the reduce operation:

  1. Optional<T> reduce(BinaryOperator<T> accumulator) performs the accumulation and returns an Optional, because the stream may be empty.

    List<Integer> l1 = new ArrayList<>(){ { for(int i=0; i < 10; i++) add(i+1);} };
    l1.stream().reduce((x,y)-> x +y) // Optional[55]
    
  2. T reduce(T identity, BinaryOperator<T> accumulator): the identity acts as the initial value and as the result for an empty stream.

    List<Integer> l1 = new ArrayList<>(){ { for(int i=0; i < 10; i++) add(i+1);} };
    l1.stream().reduce(0,(x,y)-> x +y)  // 55, not an Optional, because the identity provides a default
    // or
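    Character[] a = {'a','b','c','d'};
    Arrays.stream(a).parallel().map(c -> c.toString()).reduce("<", (x,y) -> x+y);
    // output: "<a<b<c<d" when each element ends up in its own parallel chunk;
    // "<" is not a true identity for concatenation, so the result depends on how
    // the stream is split (sequentially it would be "<abcd")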
  3. <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)

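    Character[] a = {'a','b','c','d'};
    Arrays.stream(a).parallel().reduce("<", (x,y) -> x + y, (x,y) -> x + ">" + y);
    // output: "<a><b><c><d"; the combiner is not compatible with the accumulator,
    // so this output also depends on how the parallel stream is split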
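For a reduce that gives the same answer sequentially and in parallel, the identity must be neutral for the accumulator and the combiner must be compatible with it. A minimal sketch that totals string lengths (class and method names are hypothetical):

```java
import java.util.List;

class ReduceDemo {
    // The identity 0 is neutral for addition, and Integer::sum merges partial totals
    // the same way the accumulator adds each length, so splitting does not matter.
    static int totalLength(List<String> words, boolean parallel) {
        var stream = parallel ? words.parallelStream() : words.stream();
        return stream.reduce(0, (sum, w) -> sum + w.length(), Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "bb", "ccc", "dddd");
        System.out.println(totalLength(words, false)); // 10
        System.out.println(totalLength(words, true));  // 10
    }
}
```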

Collectors

Here is the basic use of the java.util.stream.Stream.collect method:

List<String> list = Stream.of("c","a","t")
  .collect(ArrayList::new, List::add, List::addAll); // [c, a, t]

ArrayList::new is the supplier that creates a new mutable result container; in parallel execution it may be called several times, once per partial result. List::add is the accumulator: an associative, non-interfering, stateless function that folds an element into a result container. List::addAll is the combiner: an associative, non-interfering, stateless function that accepts two partial result containers and merges them. The combiner must be compatible with the accumulator function, and it must fold the elements from the second result container into the first.

NOTE: For a concurrent reduction, the result container must be a thread-safe collection and the stream must be parallel.
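The same supplier, accumulator and combiner can be packaged, together with an optional finisher, as a reusable Collector with Collector.of(...). A minimal sketch; the CONCAT collector is only an illustration, since Collectors.joining() already does this job.

```java
import java.util.List;
import java.util.stream.Collector;

class CustomCollectorDemo {
    // Arguments: supplier, accumulator, combiner and finisher, in that order.
    static final Collector<String, StringBuilder, String> CONCAT = Collector.of(
            StringBuilder::new,                  // new mutable container per partial result
            (sb, s) -> sb.append(s),             // fold one element into a container
            (left, right) -> left.append(right), // merge the second container into the first
            StringBuilder::toString);            // turn the container into the final result

    public static void main(String[] args) {
        System.out.println(List.of("c", "a", "t").parallelStream().collect(CONCAT)); // cat
    }
}
```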

Here are examples of basic collectors:

// create a list of [1, 2, 3]
List<Integer> l1 = new ArrayList<>(){ { for(int i=0; i < 3; i++) add(i+1);} };

l1.stream().collect(Collectors.summarizingInt(x -> x)); 
// output: IntSummaryStatistics{count=3, sum=6, min=1, average=2.000000, max=3} 

l1.stream().collect(Collectors.mapping(x -> x.toString(), Collectors.joining(",")));
// output: "1,2,3"

l1.stream().map(x -> x*x).collect(Collectors.toList());
// output: [1, 4, 9]

// use of collectingAndThen
l1.stream().collect(Collectors.collectingAndThen(Collectors.averagingInt(x -> x*x), x -> "average: " + x));
// output: "average: 4.666666666666667"

Partitioning

// create integer list
List<Integer> intlist = new ArrayList<>(){ { for(int i=0; i < 8; i++) add(i+1);} };
// output: intlist ==> [1, 2, 3, 4, 5, 6, 7, 8]

// group on Predicate
intlist.stream().collect(Collectors.partitioningBy(x -> x >5));
// output: {false=[1, 2, 3, 4, 5], true=[6, 7, 8]}, of type Map<Boolean, List<Integer>>

Grouping

// create a list
List<String> fruits = new ArrayList<>(){ {add("Apple");add("Mango");add("Avocado");add("Melon");add("Orange");} };

// group the elements by the first letter of their names
fruits.stream().collect(Collectors.groupingBy(x -> x.substring(0,1)));
// output: {A=[Apple, Avocado], M=[Mango, Melon], O=[Orange]}
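groupingBy also accepts a downstream collector, and a map factory when you need a particular Map type. A minimal sketch that counts the fruits per first letter in a TreeMap, so the keys come out sorted (class and method names are hypothetical):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingCountDemo {
    static Map<String, Long> countByFirstLetter(List<String> names) {
        return names.stream().collect(Collectors.groupingBy(
                x -> x.substring(0, 1),   // classifier: first letter
                TreeMap::new,             // map factory: keys kept in sorted order
                Collectors.counting()));  // downstream: number of names per letter
    }

    public static void main(String[] args) {
        // {A=2, M=2, O=1}
        System.out.println(countByFirstLetter(List.of("Apple", "Mango", "Avocado", "Melon", "Orange")));
    }
}
```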

Flat Mapping

List<String> s = new ArrayList<>(){ {add("aaa");add("aaaa");add("bbb");add("aacc");} };
// output: [aaa, aaaa, bbb, aacc]

s.stream().collect(Collectors.groupingBy(x -> x.length(), Collectors.flatMapping(x -> x.chars().mapToObj(i -> (char)i), Collectors.toSet())));
// output: {3=[a, b], 4=[a, c]}

The flat-mapping function must return a stream. The following example shows how to turn a String into a Stream<Character>:

Stream<Character> a = "Ojitha".chars().mapToObj(i -> (char)i);
a.forEach(System.out::print); // Ojitha

Filtering

List<String> s = new ArrayList<>(){ {add("aaa");add("aaaa");add("bbb");add("aacc");} };
// output: [aaa, aaaa, bbb, aacc]

s.stream().collect(Collectors.groupingBy(x -> x.length(), Collectors.filtering(x -> !x.equals("aaa"), Collectors.toSet())));
// output: {3=[bbb], 4=[aaaa, aacc]} instead of {3=[aaa, bbb], ...} without filtering
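The difference between Collectors.filtering(...) and a filter(...) placed before groupingBy(...) shows up when every element of a group is rejected: filtering keeps the key with an empty collection, while filter removes the key entirely. A minimal sketch using the same words (class and method names are hypothetical):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class FilteringDemo {
    static final List<String> WORDS = List.of("aaa", "aaaa", "bbb", "aacc");

    // Filtering inside the grouping: key 3 survives with an empty set.
    static Map<Integer, Set<String>> filteringDownstream() {
        return WORDS.stream().collect(Collectors.groupingBy(String::length,
                Collectors.filtering(x -> x.contains("c"), Collectors.toSet())));
    }

    // Filtering before the grouping: key 3 disappears because no word of length 3 passes.
    static Map<Integer, Set<String>> filterFirst() {
        return WORDS.stream()
                .filter(x -> x.contains("c"))
                .collect(Collectors.groupingBy(String::length, Collectors.toSet()));
    }

    public static void main(String[] args) {
        System.out.println(filteringDownstream()); // {3=[], 4=[aacc]}
        System.out.println(filterFirst());         // {4=[aacc]}
    }
}
```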

Say you want to map the words by their length as the key. Because several words have the same length, you have to use the Collectors.toMap(keyMapper, valueMapper, mergeFunction) method, where the merge function decides how to combine the values of a duplicate key.

var s = Stream.of("speak", "bark", "talk", "shout");
BinaryOperator<String> merge = (a, b) -> a + ", " + b;
var map = s.collect(Collectors.toMap(String::length, k -> k, merge));
System.out.println(map); // {4=bark, talk, 5=speak, shout}
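A minimal sketch of the four-argument toMap(...), whose last argument chooses the Map implementation (here a TreeMap, so keys are sorted), alongside the two-argument form, which throws an IllegalStateException on the first duplicate key. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ToMapDemo {
    static Map<Integer, String> byLength(Stream<String> words) {
        return words.collect(Collectors.toMap(
                String::length,          // key: word length
                Function.identity(),     // value: the word itself
                (a, b) -> a + ", " + b,  // merge words that share a length
                TreeMap::new));          // map supplier: sorted keys
    }

    static boolean failsWithoutMerge(Stream<String> words) {
        try {
            words.collect(Collectors.toMap(String::length, Function.identity()));
            return false;
        } catch (IllegalStateException e) {
            return true; // duplicate key without a merge function
        }
    }

    public static void main(String[] args) {
        System.out.println(byLength(Stream.of("speak", "bark", "talk", "shout"))); // {4=bark, talk, 5=speak, shout}
        System.out.println(failsWithoutMerge(Stream.of("speak", "bark", "talk"))); // true
    }
}
```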
