Java函数编程探秘:潜入JavaStream(上)

二叶草 2020年2月13日22:12:52函数代码评论阅读模式

引子

大家对这样的代码一定很熟悉了:

List<Integer> squares = Arrays.asList(1,2,3,4,5).stream().map(x -> x*x).collect(Collectors.toList());
System.out.println(squares);

那么,这其中发生了什么呢 ?本文将为你揭开 JavaStream 的“神秘面纱”。

三要素

简单看下 Stream stream(), Stream map(Function), collector(Collector) 的入参和返回,即可知道JavaStream 的三要素:

  • 函数接口(Function)
  • 流(Stream)
  • 聚合器(Collector)

三者的关联是:流(Stream)通过 函数接口(Function)进行过滤和转换,最后通过聚合器(Collector)对流中的元素进行聚合操作,得到最终结果。

让我们来逐一认识它们。

函数接口

关于函数接口,需要记住的就是两件事:

  • 函数接口是行为的抽象;
  • 函数接口是数据转换器。

最直接的支持就是 java.util.Function 包。定义了四个最基础的函数接口:

  • Supplier[T]: 数据提供器,可以提供 T 类型对象;无参的构造器,提供了 get 方法;
  • Function[T,R]: 数据转换器,接收一个 T 类型的对象,返回一个 R 类型的对象;单参数单返回值的行为接口;提供了 apply, compose, andThen, identity 方法;
  • Consumer[T]: 数据消费器, 接收一个 T类型的对象,无返回值,通常用于根据T对象做些处理;单参数无返回值的行为接口;提供了 accept, andThen 方法;
  • Predicate[T]: 数据测试器,接收一个 T 类型的对象,返回布尔值,通常用于传递条件函数;单参数布尔值的条件性接口。提供了 test (条件测试) , and-or- negate(与或非) 方法。

其中, compose, andThen, and, or, negate 用来组合函数接口而得到更强大的函数接口。

其它的函数接口都是通过这四个扩展而来。

  • 在参数个数上扩展:比如接收双参数的,有 Bi 前缀, 比如 BiConsumer[T,U], BiFunction[T,U,R] ;
  • 在类型上扩展:比如接收原子类型参数的,有 [Int|Double|Long][Function|Consumer|Supplier|Predicate]
  • 特殊常用的变形:比如 BinaryOperator , 是同类型的双参数 BiFunction[T,T,T] 。

那么,这些函数接口可以接收哪些值呢?

  • 类/对象的静态方法引用、实例方法引用。引用符号为双冒号 ::
  • 类的构造器引用,比如 Class::new
  • lambda表达式

多练习,熟练使用函数接口并不难。

接下来,我们会更频繁地与之打交道。

聚合器

每一个流式计算的末尾总有一个类似 collect(Collectors.toList()) 的方法调用。Collectors.toList() 会返回一个聚合器 Collector 。

聚合器 Collector 的功能是将指定的数据流根据指定的能力聚合成最终结果。 聚合器是多个函数接口能力的组合,体现了函数编程的精要。 当然,聚合器实现也会相对复杂一点,要细细揣摩。

Reduce

在深入聚合器的内部实现之前,了解下 Reduce 是合适的。Reduce 是一个推导过程, 其算法如下:

STEP1: 初始化结果 R = init ;

STEP2: 给定一个值集 S。每次从 S 中取出一个值 v,通过二元操作符 op 施加到 R 和 v ,产生一个新值赋给 R = BinaryOperator(R, v);重复 STEP2, 直到 S 中没有值可取为止。

如下代码所示:S = list , op = biFunc ,R = result。

public static <E,T> T reduce(List<E> list, BiFunction<E,T,T> biFunc, Supplier<T> init) {
T result = init.get();
for (E e: list) {
      result = biFunc.apply(e, result);
    }
return result;
  }

Collector

来看看 Collector 的主要定义:

public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();

完整的 Collector 含有四个要素。Collector 与 Reduce 有很多相似之处:有一个初始值提供器 init = supplier ; 有一个累积操作器 accumulator = op ;有一个 合并器 combiner ;有一个终值转换器 finisher 。比 Reduce 多出了两样东西:combiner 和 finisher 。

理解 Collector 定义要注意的是,泛型参数在方法参数中的顺序。A 是值提供器的类型,是累积操作的左参数,是合并操作的类型,也是中间结果的类型;T 是从某个 Stream 中取出的值的类型;R 是终值的类型。显然 A 是一个承前启后的核心类型。

看函数式代码时,往往容易被各种泛型参数弄得很糊涂。但函数式编程结合泛型,才能使代码的表达能力突破类型限制,提升到非常灵活的程度。

聚合器实现

Collectors 里提供了多种 Collector 的实现。Collector 大致可以划分为四类:列表类、统计类、映射类、自定义。

列表类聚合器

列表类 Collector 通常将 Stream of Collection 中的元素生成 Collection、List 或 Set 。来看 toList 的实现:

public static <T>
    Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_ID);
    }

