Flink Table and DataStream Conversion
Table of Contents
- Flink Table and DataStream Conversion
- 1. Converting a DataStream to a Table
- 1.1 Handling insert-only streams
- 1.1.1 The fromDataStream() method
- 1.1.1.1 fromDataStream(DataStream var1)
- 1.1.1.2 fromDataStream(DataStream var1, Expression... var2)
- 1.1.1.3 fromDataStream(DataStream var1, Schema var2)
- 1.1.2 The createTemporaryView() method
- 1.2 Handling changelog streams
- 1.2.1 The fromChangelogStream() method
- 2. Converting a Table to a DataStream
- 2.1 Handling insert-only streams
- 2.1.1 The toDataStream() method
- 2.2 Handling changelog streams
- 2.2.1 The toChangelogStream() method
- 2.2.1.1 toChangelogStream(Table var1)
- 2.2.1.2 toChangelogStream(Table var1, Schema var2)
Flink Table and DataStream Conversion
Reference: the official Flink documentation.
1. Converting a DataStream to a Table
1.1 Handling insert-only streams
1.1.1 The fromDataStream() method
/**
 * Interprets a stream of insert-only changes and arbitrary type as a table.
 * Event time and watermarks are not propagated by default.
 */
<T> Table fromDataStream(DataStream<T> var1);

/**
 * Interprets a stream of insert-only changes and arbitrary type as a table.
 * The given expressions project and rename the derived columns.
 */
<T> Table fromDataStream(DataStream<T> var1, Expression... var2);

/**
 * Interprets a stream of insert-only changes and arbitrary type as a table.
 * The optional schema allows enriching column data types and adding time attributes,
 * watermark strategies, additional computed columns, or primary keys.
 */
<T> Table fromDataStream(DataStream<T> var1, Schema var2);
Official example code:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import java.time.Instant;

// some example POJO
public static class User {
    public String name;
    public Integer score;
    public Instant event_time;

    // default constructor for DataStream API
    public User() {}

    // fully assigning constructor for Table API
    public User(String name, Integer score, Instant event_time) {
        this.name = name;
        this.score = score;
        this.event_time = event_time;
    }
}

// create a DataStream
DataStream<User> dataStream =
    env.fromElements(
        new User("Alice", 4, Instant.ofEpochMilli(1000)),
        new User("Bob", 6, Instant.ofEpochMilli(1001)),
        new User("Alice", 10, Instant.ofEpochMilli(1002)));

// === EXAMPLE 1 ===
// derive all physical columns automatically
// Example 1 illustrates a simple use case when no time-based operations are needed.
Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9)
// )

// === EXAMPLE 2 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)
// Example 2 is the most common use case when these time-based operations should work in processing time.
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT NOT NULL,
// `event_time` TIMESTAMP_LTZ(9),
// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
// )

// === EXAMPLE 3 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
        .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
        .build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )

// === EXAMPLE 4 ===
// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API
// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
// Example 4 is the most common use case, when time-based operations such as windows
// or interval joins should be part of the pipeline.
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )

// === EXAMPLE 5 ===
// define physical columns manually
// in this example,
// - we can reduce the default precision of timestamps from 9 to 3
// - we also project the columns and put `event_time` to the beginning
// Example 5 relies entirely on the user's declarations.
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .column("event_time", "TIMESTAMP_LTZ(3)")
        .column("name", "STRING")
        .column("score", "INT")
        .watermark("event_time", "SOURCE_WATERMARK()")
        .build());
table.printSchema();
// prints:
// (
// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
// `name` VARCHAR(200),
// `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection

// Using DataTypes, since DataTypes is more flexible than TypeInformation
Table table = tableEnv
    .fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("f0", DataTypes.of(User.class))
            .build())
    .as("user");
table.printSchema();
// prints:
// (
// `user` *User<`name` STRING,`score` INT>*
// )

// data types can be extracted reflectively as above or explicitly defined
Table table3 = tableEnv
    .fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column(
                "f0",
                DataTypes.STRUCTURED(
                    User.class,
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("score", DataTypes.INT())))
            .build())
    .as("user");
table3.printSchema();
// prints:
// (
// `user` *User<`name` STRING,`score` INT>*
// )
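Note that the official examples above never show the Expression... overload. A minimal sketch of it, assuming the same User dataStream as above and a static import of Expressions.$:

import static org.apache.flink.table.api.Expressions.$;

// project the stream to two columns and rename `name` on the way in
Table renamed = tableEnv.fromDataStream(dataStream, $("name").as("username"), $("score"));
renamed.printSchema();
// expected: a schema with the columns `username` and `score`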
Example code for the three variants above:
1.1.1.1 fromDataStream(DataStream var1)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.bean.Event;
import com.ali.flink.demo.utils.DataGeneratorImpl003;
import com.ali.flink.demo.utils.FlinkEnv;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTableTransToStreamDemo001 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new DataGeneratorImpl003());
        DataStream<String> sourceStream = env.addSource(dataGeneratorSource).returns(String.class);
        // sourceStream.print("source");

        // map the raw JSON strings into the Event POJO
        DataStream<Event> mapStream = sourceStream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String name = jsonObject.getString("name");
                JSONObject title = jsonObject.getJSONObject("title");
                String title_name = title.getString("title_name");
                int title_number = title.getIntValue("title_number");
                JSONArray user_info = jsonObject.getJSONArray("user_info");
                String address = user_info.getJSONObject(0).getString("address");
                JSONObject time_info = jsonObject.getJSONObject("time_info");
                long timestamp = time_info.getLongValue("timestamp");
                return new Event(name, title.toJSONString(), title_name, title_number,
                        user_info.toJSONString(), address, time_info.toJSONString(), timestamp);
            }
        }).returns(Event.class);
        mapStream.print("map source");

        // convert the POJO stream into a Table; since it is a POJO stream,
        // the derived columns match the POJO's fields
        Table table = tableEnv.fromDataStream(mapStream);
        // register the Table as a temporary view named "source"
        tableEnv.createTemporaryView("source", table);
        // run SQL; the POJO field names can be queried directly
        Table result = tableEnv.sqlQuery("select name, title_name, address from source");
        // convert the result Table back into a DataStream and print it
        tableEnv.toDataStream(result).print("result");

        env.execute("job start");
    }
}

------------------------- Result -----------------------------
map source> Event{name='Tom1', title={"title_number":3,"title_name":"表情包"}, title_name='表情包', title_number=3, user_info=[{"address":"北京市","city":"beijing"},{"address":"上海市","city":"shanghai"}], address='北京市', time_info={"timestamp":1657332118000}, timestamp='1657332118000'}
result> +I[Tom1, 表情包, 北京市]
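The demos reference two project-local classes that the post never shows: the Event POJO and the FlinkEnv helper. They are not part of Flink; a minimal hypothetical reconstruction, inferred from the constructor calls and the "map source" output above (the real class also overrides toString(), as that output shows):

package com.ali.flink.demo.bean;

// hypothetical reconstruction: fields mirror the eight constructor arguments used above
public class Event {
    public String name;
    public String title;
    public String title_name;
    public int title_number;
    public String user_info;
    public String address;
    public String time_info;
    public long timestamp;

    // default constructor, required for Flink's POJO detection
    public Event() {}

    public Event(String name, String title, String title_name, int title_number,
                 String user_info, String address, String time_info, long timestamp) {
        this.name = name;
        this.title = title;
        this.title_name = title_name;
        this.title_number = title_number;
        this.user_info = user_info;
        this.address = address;
        this.time_info = time_info;
        this.timestamp = timestamp;
    }
}

package com.ali.flink.demo.utils;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// hypothetical reconstruction of the environment helper used by all demos
public class FlinkEnv {

    public static StreamExecutionEnvironment FlinkDataStreamRunEnv() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }

    public static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env) {
        return StreamTableEnvironment.create(env);
    }
}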
1.1.1.2 fromDataStream(DataStream var1, Expression... var2)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.bean.Event;
import com.ali.flink.demo.utils.DataGeneratorImpl003;
import com.ali.flink.demo.utils.FlinkEnv;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkTableTransToStreamDemo002 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new DataGeneratorImpl003());
        DataStream<String> sourceStream = env.addSource(dataGeneratorSource).returns(String.class);
        // sourceStream.print("source");

        // map the raw JSON strings into the Event POJO
        DataStream<Event> mapStream = sourceStream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String name = jsonObject.getString("name");
                JSONObject title = jsonObject.getJSONObject("title");
                String title_name = title.getString("title_name");
                int title_number = title.getIntValue("title_number");
                JSONArray user_info = jsonObject.getJSONArray("user_info");
                String address = user_info.getJSONObject(0).getString("address");
                JSONObject time_info = jsonObject.getJSONObject("time_info");
                long timestamp = time_info.getLongValue("timestamp");
                return new Event(name, title.toJSONString(), title_name, title_number,
                        user_info.toJSONString(), address, time_info.toJSONString(), timestamp);
            }
        }).returns(Event.class);
        mapStream.print("map source");

        // pick the columns to expose; as() renames a column on the way in
        Table table = tableEnv.fromDataStream(mapStream, $("name").as("username"), $("address"));
        // register the Table as a temporary view named "source"
        tableEnv.createTemporaryView("source", table);
        // run SQL against the view
        Table result = tableEnv.sqlQuery("select username, address from source");
        // convert the result Table back into a DataStream and print it
        tableEnv.toDataStream(result).print("result");

        env.execute("job start");
    }
}

------------------------- Result -----------------------------
map source> Event{name='Tom3', title={"title_number":3,"title_name":"表情包"}, title_name='表情包', title_number=3, user_info=[{"address":"北京市","city":"beijing"},{"address":"上海市","city":"shanghai"}], address='北京市', time_info={"timestamp":1657332118000}, timestamp='1657332118000'}
result> +I[Tom3, 北京市]
1.1.1.3 fromDataStream(DataStream var1, Schema var2)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.bean.Event;
import com.ali.flink.demo.utils.DataGeneratorImpl003;
import com.ali.flink.demo.utils.FlinkEnv;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTableTransToStreamDemo003 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new DataGeneratorImpl003());
        DataStream<String> sourceStream = env.addSource(dataGeneratorSource).returns(String.class);
        // sourceStream.print("source");

        // map the raw JSON strings into the Event POJO
        DataStream<Event> mapStream = sourceStream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String name = jsonObject.getString("name");
                JSONObject title = jsonObject.getJSONObject("title");
                String title_name = title.getString("title_name");
                int title_number = title.getIntValue("title_number");
                JSONArray user_info = jsonObject.getJSONArray("user_info");
                String address = user_info.getJSONObject(0).getString("address");
                JSONObject time_info = jsonObject.getJSONObject("time_info");
                long timestamp = time_info.getLongValue("timestamp");
                return new Event(name, title.toJSONString(), title_name, title_number,
                        user_info.toJSONString(), address, time_info.toJSONString(), timestamp);
            }
        }).returns(Event.class);
        mapStream.print("map source");

        // define a Schema that projects and types the columns explicitly
        Table table = tableEnv.fromDataStream(mapStream,
                Schema.newBuilder()
                        .column("name", "string")
                        .column("timestamp", "bigint")
                        .build());
        // register the Table as a temporary view named "source"
        tableEnv.createTemporaryView("source", table);
        // run SQL; `timestamp` is a reserved keyword, hence the backticks
        Table result = tableEnv.sqlQuery("select name, `timestamp` from source");
        // convert the result Table back into a DataStream and print it
        tableEnv.toDataStream(result).print("result");

        env.execute("job start");
    }
}

------------------------- Result -----------------------------
map source> Event{name='Tom3', title={"title_number":3,"title_name":"表情包"}, title_name='表情包', title_number=3, user_info=[{"address":"北京市","city":"beijing"},{"address":"上海市","city":"shanghai"}], address='北京市', time_info={"timestamp":1657332118000}, timestamp='1657332118000'}
result> +I[Tom3, 1657332118000]
1.1.2 The createTemporaryView() method
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;

// create some DataStream
DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
    Tuple2.of(12L, "Alice"),
    Tuple2.of(0L, "Bob"));

// === EXAMPLE 1 ===
// register the DataStream as view "MyView" in the current session
// all columns are derived automatically
tableEnv.createTemporaryView("MyView", dataStream);
tableEnv.from("MyView").printSchema();
// prints:
// (
// `f0` BIGINT NOT NULL,
// `f1` STRING
// )

// === EXAMPLE 2 ===
// register the DataStream as view "MyView" in the current session,
// provide a schema to adjust the columns similar to `fromDataStream`
// in this example, the derived NOT NULL information has been removed
tableEnv.createTemporaryView(
    "MyView",
    dataStream,
    Schema.newBuilder()
        .column("f0", "BIGINT")
        .column("f1", "STRING")
        .build());
tableEnv.from("MyView").printSchema();
// prints:
// (
// `f0` BIGINT,
// `f1` STRING
// )

// === EXAMPLE 3 ===
// use the Table API before creating the view if it is only about renaming columns
tableEnv.createTemporaryView(
    "MyView",
    tableEnv.fromDataStream(dataStream).as("id", "name"));
tableEnv.from("MyView").printSchema();
// prints:
// (
// `id` BIGINT NOT NULL,
// `name` STRING
// )
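Once registered, the view behaves like any other temporary object in the session. A short sketch of querying it and then cleaning it up (dropTemporaryView returns whether a view of that name existed):

// query the registered view via SQL ...
Table result = tableEnv.sqlQuery("SELECT f1 AS name, f0 AS id FROM MyView");

// ... and remove it from the session when it is no longer needed
boolean dropped = tableEnv.dropTemporaryView("MyView");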
1.2 Handling changelog streams
1.2.1 The fromChangelogStream() method
The record type must be org.apache.flink.types.Row. Each record is built with Row.ofKind(RowKind kind, Object... values), where RowKind has the following four values: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE.

/**
 * Interprets a stream of changelog entries as a table. The stream record type must be
 * org.apache.flink.types.Row because its RowKind flag is evaluated at runtime.
 * Event time and watermarks are not propagated by default. This method expects a changelog
 * containing all kinds of changes (enumerated in org.apache.flink.types.RowKind) as the
 * default ChangelogMode.
 */
@Experimental
Table fromChangelogStream(DataStream<Row> var1);

/**
 * Allows defining a schema for the DataStream, similar to fromDataStream(DataStream, Schema).
 * Otherwise the semantics are equal to fromChangelogStream(DataStream).
 */
@Experimental
Table fromChangelogStream(DataStream<Row> var1, Schema var2);

/**
 * Gives full control over how to interpret the stream as a changelog. The passed
 * ChangelogMode helps the planner distinguish insert-only, upsert, and retract behavior.
 */
@Experimental
Table fromChangelogStream(DataStream<Row> var1, Schema var2, ChangelogMode var3);
Official example code:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// === EXAMPLE 1 ===
// interpret the stream as a retract stream

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table = tableEnv.fromChangelogStream(dataStream);

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();
// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+

// === EXAMPLE 2 ===
// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table =
    tableEnv.fromChangelogStream(
        dataStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert());

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();
// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -U |                          Alice |          12 |
// | +U |                          Alice |         100 |
// +----+--------------------------------+-------------+
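For completeness, a sketch of the three-argument overload: passing ChangelogMode.all() makes the default retract interpretation of Example 1 explicit. The column names and types below match the Row fields used above:

// the changelog may contain all RowKinds (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE);
// this is the same behavior as the one-argument fromChangelogStream(dataStream)
Table explicitTable =
    tableEnv.fromChangelogStream(
        dataStream,
        Schema.newBuilder()
            .column("f0", "STRING")
            .column("f1", "INT")
            .build(),
        ChangelogMode.all());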
2. Converting a Table to a DataStream
2.1 Handling insert-only streams
2.1.1 The toDataStream() method
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import java.time.Instant;

// POJO with mutable fields
// since no fully assigning constructor is defined, the field order
// is alphabetical [event_time, name, score]
public static class User {
    public String name;
    public Integer score;
    public Instant event_time;
}

tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
        + "("
        + " name STRING,"
        + " score INT,"
        + " event_time TIMESTAMP_LTZ(3),"
        + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
        + ")"
        + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");

// === EXAMPLE 1 ===
// use the default conversion to instances of Row
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
DataStream<Row> dataStream = tableEnv.toDataStream(table);

// === EXAMPLE 2 ===
// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);

// data types can be extracted reflectively as above or explicitly defined
DataStream<User> dataStream =
    tableEnv.toDataStream(
        table,
        DataTypes.STRUCTURED(
            User.class,
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("score", DataTypes.INT()),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));
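One caveat worth noting: toDataStream() only supports insert-only tables. A table that produces updates, such as an aggregation result, fails at runtime and has to be converted with toChangelogStream() instead. A sketch, reusing GeneratedTable from above:

// an aggregation result is an updating table, not an insert-only one
Table updatingTable = tableEnv.sqlQuery(
    "SELECT name, SUM(score) AS total FROM GeneratedTable GROUP BY name");

// tableEnv.toDataStream(updatingTable);  // would throw: the table is not insert-only
DataStream<Row> changelog = tableEnv.toChangelogStream(updatingTable);  // works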
2.2 Handling changelog streams
2.2.1 The toChangelogStream() method
2.2.1.1 toChangelogStream(Table var1)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.utils.FlinkEnv;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class FlinkStreamTransToTableDemo001 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        // build a small updating table: the aggregation emits retractions
        Table simpleTable = tableEnv
                .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
                .as("name", "score")
                .groupBy($("name"))
                .select($("name"), $("score").sum().as("score"));

        simpleTable.printSchema();
        simpleTable.execute().print();

        // convert the updating table into a changelog stream and print each record
        tableEnv
                .toChangelogStream(simpleTable)
                .executeAndCollect()
                .forEachRemaining(System.out::println);

        env.execute("job start");
    }
}

------------------------------ Result --------------------------------
+I[Alice, 12]
-U[Alice, 12]
+U[Alice, 14]
+I[Bob, 12]
The -U/+U pair retracts Alice's previous sum (12) and emits the updated one (14 = 12 + 2); Bob's single row arrives as a plain insert.
2.2.1.2 toChangelogStream(Table var1, Schema var2)
package com.ali.flink.demo.driver;

import com.ali.flink.demo.utils.FlinkEnv;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkStreamTransToTableDemo002 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.FlinkDataStreamRunEnv();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = FlinkEnv.getStreamTableEnv(env);

        // create a table with an event-time attribute
        tableEnv.executeSql(
                "CREATE TABLE GeneratedTable "
                        + "("
                        + " name STRING,"
                        + " score INT,"
                        + " event_time TIMESTAMP_LTZ(3),"
                        + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
                        + ")"
                        + "WITH ('connector'='datagen')");
        Table table = tableEnv.from("GeneratedTable");

        // expose `rowtime` as a metadata column: it is written into the StreamRecord's
        // timestamp rather than into the Row itself, which is why the printed records
        // below only show the two physical fields
        DataStream<Row> dataStream = tableEnv.toChangelogStream(
                table,
                Schema.newBuilder()
                        .column("name", "string")
                        .column("score", "int")
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        .build());
        dataStream.print("dataStream");

        env.execute("job start");
    }
}

------------------------------ Result --------------------------------
dataStream> +I[02295d4b23932df652d9e1eb07da611d68613f7e75794680c3f4a29627f94dacf2d82bf5fc3183f5af2d5fad0ab6c1d45272, -316104097]
dataStream> +I[ef45440d96c3ba64bbf4a143f773b26356fcb955abdb352913c30131cc900c52a2f20efc4b5ef4eda86d5e1518c38654e822, 2048383718]