admin管理员组

文章数量:1441402

Apache Flink with Java 简介

1. 概述

Apache Flink是一个大数据处理框架,允许程序员以非常高效和可扩展的方式处理大量数据。

在本文中,我们将介绍Apache FlinkJava API 中提供的一些核心 API 概念和标准数据转换。这个 API 的流畅风格使得使用 Flink 的中心结构——分布式集合变得容易。

首先,我们将看一下 Flink 的DataSetAPI 转换,并使用它们来实现字数统计程序。然后我们将简要介绍一下 Flink 的DataStreamAPI,它允许您以实时方式处理事件流。

2. Maven 依赖

首先,我们需要将 Maven 依赖项添加到 flink-java 和flink-test-utils库中:

代码语言:javascript代码运行次数:0运行复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>1.2.0</version>
    <scope>test<scope>
</dependency>Copy

3. 核心 API 概念

在使用 Flink 时,我们需要了解与其 API 相关的几件事:

  • 每个 Flink 程序都对分布式数据集合执行转换。提供了多种数据转换功能,包括过滤、映射、联接、分组和聚合
  • Flink 中的sink操作会触发流的执行以产生程序的预期结果,例如将结果保存到文件系统或将其打印到标准输出
  • Flink 转换是惰性的,这意味着在调用接收器操作之前不会执行它们
  • Apache Flink API 支持两种操作模式——批处理和实时。如果要处理可在批处理模式下处理的有限数据源,则将使用数据集API。如果要实时处理无限数据流,则需要使用DataStreamAPI

4. 数据集 API 转换

Flink 程序的入口点是ExecutionEnvironment类的一个实例——它定义了程序执行的上下文。

让我们创建一个执行环境来开始我们的处理:

代码语言:javascript代码运行次数:0运行复制
ExecutionEnvironment env
  = ExecutionEnvironment.getExecutionEnvironment();Copy

请注意,当您在本地机器上启动应用程序时,它将在本地 JVM 上执行处理。如果要在计算机集群上开始处理,则需要在这些计算机上安装Apache Flink并相应地配置执行环境

4.1. 创建DataSet

要开始执行数据转换,我们需要为程序提供数据。

让我们使用我们的执行环境创建DataSet类的实例:

代码语言:javascript代码运行次数:0运行复制
DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);Copy

您可以从多个源创建数据集,例如 Apache Kafka、CSV、文件或几乎任何其他数据源。

4.2. Filter和Reduce

创建DataSet类的实例后,可以对其应用转换。

假设您要过滤超过某个阈值的数字,然后将它们全部相加您可以使用filter() 和reduce() 转换来实现此目的:

代码语言:javascript代码运行次数:0运行复制
int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);
Copy

请注意,collect() 方法是触发实际数据转换的接收器操作。

4.3. map

假设您有一个Person对象的数据集

代码语言:javascript代码运行次数:0运行复制
private static class Person {
    private int age;
    private String name;

    // standard constructors/getters/setters
}Copy

接下来,让我们创建这些对象的数据集

代码语言:javascript代码运行次数:0运行复制
DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));Copy

假设您只想从集合的每个对象中提取年龄字段。您可以使用map() 转换来仅获取Person类的特定字段:

代码语言:javascript代码运行次数:0运行复制
List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();

assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);Copy

4.4. Join

当您有两个数据集时,您可能希望将它们连接到某个id字段上。为此,您可以使用join() 转换。

让我们创建用户的交易和地址的集合:

代码语言:javascript代码运行次数:0运行复制
Tuple3<Integer, String, String> address
  = new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction 
  = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions 
  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));
Copy

两个元组中的第一个字段都是Integer类型,这是一个id字段,我们希望在其上连接两个数据集。

为了执行实际的连接逻辑,我们需要为地址和事务实现一个KeySelector接口:

代码语言:javascript代码运行次数:0运行复制
private static class IdKeySelectorTransaction 
  implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}
private static class IdKeySelectorAddress 
  implements KeySelector<Tuple3<Integer, String, String>, Integer> {
    @Override
    public Integer getKey(Tuple3<Integer, String, String> value) {
        return value.f0;
    }
}Copy

每个选择器仅返回应对其执行连接的字段。

不幸的是,这里不能使用 lambda 表达式,因为 Flink 需要泛型类型信息。

接下来,让我们使用这些选择器实现合并逻辑:

代码语言:javascript代码运行次数:0运行复制
List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
  joined = transactions.join(addresses)
  .where(new IdKeySelectorTransaction())
  .equalTo(new IdKeySelectorAddress())
  .collect();

assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

Copy

4.5. Sort

假设您有以下Tuple2 集合:

代码语言:javascript代码运行次数:0运行复制
Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);

Copy

如果要按元组的第一个字段对此集合进行排序,可以使用sortPartitions()转换:

代码语言:javascript代码运行次数:0运行复制
List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
  .collect();

assertThat(sorted)
  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);Copy

5. 字数统计

字数统计问题通常用于展示大数据处理框架的功能。基本解决方案涉及计算文本输入中的单词出现次数。让我们使用 Flink 来实现这个问题的解决方案。

作为解决方案的第一步,我们创建一个LineSplitter类,该类将我们的输入拆分为标记(单词),为每个标记收集键值对的Tuple2。在每个元组中,键是在文本中找到的单词,值是整数 (1)。

此类实现FlatMapFunction接口,该接口将字符串作为输入并生成Tuple2<String, Integer>:

代码语言:javascript代码运行次数:0运行复制
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}Copy

我们在Collector类上调用collect() 方法,以在处理管道中向前推送数据。

我们的下一步也是最后一步是按元组的第一个元素(单词)对元组进行分组,然后对第二个元素执行总和聚合以生成单词出现的计数:

代码语言:javascript代码运行次数:0运行复制
public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}Copy

我们使用三种类型的 Flink 转换:flatMap()、groupBy()aggregate()。

让我们编写一个测试来断言字数统计实现按预期工作:

代码语言:javascript代码运行次数:0运行复制
List<String> lines = Arrays.asList(
  "This is a first sentence",
  "This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

List<Tuple2<String, Integer>> collect = result.collect();
 
assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));Copy

6. 数据流接口

6.1. 创建数据流

Apache Flink 还支持通过其 DataStream API 处理事件流。如果我们想开始使用事件,我们首先需要使用StreamExecutionEnvironment类:

代码语言:javascript代码运行次数:0运行复制
StreamExecutionEnvironment executionEnvironment
 = StreamExecutionEnvironment.getExecutionEnvironment();Copy

接下来,我们可以使用来自各种来源的executionEnvironment创建事件流。它可以是像Apache Kafka 这样的消息总线,但在这个例子中,我们将简单地从几个字符串元素创建一个源:

代码语言:javascript代码运行次数:0运行复制
DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is a first sentence", 
  "This is a second sentence with a one word");Copy

我们可以像在普通的 DataSet类中一样将转换应用于DataStream的每个元素:

代码语言:javascript代码运行次数:0运行复制
SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);Copy

要触发执行,我们需要调用一个接收器操作,例如print(),该操作只会将转换的结果打印到标准输出中,然后是StreamExecutionEnvironment类上的execute()方法:

代码语言:javascript代码运行次数:0运行复制
upperCase.print();
env.execute();Copy

它将生成以下输出:

代码语言:javascript代码运行次数:0运行复制
1> THIS IS A FIRST SENTENCE
2> THIS IS A SECOND SENTENCE WITH A ONE WORDCopy

6.2. 事件的窗口化

实时处理事件流时,有时可能需要将事件分组在一起,并在这些事件的窗口上应用一些计算。

假设我们有一个事件流,其中每个事件都是一对,由事件编号和事件发送到我们系统时的时间戳组成,我们可以容忍无序事件,但前提是它们迟到不超过二十秒。

对于此示例,让我们首先创建一个模拟相隔几分钟的两个事件的流,并定义一个时间戳提取器来指定我们的延迟阈值:

代码语言:javascript代码运行次数:0运行复制
SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = env.fromElements(
  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {
 
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
          return element.f1 * 1000;
        }
    });Copy

接下来,让我们定义一个窗口操作,将事件分组为五秒窗口,并对这些事件应用转换:

代码语言:javascript代码运行次数:0运行复制
SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);
reduced.print();Copy

它将每五秒获得窗口的最后一个元素,因此它会打印出来:

代码语言:javascript代码运行次数:0运行复制
1> (15,1491221519)Copy

请注意,我们看不到第二个事件,因为它的到达时间晚于指定的延迟阈值。

7. 结论

在本文中,我们介绍了 Apache Flink 框架,并查看了其 API 提供的一些转换。

我们使用 Flink 流畅且实用的 DataSet API 实现了一个字数统计程序。然后,我们查看了 DataStream API,并对事件流实现了简单的实时转换。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-24,如有侵权请联系 cloudcommunity@tencent 删除教程事件javaapacheflink

本文标签: Apache Flink with Java 简介