Java Streaming API recipes
In Java, a parallel stream splits its elements into subsets (which can be split further), and these subsets are processed in parallel on different CPU cores. Therefore, the operations must be stateless, non-interfering and associative to run correctly in parallel. The partial results of the subsets are then combined by the terminal operation (which may be short-circuiting) into the final result. Whether the pipeline runs in sequential (the default) or parallel mode depends on the last call to sequential() or parallel() in the pipeline.
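Here is a minimal sketch of the "last call wins" rule. isParallel() reports the mode the pipeline will use when its terminal operation runs. The class name ParallelModeDemo is only illustrative.

```java
import java.util.stream.Stream;

public class ParallelModeDemo {
    // parallel() followed later by sequential(): the last call wins, so the pipeline is sequential
    static boolean parallelThenSequential() {
        return Stream.of(1, 2, 3).parallel().filter(x -> x > 1).sequential().isParallel();
    }

    // sequential() followed later by parallel(): the whole pipeline runs in parallel
    static boolean sequentialThenParallel() {
        return Stream.of(1, 2, 3).sequential().map(x -> x * 2).parallel().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential()); // false
        System.out.println(sequentialThenParallel()); // true
    }
}
```

The mode applies to the whole pipeline, not to the individual steps between the calls.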
Streams API
In the blog post Use of default and static methods, I introduced how to write a factorial lambda function using UnaryOperator. You can use the java.util.stream.IntStream.reduce function to calculate the factorial value in the stream as follows.
var r = IntStream.range(1, 10).reduce(1, (a, b) -> a * b); // range(1, 10) covers 1..9, so r is 9! = 362880
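To include the upper bound, rangeClosed can be used instead of range. The following sketch wraps this in a hypothetical factorial helper and switches to LongStream, assuming inputs up to 20 (21! no longer fits in a long).

```java
import java.util.stream.LongStream;

public class FactorialDemo {
    // rangeClosed(1, n) includes n; reduce starts from the identity 1, so factorial(0) is 1
    // long overflows after 20!, so this sketch rejects larger inputs
    static long factorial(int n) {
        if (n < 0 || n > 20) {
            throw new IllegalArgumentException("n must be between 0 and 20");
        }
        return LongStream.rangeClosed(1, n).reduce(1L, (a, b) -> a * b);
    }

    public static void main(String[] args) {
        System.out.println(factorial(5));  // 120
        System.out.println(factorial(20)); // 2432902008176640000
    }
}
```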
Stream handling interfaces (they use Java generics):
- BaseStream - defines the core stream behaviours, such as switching a stream between parallel and sequential mode.
- Stream<T>, DoubleStream, IntStream and LongStream extend BaseStream. The primitive streams provide stream processing operations for primitives and avoid the overhead of boxing and unboxing.
A Stream<T> returns objects, while the primitive streams return primitives, as follows:
var a = IntStream.of(1, 2, 3);
var i = a.iterator(); // PrimitiveIterator.OfInt
var v = i.next(); // created variable v : Integer (boxed)
var vp = i.nextInt(); // created variable vp : int (no boxing)
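Switching between object and primitive streams is a common step. This sketch (class and method names are illustrative) unboxes once with mapToInt before summing, and uses boxed() to go back to a Stream<Integer> so the values can be collected into a List.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BoxingDemo {
    // Stream<Integer> -> IntStream: mapToInt unboxes each element, then sum() works on primitives
    static int sumOfObjects(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    // IntStream -> Stream<Integer>: boxed() is needed before collecting into a List
    static List<Integer> boxedRange(int from, int toInclusive) {
        return IntStream.rangeClosed(from, toInclusive).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumOfObjects(List.of(1, 2, 3))); // 6
        System.out.println(boxedRange(1, 3));              // [1, 2, 3]
    }
}
```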
Creating a stream from a List:
List<Integer> l1 = new ArrayList<>(){ { for(int i=0; i < 15; i++) add((int)(Math.random()*10));} };
l1.stream().parallel().mapToDouble(x -> x).forEach(System.out::println);
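In the second line above, System.out::println is a method reference to the println instance method of the System.out object, used to print the values. Because the stream is parallel, the print order is not guaranteed.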
List.<Character>of('a','b').stream()
.map(String::charAt) // Error
.forEach(System.out::print);
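In the code above, String::charAt on the second line refers to the instance method String.charAt(int), which needs a String receiver and an int index. The stream elements are Character values, so the reference does not match the Function<Character, R> that map expects, and the code does not compile.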
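Two working alternatives are sketched below, assuming the goal is either to turn each Character into a String or to really call charAt. Object::toString is an unbound instance method reference whose receiver is the stream element; for charAt, a lambda supplies the missing index argument. The class and method names are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;

public class MethodRefDemo {
    // Unbound instance method reference: each Character element becomes the receiver of toString()
    static String joinChars(List<Character> chars) {
        return chars.stream()
                .map(Object::toString)
                .collect(Collectors.joining());
    }

    // charAt needs a String receiver and an int index, so a lambda supplies the index 0
    static List<Character> firstChars(List<String> words) {
        return words.stream()
                .map(s -> s.charAt(0))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(joinChars(List.of('a', 'b')));           // ab
        System.out.println(firstChars(List.of("apple", "banana"))); // [a, b]
    }
}
```

Character::toString is avoided on purpose: Character has both a static toString(char) and an instance toString(), which makes that reference ambiguous here.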
Creating a stream from an array:
String[] strs = {"a", "z", "b", "k", "c"};
Arrays.stream(strs).parallel().forEach(System.out::println); // parallel: print order is not guaranteed
Or using the static method Stream.of(...):
Stream.of(1,2,3,4,5,6).forEach(System.out::println);
Or using a primitive stream:
IntStream.generate(()->(int)(Math.random()*10)).distinct().takeWhile(n -> n !=4).forEach(System.out::println);
In the code above, distinct() is a stateful intermediate operation that returns a new stream without duplicates. The takeWhile(...) is a short-circuiting intermediate operation that produces a finite stream from the infinite one (here it stops at the first 4). The forEach(...) is a terminal operation that traverses the stream pipeline and ends the stream processing.
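Because the random source above makes the output unpredictable, here is a deterministic sketch (not from the original post) that uses the same three kinds of operation on an infinite IntStream.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ShortCircuitDemo {
    static List<Integer> firstDistinctBeforeZero() {
        return IntStream.iterate(1, n -> n + 1)   // infinite source: 1, 2, 3, ...
                .map(n -> n % 5)                  // intermediate: 1, 2, 3, 4, 0, 1, 2, ...
                .distinct()                       // stateful intermediate: drops repeated values
                .takeWhile(n -> n != 0)           // short-circuiting: stops at the first 0
                .boxed()
                .collect(Collectors.toList());    // terminal: triggers the whole pipeline
    }

    public static void main(String[] args) {
        System.out.println(firstDistinctBeforeZero()); // [1, 2, 3, 4]
    }
}
```

Nothing is computed until collect() runs, and takeWhile() stops pulling from the infinite source once it sees 0.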
Functional Interfaces:
- Supplier<T> supplies elements (for example, to Stream.generate) via T get().
- Predicate<T> filters the stream by applying a condition via boolean test(T t).
- Function<T, R> converts type T to type R via R apply(T t).
- UnaryOperator<T> extends Function<T, T> and applies a function that creates a new value of the same type T via T apply(T t).
- Consumer<T> returns nothing but consumes the stream elements via void accept(T t).
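The sketch below wires one instance of each single-argument interface into a single pipeline; the word "kiwi" and the class name are arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

public class FunctionalInterfacesDemo {
    static List<String> run() {
        Supplier<String> supplier = () -> "kiwi";            // T get()
        Predicate<String> shortWord = s -> s.length() <= 4;  // boolean test(T t)
        UnaryOperator<String> upper = String::toUpperCase;   // T apply(T t)
        Function<String, Integer> length = String::length;   // R apply(T t)
        List<String> out = new ArrayList<>();
        Consumer<String> sink = out::add;                    // void accept(T t)

        Stream.generate(supplier)   // infinite stream of "kiwi"
              .limit(2)             // keep it finite
              .filter(shortWord)
              .map(upper)
              .map(s -> s + ":" + length.apply(s))
              .forEach(sink);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [KIWI:4, KIWI:4]
    }
}
```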
The following interfaces process two values at a time:
- BiPredicate<T, U> uses boolean test(T t, U u)
- BiFunction<T, U, R> uses R apply(T t, U u)
- BinaryOperator<T> uses T apply(T t1, T t2)
- BiConsumer<T, U> uses void accept(T t, U u)
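A short sketch of the two-argument interfaces follows. The field and method names are hypothetical, and String::repeat assumes Java 11 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.BinaryOperator;

public class BiFunctionalDemo {
    static final BiPredicate<String, Integer> HAS_LENGTH = (s, n) -> s.length() == n; // boolean test(T, U)
    static final BiFunction<String, Integer, String> REPEAT = String::repeat;         // R apply(T, U)
    static final BinaryOperator<Integer> SUM = Integer::sum;                           // T apply(T, T)

    // BiConsumer: map::put is used only for its side effect, the returned old value is ignored
    static Map<String, Integer> lengths(List<String> words) {
        Map<String, Integer> map = new HashMap<>();
        BiConsumer<String, Integer> put = map::put;                                    // void accept(T, U)
        words.forEach(w -> put.accept(w, w.length()));
        return map;
    }

    public static void main(String[] args) {
        System.out.println(HAS_LENGTH.test("abc", 3));   // true
        System.out.println(REPEAT.apply("ab", 3));       // ababab
        System.out.println(SUM.apply(2, 3));             // 5
        System.out.println(lengths(List.of("a", "bb"))); // {a=1, bb=2}
    }
}
```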
The map operation can be used as follows (l1 is the random list created earlier):
Function<Integer, String> m = x -> x + " 0"; // 2 -> "2 0"
UnaryOperator<String> u = x -> x.replace(" ", ""); // "2 0" -> "20"
l1.stream().map(m.andThen(u))
.mapToInt(Integer::parseInt).forEach(System.out::println);
// possible output
// 20
// 30
// 40
// 70
// ...
// ...
// ...
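The same composition can be written from either side: m.andThen(u) applies m first and then u, and u.compose(m) gives the same order. This deterministic sketch reuses the m and u lambdas from above (renamed M and U as constants) with an illustrative timesTen helper.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class ComposeDemo {
    static final Function<Integer, String> M = x -> x + " 0";        // 7 -> "7 0"
    static final UnaryOperator<String> U = x -> x.replace(" ", "");  // "7 0" -> "70"

    // M.andThen(U): apply M first, then U
    static final Function<Integer, String> AND_THEN = M.andThen(U);
    // U.compose(M): the same order, written from the other side
    static final Function<Integer, String> COMPOSE = U.compose(M);

    static int timesTen(int x) {
        return Integer.parseInt(AND_THEN.apply(x));
    }

    public static void main(String[] args) {
        System.out.println(timesTen(7));     // 70
        System.out.println(COMPOSE.apply(3)); // 30
    }
}
```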
Here is an example of an infinite stream:
var ints = Stream.iterate(0, i -> i+1);
ints
//.limit(10)
.filter(c -> c < 2)
.sorted()
.findFirst()
.ifPresent(System.out::println);
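As written, the code above never terminates: sorted() has to consume the whole infinite source created on the first line before it can pass anything on. When you uncomment .limit(10) on the third line, the source becomes finite and the code prints 0.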
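Here is a sketch of two ways to make the iterate source finite: limit(10), as in the commented-out line, and the three-argument Stream.iterate(seed, hasNext, next) available since Java 9. The method names are illustrative.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FiniteIterateDemo {
    // limit(10) makes the source finite, so sorted() can finish
    static Optional<Integer> firstSmall() {
        return Stream.iterate(0, i -> i + 1)
                .limit(10)
                .filter(c -> c < 2)
                .sorted()
                .findFirst();
    }

    // Three-argument iterate: generation stops as soon as hasNext (i < 5) is false
    static List<Integer> bounded() {
        return Stream.iterate(0, i -> i < 5, i -> i + 1).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstSmall()); // Optional[0]
        System.out.println(bounded());    // [0, 1, 2, 3, 4]
    }
}
```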
Another example:
// List<Character> list = new ArrayList<>(){ { for(char i=65; i < 70; i++) add(i);} };
// var chars = list.stream();
// infinite
var chars = Stream.generate(() -> 'A');
chars.filter(c -> c < 'B')
.sorted()
.findFirst()
.ifPresent(System.out::println);
The commented-out source on the first two lines is finite (the characters A to E). However, the source on the fourth line is infinite, so sorted() never finishes.
Short-Circuit terminal operations
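For example (findAny, findFirst, allMatch, anyMatch and noneMatch are short-circuiting; count, sum, average, max and min are ordinary terminal operations shown for comparison):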
Integer[] values = {2,5,8,7,9,3,4,1,3,6};
Optional<Integer> any = Arrays.stream(values).findAny();
any.get(); // usually 2 for a sequential stream, but findAny() may return any element
Arrays.stream(values).findFirst(); // Optional[2]
Arrays.stream(values).allMatch(i -> i == 2); // false
Arrays.stream(values).anyMatch(i -> i == 2); // true
Arrays.stream(values).noneMatch(i -> i == 2); // false
Arrays.stream(values).count(); // 10
Arrays.stream(values).mapToInt(Integer::intValue).sum(); // 48
Arrays.stream(values).mapToInt(Integer::intValue).average(); // OptionalDouble[4.8]
Arrays.stream(values).mapToInt(Integer::intValue).max(); // OptionalInt[9]
Arrays.stream(values).mapToInt(Integer::intValue).min(); // OptionalInt[1]
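Short-circuiting can be made visible by counting the elements a terminal operation pulls from the source. This sketch uses peek(), which is intended mainly for debugging, on a sequential stream of the same values; the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class ShortCircuitCountDemo {
    static final Integer[] VALUES = {2, 5, 8, 7, 9, 3, 4, 1, 3, 6};

    // anyMatch stops as soon as it finds a match
    static int visitedByAnyMatch(int target) {
        AtomicInteger visited = new AtomicInteger();
        Arrays.stream(VALUES)
              .peek(v -> visited.incrementAndGet())
              .anyMatch(v -> v == target);
        return visited.get();
    }

    // sum() is not short-circuiting, so it has to see every element
    static int visitedBySum() {
        AtomicInteger visited = new AtomicInteger();
        Arrays.stream(VALUES)
              .peek(v -> visited.incrementAndGet())
              .mapToInt(Integer::intValue)
              .sum();
        return visited.get();
    }

    public static void main(String[] args) {
        System.out.println(visitedByAnyMatch(8)); // 3 (2, 5, 8)
        System.out.println(visitedBySum());       // 10
    }
}
```

count() is deliberately not used for the comparison: since Java 9 it may skip the pipeline entirely when the stream size is known, so peek() might never run.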
You can also create a stream whose elements are different data structures holding different types of data:
Set<String> set = new HashSet<>();
set.add("A");
set.add("B");
//List<String> list = Arrays.asList("1","2");
List<Integer> list = Arrays.asList(1,2);
Deque<Boolean> queue = new ArrayDeque<>();
queue.push(true);
queue.push(false);
Stream.of(set, list, queue ).forEach(System.out::println);
// [A, B]
// [1, 2]
// [false, true]
Here is flatMap used to flatten the collections above into a single stream:
// modify line# 9 in the above code
Stream.of(set, list, queue ).flatMap(x -> x.stream()).forEach(System.out::println);
// A
// B
// 1
// 2
// false
// true
Stream Aggregation
The following requirements must be satisfied for collect() to perform a concurrent (parallel) reduction with a Collector:
- The stream is parallel.
- The collector passed to collect() has the Collector.Characteristics.CONCURRENT characteristic.
- Either the collector has the Collector.Characteristics.UNORDERED characteristic, or the stream is unordered.
For example, the collectors returned by Collectors.toConcurrentMap() and Collectors.groupingByConcurrent() have both the Collector.Characteristics.CONCURRENT and Collector.Characteristics.UNORDERED characteristics.
var fruits = Stream.of("Mango","Apple","kiwi","Banana").parallel();
ConcurrentMap<Integer, List<String>> map = fruits.collect(
Collectors.groupingByConcurrent(String::length));
System.out.println(map);
The stream above is parallel and the collector is both CONCURRENT and UNORDERED, so all three requirements are met.
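The characteristics of a collector can be inspected directly. The sketch below uses a hypothetical helper, supportsConcurrentReduction, to check a groupingByConcurrent collector against Collectors.toList(), which is not concurrent.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CharacteristicsDemo {
    // Hypothetical helper: true when the collector itself reports CONCURRENT and UNORDERED
    static boolean supportsConcurrentReduction(Collector<?, ?, ?> c) {
        Set<Collector.Characteristics> ch = c.characteristics();
        return ch.contains(Collector.Characteristics.CONCURRENT)
                && ch.contains(Collector.Characteristics.UNORDERED);
    }

    static boolean groupingByConcurrentIsConcurrent() {
        Collector<String, ?, ConcurrentMap<Integer, List<String>>> c =
                Collectors.groupingByConcurrent(String::length);
        return supportsConcurrentReduction(c);
    }

    static boolean toListIsConcurrent() {
        Collector<String, ?, List<String>> c = Collectors.toList();
        return supportsConcurrentReduction(c);
    }

    public static void main(String[] args) {
        System.out.println(groupingByConcurrentIsConcurrent()); // true
        System.out.println(toListIsConcurrent());               // false
    }
}
```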
In the following code, the map step adds each element to a thread-safe list as a side effect, in whatever order the worker threads happen to process them.
List<Integer> d = Collections.synchronizedList(new ArrayList<>());
ArrayList<Integer> result = IntStream.range(1, 6)
.parallel()
.map(i -> { d.add(i); return i; }) // side effect on a thread-safe list
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
result.forEach(System.out::println); // ordered - 12345
System.out.println("---------");
d.forEach(System.out::println); // order varies between runs, e.g. 15342
However, result is always ordered, because collect() combines the partial results in encounter order.
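If a side effect must happen in encounter order, forEachOrdered can be used instead of a side effect inside map. This sketch assumes the same range as above; note that forEachOrdered can give up part of the parallel speed-up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class OrderedDemo {
    // forEachOrdered processes elements in encounter order even on a parallel stream
    static List<Integer> orderedSideEffects() {
        List<Integer> d = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(1, 6).parallel().forEachOrdered(d::add);
        return d;
    }

    public static void main(String[] args) {
        System.out.println(orderedSideEffects()); // [1, 2, 3, 4, 5]
    }
}
```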
Use the reduce operation to produce a single result from a stream of values:
- Optional<T> reduce(BinaryOperator<T> accumulator) performs the accumulation and returns an Optional, because the stream may be empty.
List<Integer> l1 = new ArrayList<>(){ { for(int i=0; i < 10; i++) add(i+1);} };
l1.stream().reduce((x, y) -> x + y); // Optional[55]
- T reduce(T identity, BinaryOperator<T> accumulator) uses the identity as the initial or default value.
List<Integer> l1 = new ArrayList<>(){ { for(int i=0; i < 10; i++) add(i+1);} };
l1.stream().reduce(0, (x, y) -> x + y); // 55, not an Optional because the identity is the default
// or
Character[] a = {'a','b','c','d'};
Arrays.stream(a).parallel().map(c -> c.toString()).reduce("<", (x, y) -> x + y);
// output: "<a<b<c<d"
// "<" is not a true identity ("<" + "a" is "<a", not "a"), so the parallel result depends on
// how the stream is split; a sequential stream gives "<abcd"
- <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
Character[] a = {'a','b','c','d'};
Arrays.stream(a).parallel().reduce("<", (x, y) -> x + y, (x, y) -> x + ">" + y);
// output: "<a><b><c><d" (also split-dependent, for the same reason)
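For comparison, here is a three-argument reduce whose identity is a true identity, so the result does not depend on how the parallel stream is split. Summing word lengths is an illustrative choice, not from the original post.

```java
import java.util.stream.Stream;

public class ReduceIdentityDemo {
    static int totalLength(Stream<String> words) {
        return words.parallel().reduce(
                0,                                   // identity: Integer.sum(0, n) == n
                (acc, word) -> acc + word.length(),  // accumulator: BiFunction<Integer, String, Integer>
                Integer::sum);                       // combiner: agrees with the accumulator
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Stream.of("a", "bb", "ccc"))); // 6
    }
}
```

Because 0 is neutral for addition and the combiner adds partial sums just as the accumulator adds lengths, every way of splitting the stream yields the same total.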
Collectors
Here is the basic use of the java.util.stream.Stream.collect method:
List<String> list = Stream.of("c","a","t")
.collect(ArrayList::new, List::add, List::addAll); // [c, a, t]
The ArrayList::new is the supplier that creates a new mutable result container; in a parallel execution it may be called several times, once for each partial result. The List::add is the accumulator, an associative, non-interfering, stateless function that folds an element into a result container. The List::addAll is the combiner, an associative, non-interfering, stateless function that accepts two partial result containers and merges them; it must be compatible with the accumulator function. The combiner must fold the elements from the second result container into the first result container.
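NOTE: For a concurrent reduction (see Stream Aggregation above), the result container must be a thread-safe collection and the stream must be parallel. The three-argument collect() above does not need a thread-safe container, because each partial result gets its own container before the combiner merges them.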
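The same supplier/accumulator/combiner pattern works with any mutable container. This sketch follows the StringBuilder idiom from the Stream.collect documentation; the parallel() call is added to show that the encounter order still survives.

```java
import java.util.stream.Stream;

public class StringBuilderCollectDemo {
    static String concat(Stream<String> parts) {
        return parts.parallel()
                .collect(StringBuilder::new,     // supplier: a fresh container per partial result
                         StringBuilder::append,  // accumulator: append one String
                         StringBuilder::append)  // combiner: append the second builder to the first
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(concat(Stream.of("c", "a", "t"))); // cat
    }
}
```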
Here are some examples of basic collectors:
// create a list of [1, 2, 3]
List<Integer> l1 = new ArrayList<>(){ { for(int i=0; i < 3; i++) add(i+1);} };
l1.stream().collect(Collectors.summarizingInt(x -> x));
// output: IntSummaryStatistics{count=3, sum=6, min=1, average=2.000000, max=3}
l1.stream().collect(Collectors.mapping(x -> x.toString(), Collectors.joining(",")));
// output: "1,2,3"
l1.stream().map(x -> x*x).collect(Collectors.toList());
// output: [1, 4, 9]
// use of collectingAndThen
l1.stream().collect(Collectors.collectingAndThen(Collectors.averagingInt(x -> x*x), x -> "average: " + x));
// output: "average: 4.666666666666667"
Partitioning
// create integer list
List<Integer> intlist = new ArrayList<>(){ { for(int i=0; i < 8; i++) add(i+1);} };
// output: intlist ==> [1, 2, 3, 4, 5, 6, 7, 8]
// group on Predicate
intlist.stream().collect(Collectors.partitioningBy(x -> x >5));
// output: {false=[1, 2, 3, 4, 5], true=[6, 7, 8]} of type Map<Boolean, List<Integer>>
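partitioningBy also accepts a downstream collector. This sketch counts each partition instead of listing it, using the same predicate and the numbers 1 to 8; the method name is illustrative.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionCountDemo {
    // Downstream counting() replaces the default toList() for each partition
    static Map<Boolean, Long> countAboveFive(List<Integer> values) {
        return values.stream()
                .collect(Collectors.partitioningBy(x -> x > 5, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Integer> intlist = IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList());
        System.out.println(countAboveFive(intlist)); // {false=5, true=3}
    }
}
```

The result always contains both the true and the false key, even when one partition is empty.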
Grouping
// create a list
List<String> fruits = new ArrayList<>(){ {add("Apple");add("Mango");add("Avocado");add("Melon");add("Orange");} };
//Group the elements by the first letter of their names
fruits.stream().collect(Collectors.groupingBy(x -> x.substring(0,1)));
// output: {A=[Apple, Avocado], M=[Mango, Melon], O=[Orange]}
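groupingBy also has a three-argument form that takes a map factory and a downstream collector. The sketch below, using the same fruit names, keeps the keys sorted with TreeMap::new and counts the fruits per first letter instead of listing them.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupingCountDemo {
    static Map<String, Long> countByFirstLetter(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(
                        w -> w.substring(0, 1),   // classifier: first letter
                        TreeMap::new,             // map factory: keys come out sorted
                        Collectors.counting()));  // downstream: number of words per key
    }

    public static void main(String[] args) {
        List<String> fruits = List.of("Apple", "Mango", "Avocado", "Melon", "Orange");
        System.out.println(countByFirstLetter(fruits)); // {A=2, M=2, O=1}
    }
}
```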
Flat Mapping
List<String> s = new ArrayList<>(){ {add("aaa");add("aaaa");add("bbb");add("aacc");} };
// output: [aaa, aaaa, bbb, aacc]
s.stream().collect(Collectors.groupingBy(x -> x.length(), Collectors.flatMapping(x -> x.chars().mapToObj(i -> (char)i), Collectors.toSet())));
// output: {3=[a, b], 4=[a, c]}
For the flat mapping above, the mapping function must return a stream, such as the Stream<Character> built in the following example:
Stream<Character> a = "Ojitha".chars().mapToObj(i -> (char)i);
a.forEach(System.out::print); // Ojitha
Filtering
List<String> s = new ArrayList<>(){ {add("aaa");add("aaaa");add("bbb");add("aacc");} };
// output: [aaa, aaaa, bbb, aacc]
s.stream().collect(Collectors.groupingBy(x -> x.length(), Collectors.filtering(x -> !x.equals("aaa"), Collectors.toSet())));
// output: {3=[bbb], 4=[aaaa, aacc]}, whereas without filtering the key 3 maps to [aaa, bbb]
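Collectors.filtering differs from calling filter() before grouping: filtering runs inside each group, so a group whose elements are all rejected keeps its key with an empty set. This sketch uses a reduced word list, chosen only to make the difference visible.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class FilteringDemo {
    // filtering() inside groupingBy: the key 3 survives with an empty set
    static Map<Integer, Set<String>> withFilteringCollector(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(
                String::length,
                Collectors.filtering(w -> !w.equals("aaa"), Collectors.toSet())));
    }

    // filter() before grouping: the key 3 disappears entirely
    static Map<Integer, Set<String>> withFilterFirst(List<String> words) {
        return words.stream()
                .filter(w -> !w.equals("aaa"))
                .collect(Collectors.groupingBy(String::length, Collectors.toSet()));
    }

    public static void main(String[] args) {
        List<String> words = List.of("aaa", "aaaa");
        System.out.println(withFilteringCollector(words)); // {3=[], 4=[aaaa]}
        System.out.println(withFilterFirst(words));        // {4=[aaaa]}
    }
}
```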
To map the words by their length as the key, use the Collectors.toMap(keyMapper, valueMapper, mergeFunction) function. The merge function combines the values of words that have the same length.
var s = Stream.of("speak", "bark", "talk", "shout");
BinaryOperator<String> merge = (a, b) -> a+", "+b;
var map = s.collect(Collectors.toMap(String::length, k -> k, merge));
System.out.println(map); // {4=bark, talk, 5=speak, shout}
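The merge function is what makes the example above work: without it, toMap throws an IllegalStateException on the first duplicate key. The sketch below shows both cases and also uses the four-argument toMap with TreeMap::new to keep the keys sorted; the method names are illustrative.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapDemo {
    // Without a merge function, "bark" and "talk" (both length 4) collide and toMap throws
    static boolean throwsOnDuplicateKey() {
        try {
            Stream.of("speak", "bark", "talk", "shout")
                  .collect(Collectors.toMap(String::length, w -> w));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // With a merge function the collision is resolved; TreeMap::new keeps the keys sorted
    static Map<Integer, String> merged() {
        return Stream.of("speak", "bark", "talk", "shout")
                .collect(Collectors.toMap(String::length, w -> w, (a, b) -> a + ", " + b, TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(throwsOnDuplicateKey()); // true
        System.out.println(merged());               // {4=bark, talk, 5=speak, shout}
    }
}
```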