为什么需要Stream

在很多时候我们需要遍历一个集合,根据一些条件从中筛选出我们需要的元素,常规的做法就是用foreach遍历,在foreach块里做判断来筛选。而Stream提供了一种抽象程度更高的选择,使我们在完成计算任务的时候只需要说明需要完成什么任务,而不是说明如何去实现它。

Stream的特点

1、流不存储元素。这些元素可能存储在底层的集合中,或者是按需生成的。

2、流的操作不会修改其数据源。

3、流的操作时尽可能惰性执行的。这意味着知道需要其结果的时候,操作才会执行。

四大函数式接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//R apply(T t)函数式接口,一个参数,一个返回值
Function<String, Integer> function = String::length;
System.out.println(function.apply("function"));

//boolean test(T t)断定型接口,一个参数,返回boolean
Predicate<String> predicate = t -> {
return t.startsWith("Predicate");
};
System.out.println(predicate.test("Predicate"));

//void accept(T t)消费型接口,一个参数,没有返回值
Consumer<String> consumer = System.out::println;
consumer.accept("Consumer");

//T get()供给型接口,无参数,有返回值
Supplier<String> supplier = () -> {
return "Supplier";
};
System.out.println(supplier.get());

###流的转换方法

流的转换会产生一个新的流,它的元素派生自另一个流中的元素。

1
2
Steam<T> filter(Predicate<? super T> predicate) 
//Returns a stream consisting of the elements of this stream that match the given predicate.

fliter方法用于过滤流中的元素,接受的参数是一个判定型接口。

1
2
<R> Stream<R> map(Function<? super T,? extends R> mapper) 
//Returns a stream consisting of the results of applying the given function to the elements of this stream.

map方法用于转换,它将mapper应用于当前流中所有元素所产生的结果。

1
2
<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper) 
//Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped //stream produced by applying the provided mapping function to each element.

flatMapmap基本一致唯一的不同是它返回的是应用mapper后的每一个结果连接起来以后的流。

1
2
Stream<T> distinct() 
//Returns a stream consisting of the distinct elements (according to Object.equals(Object)) of this stream.

返回流中所有的不同元素。终结操作

1
2
Stream<T> skip(long n) 
//Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the //stream.

返回除了前n个元素的流。终结操作

1
2
Stream<T> limit(long maxSize) 
//Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length.

返回前maxSize个元素的流。

1
2
3
4
Stream<T> sorted() 
//Returns a stream consisting of the elements of this stream, sorted according to natural order.
Stream<T> sorted(Comparator<? super T> comparator)
//Returns a stream consisting of the elements of this stream, sorted according to the provided Comparator.

无参的sort()返回元素升序排列后的流,有参的需要传一个自定义的比较器。

1
2
3
4
Optional<T> max(Comparator<? super T> comparator) 
//Returns the maximum element of this stream according to the provided Comparator.
Optional<T> min(Comparator<? super T> comparator)
//Returns the minimum element of this stream according to the provided Comparator.

返回这个流的最大最小元素,由传入的比较器决定比较规则。注:这里返回的是Optional对象。终结操作

1
2
3
4
Optional<T> findAny() 
//Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty.
Optional<T> findFirst()
//Returns an Optional describing the first element of this stream, or an empty Optional if the stream is empty.

返回第任意一个元素,返第一个元素。终结操作

1
2
3
4
5
6
boolean allMatch(Predicate<? super T> predicate) 
//Returns whether all elements of this stream match the provided predicate.
boolean anyMatch(Predicate<? super T> predicate)
//Returns whether any elements of this stream match the provided predicate.
boolean noneMatch(Predicate<? super T> predicate)
//Returns whether no elements of this stream match the provided predicate.

分别在这个流中所有元素、任意元素和没有任意元素匹配时返回true。终结操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/**
* @Author:Gao
* @Date:2020-07-25 21:01
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class User {
private Integer id;
private String userName;
private int age;
}

/**
* 找出同时满足以下条件的用户,偶数ID且年龄大于24且用户名转为大写且用户名字母倒序排序
* 只输出一个用户名字
*/
public class StreamDemo {
public static void main(String[] args) {
User u1 = new User(11, "a", 23);
User u2 = new User(12, "b", 24);
User u3 = new User(13, "c", 22);
User u4 = new User(14, "d", 28);
User u5 = new User(16, "e", 26);

List<User> list = Arrays.asList(u1, u2, u3, u4, u5);

list.stream()
.filter(user -> user.getId() % 2 == 0)
.filter(user -> user.getAge() > 24)
.map(user -> user.getUserName().toUpperCase())
.sorted(Comparator.reverseOrder())
.limit(1)
.forEach(System.out::println);
StreamDemo streamDemo = new StreamDemo();
streamDemo.coreFunctionalInterface();
}
}

并行流

并行流parallerStream()来自Collection接口所以可以调用parallerStream()来从任何集合中获取并行流

1
Stream<String> paralleWords = words.parallerStream();

也可以将任意顺序流转换为并行流

1
Stream<String> paralleWords = Stream.of(wordArray).parallel();

只要终结方法执行时流处于并行模式,所有的中间流操作就都将被并行化。

当流操作并行运行时,其目标是让其返回结果与顺序执行时返回的结果相同。重要的是这些操作必须是无状态的,并且可以以任意顺序执行。

下面是一个单词计数程序

1
2
3
4
var shortWords = new int[12];
words.parallelStream().forEach(
s -> {if(s.length() < 12) shortWords[s.length()]++;});
System.out.println(Arrays.toString(shortWords))

因为多个线程会并发的修改共享的数组,所以这段代码不能计算出正确的结果。这也就引出了一个原则:

  • 在做并行计算的时候各个线程之间做的工作应当是不重叠的。

一个正确的处理方式应当是这样

1
2
3
4
5
Map<Integer,Long> shortWordCounts = words.parallelStream()
.filter(s -> s.length() < 12)
.collect(groupingBy(
String::length,
counting()));

先按单词长度分组,再统计各个长度的单词的数目,最后合并。

在做并行计算时要注意以下几点:

  • 并行化会导致大量的开销,只有面对非常大的数据集才划算。
  • 只有在底层的数据源可以被有效的分割成多个部分时,将流并行化才有意义。
  • 并行流使用的线程池可能会因诸如文件I/O或者网络访问这样的操作被阻塞而饿死。