还是比较容易看懂的:

  • A 类型是 List[T] ;
  • 值提供器 supplier = ArrayList::new ,创建一个空的结果列表;
  • 值累加器 accumulator = (list,e) -> list.add(e) ; 将 Stream 取出的值加入到结果列表中;
  • 合并器 combiner = (left, right) -> { left.addAll(right); return left; }
  • 终值转换器 finisher = castingIdentity() = i -> (R) i (默认给出的)

它的返回值 Collector[T, ?, List[T]] 的含义是:List[T] 是结果列表;T 是每次从 Stream 中取出的类型 T ;中间运算结果 ? 是不限制类型的。

统计类聚合器

统计类聚合器通常生成单个值,主要包括 minBy,maxBy, counting, summing, averaging 等,基于 reducing 来实现。

来看看 reducing 的库实现。记住 Collectors.reducing 的 A 类型是 OptionalBox ,实际上就是上面 reduce 方法的局部变量 T result 的封装。present 用来处理首值赋值的问题。

public static <T> Collector<T, ?, Optional<T>>
    reducing(BinaryOperator<T> op) {
class OptionalBox implements Consumer<T> {
            T value = null;
boolean present = false;

@Override
public void accept(T t) {
if (present) {
                    value = op.apply(value, t);
                }
else {
                    value = t;
                    present = true;
                }
            }
        }

return new CollectorImpl<T, OptionalBox, Optional<T>>(
                OptionalBox::new, OptionalBox::accept,
                (a, b) -> { if (b.present) a.accept(b.value); return a; },
                a -> Optional.ofNullable(a.value), CH_NOID);
    }

映射类聚合器

映射类聚合器,通常是将一个 Stream[T] 聚合成 Map[K, V] 。看 toMap 的实现:这里提供了重载方法。

简单形式是只有 keyMapper, valueMapper 两个转换函数,最终的 Map[K,U] = [K=keyMapper.apply(T), U=valueMapper.apply(T)] ;初始值提供器默认 mapSupplier = HashMap::new。

完全形式是提供了 Collector 的四要素。完全形式的含义是:

STEP1: 先用简单形式的 keyMapper, valueMapper 两个转换函数,将指定流转换成 streamMap = Map[K,U];

STEP2: 合并 streamMap 与 mapSupplier 。合并的方法是,对于每一个 key 对应的 streamValue = streamMap[key], supplierValue = mapSupplier[key] , finalValue = mergeFunction(supplierValue, streamValue)

public static <T, K, U>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
                                    Function<? super T, ? extends U> valueMapper) {
return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
}

public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
                                Function<? super T, ? extends U> valueMapper,
                                BinaryOperator<U> mergeFunction,
                                Supplier<M> mapSupplier) {
        BiConsumer<M, T> accumulator
                = (map, element) -> map.merge(keyMapper.apply(element),
                                              valueMapper.apply(element), mergeFunction);
return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
    }

写个示例来理解下。先创建一个 Person 列表,然后通过 toMap 的简单形式可以创建 Map[name, age];

使用 toMap 的完全形式时,只需要多提供一个已有的 anotherPersonAgeMap = Map[name, age] ,然后创建一个 valueMerge = (age1, age2) -> age1 ,当有年龄重合时,用 MapSupplier 的 age 覆盖。

public class CollectorsToMapDemo {

public static void main(String[]args) {
List<Person> persons = Arrays.asList(new Person("qin", 32), new Person("ni", 24));
    Map<String, Integer> personAgeMap = persons.stream().collect(Collectors.toMap(
        Person::getName, Person::getAge
    ));
    System.out.println("personAgeMap: " + personAgeMap);

List<Person> anotherPersons = Arrays.asList(new Person("su", 24), new Person("ni", 25));
    Map<String, Integer> anotherPersonAgeMap = anotherPersons.stream().collect(Collectors.toMap(
        Person::getName, Person::getAge
    ));

    Map<String,Integer> merged = persons.stream().collect(Collectors.toMap(
        Person::getName, Person::getAge, (age1, age2) -> age1, () -> anotherPersonAgeMap
        ));
    System.out.println("merged: " + merged);
  }

}

@AllArgsConstructor
@Data
class Person {
private String name;
private Integer age;
}

输出结果:
personAgeMap: {qin=32, ni=24}
merged: {su=24, qin=32, ni=25}

分析 toMap 得到的启发是:**从简单形式着手,更容易理解其原理。复杂形式,往往是在某一方面对简单形式进行了一般化而得到的**。

再来看 groupingby 的实现。所涉及的泛型更加眼花缭乱,竟然有 T,K,D,A,M 这么多类型 !理一理:

  • T 是 Stream 中的数据的类型;
  • K 是 classifier.apply(T) 得到的类型,生成的结果 Map 的 key 的类型;
  • D 是 生成的结果 Map 的 value 的类型;
  • M 是结果 Map 的类型;
  • A 是中间结果类型,无限制。

从第二个实现看起,会更容易理解一点。首先,classifier 函数用来生成 key ,接着 downstream 应用于 Stream 生成 value 。比如,downstream = toList() , value = List[T] ; downstream = toMap , value = Map[K,U]。

