DataStream API【3】
Sink数据输出
Flink可以使用DataStream API将数据流输出到文件、Socket、外部系统等。Flink自带了各种内置的输出格式,说明如下。
writeAsText():将元素转为String类型按行写入外部输出。String类型是通过调用每个元素的toString()方法获得的。
writeToSocket():将元素写入Socket。
writeAsCsv():将元组写入以逗号分隔的文本文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
addSink():调用自定义接收函数。Flink可以与其他系统(如Apache Kafka)的连接器集成在一起,这些系统已经实现了自定义接收函数。
分区策略
数据在算子之间流动需要依靠分区策略(分区器),Flink目前内置了8种已实现的分区策略和1种自定义分区策略。已实现的分区策略对应的API为:
BinaryHashPartitioner
BroadcastPartitioner
ForwardPartitioner
GlobalPartitioner
KeyGroupStreamPartitioner
RebalancePartitioner
RescalePartitioner
ShufflePartitioner
自定义分区策略的API为CustomPartitionerWrapper。
1. BinaryHashPartitioner
该分区策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化/反序列化。RowData用于表示结构化数据类型,运行时通过Table API或SQL管道传递的所有顶级记录都是RowData的实例。关于BinaryHashPartitioner,此处不做过多讲解。
2. BroadcastPartitioner
广播分区策略将上游数据记录输出到下游算子的每个并行实例中,即下游每个分区都会有上游的所有数据。使用DataStream的broadcast()方法即可设置该DataStream向下游发送数据时使用广播分区策略,Java代码如下:
dataStream.broadcast()
3. ForwardPartitioner
转发分区策略只将元素转发给本地运行的下游算子的实例,即将元素发送到与当前算子实例在同一个TaskManager的下游算子实例,而不需要进行网络传输。要求上下游算子并行度一样,这样上下游算子可以同属一个子任务。
使用DataStream的forward()方法即可设置该DataStream向下游发送数据时使用转发分区策略,Java代码如下:
dataStream.forward()
4. GlobalPartitioner
全局分区策略将上游所有元素发送到下游子任务编号等于0的分区算子实例上(下游第一个实例)。
使用DataStream的global()方法即可设置该DataStream向下游发送数据时使用全局分区策略,Java代码如下:
dataStream.global()
5. KeyGroupStreamPartitioner
Key分区策略根据元素Key的Hash值输出到下游算子指定的实例。keyBy()算子底层正是使用的该分区策略,底层最终会调用KeyGroupStreamPartitioner的selectChannel()方法,计算每个Key对应的通道索引(通道编号,可理解为分区编号),根据通道索引将Key发送到下游相应的分区中。
6. RebalancePartitioner
平衡分区策略使用循环遍历下游分区的方式,将上游元素均匀分配给下游算子的每个实例。每个下游算子的实例都具有相等的负载。当数据流中的元素存在数据倾斜时,使用该策略对性能有很大的提升。
使用DataStream的rebalance()方法即可设置该DataStream向下游发送数据时使用平衡分区策略,Java代码如下:
dataStream.rebalance()
7. RescalePartitioner
重新调节分区策略基于上下游算子的并行度,将元素以循环的方式输出到下游算子的每个实例。类似于平衡分区策略,但又与平衡分区策略不同。
上游算子将元素发送到下游哪一个算子实例,取决于上游和下游算子的并行度。例如,如果上游算子的并行度为2,而下游算子的并行度为4,那么一个上游算子实例将把元素均匀分配给两个下游算子实例,而另一个上游算子实例将把元素均匀分配给另外两个下游算子实例。相反,如果下游算子的并行度为2,而上游算子的并行度为4,那么两个上游算子实例将分配给一个下游算子实例,而另外两个上游算子实例将分配给另一个下游算子实例。
假设上游算子并行度为2,分区编号为A和B,下游算子并行度为4,分区编号为1、2、3、4,那么A将把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游算子并行度为4,编号为A、B、C、D,下游算子并行度为2,编号为1、2,那么A和B把数据发送给1,C和D则把数据发送给2。
使用DataStream的rescale()方法即可设置该DataStream向下游发送数据时使用重新调节分区策略,Java代码如下:
dataStream.rescale()
如果想将元素均匀地输出到下游算子的每个实例,以实现负载均衡,同时又不希望使用平衡分区策略的全局负载均衡,则可以使用重新调节分区策略。该策略会尽可能避免数据在网络间传输,而能否避免还取决于TaskManager的Task Slot数量、上下游算子的并行度等。
8. ShufflePartitioner
随机分区策略将上游算子元素输出到下游算子的随机实例中。元素会被均匀分配到下游算子的每个实例。这种策略可以实现计算任务的负载均衡。
使用DataStream的shuffle()方法即可设置该DataStream向下游发送数据时使用随机分区策略,Java代码如下:
dataStream.shuffle()
DataStream API【3】
Sink数据输出
Flink可以使用DataStream API将数据流输出到文件、Socket、外部系统等。Flink自带了各种内置的输出格式,说明如下。
writeAsText():将元素转为String类型按行写入外部输出。String类型是通过调用每个元素的toString()方法获得的。
writeToSocket():将元素写入Socket。
writeAsCsv():将元组写入以逗号分隔的文本文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
addSink():调用自定义接收函数。Flink可以与其他系统(如Apache Kafka)的连接器集成在一起,这些系统已经实现了自定义接收函数。
分区策略
数据在算子之间流动需要依靠分区策略(分区器),Flink目前内置了8种已实现的分区策略和1种自定义分区策略。已实现的分区策略对应的API为:
BinaryHashPartitioner
BroadcastPartitioner
ForwardPartitioner
GlobalPartitioner
KeyGroupStreamPartitioner
RebalancePartitioner
RescalePartitioner
ShufflePartitioner
自定义分区策略的API为CustomPartitionerWrapper。
1. BinaryHashPartitioner
该分区策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化/反序列化。RowData用于表示结构化数据类型,运行时通过Table API或SQL管道传递的所有顶级记录都是RowData的实例。关于BinaryHashPartitioner,此处不做过多讲解。
2. BroadcastPartitioner
广播分区策略将上游数据记录输出到下游算子的每个并行实例中,即下游每个分区都会有上游的所有数据。使用DataStream的broadcast()方法即可设置该DataStream向下游发送数据时使用广播分区策略,Java代码如下:
dataStream.broadcast()
3. ForwardPartitioner
转发分区策略只将元素转发给本地运行的下游算子的实例,即将元素发送到与当前算子实例在同一个TaskManager的下游算子实例,而不需要进行网络传输。要求上下游算子并行度一样,这样上下游算子可以同属一个子任务。
使用DataStream的forward()方法即可设置该DataStream向下游发送数据时使用转发分区策略,Java代码如下:
dataStream.forward()
4. GlobalPartitioner
全局分区策略将上游所有元素发送到下游子任务编号等于0的分区算子实例上(下游第一个实例)。
使用DataStream的global()方法即可设置该DataStream向下游发送数据时使用全局分区策略,Java代码如下:
dataStream.global()
5. KeyGroupStreamPartitioner
Key分区策略根据元素Key的Hash值输出到下游算子指定的实例。keyBy()算子底层正是使用的该分区策略,底层最终会调用KeyGroupStreamPartitioner的selectChannel()方法,计算每个Key对应的通道索引(通道编号,可理解为分区编号),根据通道索引将Key发送到下游相应的分区中。
6. RebalancePartitioner
平衡分区策略使用循环遍历下游分区的方式,将上游元素均匀分配给下游算子的每个实例。每个下游算子的实例都具有相等的负载。当数据流中的元素存在数据倾斜时,使用该策略对性能有很大的提升。
使用DataStream的rebalance()方法即可设置该DataStream向下游发送数据时使用平衡分区策略,Java代码如下:
dataStream.rebalance()
7. RescalePartitioner
重新调节分区策略基于上下游算子的并行度,将元素以循环的方式输出到下游算子的每个实例。类似于平衡分区策略,但又与平衡分区策略不同。
上游算子将元素发送到下游哪一个算子实例,取决于上游和下游算子的并行度。例如,如果上游算子的并行度为2,而下游算子的并行度为4,那么一个上游算子实例将把元素均匀分配给两个下游算子实例,而另一个上游算子实例将把元素均匀分配给另外两个下游算子实例。相反,如果下游算子的并行度为2,而上游算子的并行度为4,那么两个上游算子实例将分配给一个下游算子实例,而另外两个上游算子实例将分配给另一个下游算子实例。
假设上游算子并行度为2,分区编号为A和B,下游算子并行度为4,分区编号为1、2、3、4,那么A将把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游算子并行度为4,编号为A、B、C、D,下游算子并行度为2,编号为1、2,那么A和B把数据发送给1,C和D则把数据发送给2。
使用DataStream的rescale()方法即可设置该DataStream向下游发送数据时使用重新调节分区策略,Java代码如下:
dataStream.rescale()
如果想将元素均匀地输出到下游算子的每个实例,以实现负载均衡,同时又不希望使用平衡分区策略的全局负载均衡,则可以使用重新调节分区策略。该策略会尽可能避免数据在网络间传输,而能否避免还取决于TaskManager的Task Slot数量、上下游算子的并行度等。
8. ShufflePartitioner
随机分区策略将上游算子元素输出到下游算子的随机实例中。元素会被均匀分配到下游算子的每个实例。这种策略可以实现计算任务的负载均衡。
使用DataStream的shuffle()方法即可设置该DataStream向下游发送数据时使用随机分区策略,Java代码如下:
dataStream.shuffle()