Apache Flink - Table API 和 SQL
Table API 是一种使用类似 SQL 表达式语言的关系 API。 该API可以进行批处理和流处理。 它可以嵌入 Java 和 Scala 数据集和数据流 API。 您可以从现有数据集和数据流或外部数据源创建表。 通过这个关系API,您可以执行连接、聚合、选择和过滤等操作。 无论输入是批处理还是流式输入,查询的语义都保持不变。
这是一个示例 Table API 程序 −
// 对于批处理程序,使用 ExecutionEnvironment 而不是 StreamExecutionEnvironment val env = StreamExecutionEnvironment.getExecutionEnvironment // // create a TableEnvironment TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 注册一个表 tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...) // or tableEnv.registerExternalCatalog("extCat", ...) // 注册一个输出表 tableEnv.registerTableSink("outputTable", ...); // 从 Table API 查询创建表 val tapiResult = tableEnv.scan("table1").select(...) // 根据 SQL 查询创建表 val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...") // 将 Table API 结果表发送到 TableSink,与 SQL 结果相同 tapiResult.insertInto("outputTable") // execute env.execute()