public static <T, K> Collector<T, ?, Map<K, List<T>>>
    groupingBy(Function<? super T, ? extends K> classifier) {
return groupingBy(classifier, toList());
    }

public static <T, K, A, D>
    Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                          Collector<? super T, A, D> downstream) {
return groupingBy(classifier, HashMap::new, downstream);
    }

public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                  Supplier<M> mapFactory,
                                  Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            downstreamAccumulator.accept(container, t);
        };
        BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
@SuppressWarnings("unchecked")
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;

if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
else {
// code...
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }

接下来,会用一个实际例子来说明其用法。

恭喜你!能坚持读到这里,已经是一种胜利。

实际例子

一种常用场景,是根据一个对象里的多个字段进行分组。比如,假设一个企业有多个部门(department),每个部门都有各种职务(position)的员工(Employee)。现在,要统计每个部门下的每种职务的员工姓名。其结果形式是:Map[department, Map[position, List[name]]] groupedEmployees.

实现代码如下所示。

public class CollectorsGroupingbyDemo {

public static void main(String[]args) {

List<Employee> employList = Arrays.asList(
new Employee("su", "mid", "engine"),
new Employee("lan", "mid", "prod"),
new Employee("qin", "data", "engine"),
new Employee("yu", "mid", "engine"),
new Employee("ming", "data", "engine")
    );


// Map[department, Map[position, List[name]]]
    Map<String, Map<String, List<String>>> groupedEmployees =
        employList.stream().collect(
            Collectors.groupingBy(Employee::getDepartment,
                Collectors.groupingBy(Employee::getPosition, new EmployNameListCollector())
        ));

    System.out.println("groupedEmployees: " + groupedEmployees);

  }

}

class EmployNameListCollector implements Collector<Employee,List<String>,List<String>> {

@Override
public Supplier<List<String>> supplier() {
return () -> new ArrayList<>();
  }

@Override
public BiConsumer<List<String>, Employee> accumulator() {
return (list, e) -> list.add(e.getName());
  }

@Override
public BinaryOperator<List<String>> combiner() {
return (list1, list2) -> { list1.addAll(list2); return list1; };
  }

@Override
public Function<List<String>, List<String>> finisher() {
return i->i;
  }

@Override
public Set<Characteristics> characteristics() {
return Collections.emptySet();
  }
}


@AllArgsConstructor
@Data
class Employee {
private String name;
private String department;
private String position;

}

解读如下:

STEP1: 首先根据 department 分组。使用 groupingby(Employee::getDepartment, positionEmployeeMapCollector) ;需要实现 positionEmployeeMapCollector;

STEP2: 现在得到的是 Stream[Employee] 。根据 position 分组, 使用 Collectors.groupingBy(Employee::getPosition, employNameListCollector) ,需要实现 employNameListCollector ;

STEP3:现在得到的是 Stream[Employee] , 要得到 List[String] 。显然,如果要得到 List[Employee] ,只需要使用 Collectors.toList() 即可;但是现在要拿到 List[String]。可以仿照 Collectors.toList() 的实现,自定义一个 EmployNameListCollector 。EmployNameListCollector 与 Collectors.toList() 的区别仅在于 要将 employee.getName() 加到 list 。其它的几乎一样。

通过编写自定义的 Collector ,可以加深对 Collector 的理解。

小结

本文讲解了 JavaStream 三要素中的两剑客:Function 和 Collector ,重点介绍了 Collector 的各种实现。Collector 的实现已经很抽象了,这说明了:抽象的层次越高,编程就越接近于数学。

本文来源于:Java函数编程探秘:潜入JavaStream(上)-变化吧门户
特别声明:以上文章内容仅代表作者本人观点,不代表变化吧门户观点或立场。如有关于作品内容、版权或其它问题请于作品发表后的30日内与变化吧联系。

  • 赞助本站
  • 微信扫一扫
  • weinxin
  • 加入Q群
  • QQ扫一扫
  • weinxin
二叶草
Go语言中的常量 函数代码

Go语言中的常量

1 概述 常量,一经定义不可更改的量。功能角度看,当出现不需要被更改的数据时,应该使用常量进行存储,例如圆周率。从语法的角度看,使用常量可以保证数据,在整个运行期间内,不会被更改。例如当前处理器的架构...
Go语言的接口 函数代码

Go语言的接口

Go语言-接口 在Go语言中,一个接口类型总是代表着某一种类型(即所有实现它的类型)的行为。一个接口类型的声明通常会包含关键字type、类型名称、关键字interface以及由花括号包裹的若干方法声明...
Go语言支持的正则语法 函数代码

Go语言支持的正则语法

1 字符 语法 说明 . 任意字符,在单行模式(s标志)下,也可以匹配换行 字符类 否定字符类 d Perl 字符类 D 否定 Perl 字符类 ASCII 字符类 否定 ASCII 字符类 pN U...
Go语言的包管理 函数代码

Go语言的包管理

1 概述 Go 语言的源码复用建立在包(package)基础之上。包通过 package, import, GOPATH 操作完成。 2 main包 Go 语言的入口 main() 函数所在的包(pa...

发表评论