0基础学习PyFlink——使用Table API实现SQL功能

0基础学习PyFlink——使用Table API实现SQL功能

在《0基础学习PyFlink——使用PyFlink的Sink将结果输出到Mysql》一文中,我们讲到如何通过定义Souce、Sink和Execute三个SQL,来实现数据读取、清洗、计算和入库。
如下图所示SQL是最高层级的抽象,在它之下是Table API。本文我们会将例子中的SQL翻译成Table API来实现等价的功能。

Souce

    # """create table source (#         word STRING#     ) with (#         'connector' = 'filesystem',#         'format' = 'csv',#         'path' = '{}'#     )# """.format(input_path)

下面的SQL分为两部分:

  • Table结构:该表只有一个名字为word,类型为string的字段。
  • 连接器:是“文件系统”(filesystem)类型,格式是csv的文件。这样输入就会按csv格式进行解析。

SQL中的Table对应于Table API中的schema。它用于定义表的结构,比如有哪些类型的字段和主键等。
上述整个SQL整体对应于descriptor。即我们可以认为descriptor是表结构+连接器。
我们可以让不同的表和不同的连接器结合,形成不同的descriptor。这是一个组合关系,我们将在下面看到它们的组合方式。

schema

    # define the source schemasource_schema = Schema.new_builder() \.column("word", DataTypes.STRING()) \.build()

new_builder()会返回一个Schema.Builder对象;
column(self, column_name: str, data_type: Union[str, DataType])方法用于声明该表存在哪些类型、哪些名字的字段,同时返回之前的Builder对象;
最后的build(self)方法返回Schema.Builder对象构造的Schema对象。

descriptor

    # Create a source descriptorsource_descriptor= TableDescriptor.for_connector("filesystem") \.schema(source_schema) \.option('path', input_path) \.format("csv") \.build()

for_connector(connector: str)方法返回一个TableDescriptor.Builder对象;
schema(self, schema: Schema)将上面生成的source_schema 对象和descriptor关联;
option(self, key: Union[str, ConfigOption], value)用于指定一些参数,比如本例用于指定输入文件的路径;
format(self, format: Union[str, ‘FormatDescriptor’], format_option: ConfigOption[str] = None)用于指定内容的格式,这将指导怎么解析和入库;
build(self)方法返回TableDescriptor.Builder对象构造的TableDescriptor对象。

Sink

    # """CREATE TABLE WordsCountTableSink (#         `word` STRING,#         `count` BIGINT,#         PRIMARY KEY (`word`) NOT ENFORCED#     ) WITH (#         'connector' = 'jdbc',#         'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',#         'table-name' = 'WordsCountTable',#         'driver'='com.mysql.jdbc.Driver',#         'username'='admin',#         'password'='pwd123'#     );# """

schema

    sink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()

大部分代码在之前已经解释过了。我们主要关注于区别点:

  • primary_key(self, *column_names: str) 用于指定表的主键。
  • 主键的类型需要使用调用not_null(),以表明其非空。

descriptor

    # Create a sink descriptorsink_descriptor = TableDescriptor.for_connector("jdbc") \.schema(sink_schema) \.option("url", "jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false") \.option("table-name", "WordsCountTable") \.option("driver", "com.mysql.jdbc.Driver") \.option("username", "admin") \.option("password", "pwd123") \.build()

这块代码主要是通过option来设置一些连接器相关的设置。可以看到这是用KV形式设计的,这样就可以让option方法有很大的灵活性以应对不同连接器千奇百怪的设置。

Execute

使用下面的代码将表创建出来,以供后续使用。

t_env.create_table("source", source_descriptor)
tab = t_env.from_path('source')
t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    # execute insert# """insert into WordsCountTableSink#     select word, count(1) as `count`#     from source#     group by word# """
    tab.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

这儿需要介绍的就是lit。它用于生成一个表达式,诸如sum、max、avg和count等。
execute_insert(self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False)用于将之前的计算结果插入到Sink表中

完整代码

import argparse
import logging
import sysfrom pyflinkmon import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, coldef word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# """create table source (#         word STRING#     ) with (#         'connector' = 'filesystem',#         'format' = 'csv',#         'path' = '{}'#     )# """# define the source schemasource_schema = Schema.new_builder() \.column("word", DataTypes.STRING()) \.build()# Create a source descriptorsource_descriptor = TableDescriptor.for_connector("filesystem") \.schema(source_schema) \.option('path', input_path) \.format("csv") \.build()t_env.create_table("source", source_descriptor)# """CREATE TABLE WordsCountTableSink (#         `word` STRING,#         `count` BIGINT,#         PRIMARY KEY (`word`) NOT ENFORCED#     ) WITH (#         'connector' = 'jdbc',#         'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',#         'table-name' = 'WordsCountTable',#         'driver'='com.mysql.jdbc.Driver',#         'username'='admin',#         'password'='pwd123'#     );# """# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector("jdbc") \.schema(sink_schema) \.option("url", "jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false") \.option("table-name", "WordsCountTable") \.option("driver", "com.mysql.jdbc.Driver") \.option("username", "admin") \.option("password", "pwd123") \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)# execute insert# """insert into WordsCountTableSink#     select word, count(1) as `count`#     from source#     group by word# """tab = t_env.from_path('source')tab.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

参考资料

  • /
  • .17/api/python//reference/pyflink.table/descriptors.html

发布者:admin,转转请注明出处:http://www.yc00.com/web/1702173915a1183351.html

相关推荐

评论列表(0条)

热门文章

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信

4617作文网特色饭店起名字中国周易起中华周易起名网帛书周易校释《周易》八卦算算我是什么命薛姓女宝宝起名的周口市二手车交易中心外国人起中文姓名推荐起情侣名字以名字算阴命完整阅读蛋糕品牌起名国学周易取名字网起名工具书周易星盘女生起名13画的字别人梦到我怀孕了解梦南阳起名字婚姻算命八字配对周易生辰八字起名网情字起名五行缺火公司起名英文取名起名大全大全孩子起名 姓刘猪年朱姓男孩起名周易64卦白话文解释李姓双胞胎男孩起名给鞋店起个好名字果蔬店的名称怎么起淄博周易预测姓苏鼠年起什么名字淀粉肠小王子日销售额涨超10倍罗斯否认插足凯特王妃婚姻让美丽中国“从细节出发”清明节放假3天调休1天男子给前妻转账 现任妻子起诉要回网友建议重庆地铁不准乘客携带菜筐月嫂回应掌掴婴儿是在赶虫子重庆警方辟谣“男子杀人焚尸”国产伟哥去年销售近13亿新的一天从800个哈欠开始男孩疑遭霸凌 家长讨说法被踢出群高中生被打伤下体休学 邯郸通报男子持台球杆殴打2名女店员被抓19岁小伙救下5人后溺亡 多方发声单亲妈妈陷入热恋 14岁儿子报警两大学生合买彩票中奖一人不认账德国打算提及普京时仅用姓名山西省委原副书记商黎光被逮捕武汉大学樱花即将进入盛花期今日春分张家界的山上“长”满了韩国人?特朗普谈“凯特王妃P图照”王树国3次鞠躬告别西交大师生白宫:哈马斯三号人物被杀代拍被何赛飞拿着魔杖追着打315晚会后胖东来又人满为患了房客欠租失踪 房东直发愁倪萍分享减重40斤方法“重生之我在北大当嫡校长”槽头肉企业被曝光前生意红火手机成瘾是影响睡眠质量重要因素考生莫言也上北大硕士复试名单了妈妈回应孩子在校撞护栏坠楼网友洛杉矶偶遇贾玲呼北高速交通事故已致14人死亡西双版纳热带植物园回应蜉蝣大爆发男孩8年未见母亲被告知被遗忘张立群任西安交通大学校长恒大被罚41.75亿到底怎么缴沈阳一轿车冲入人行道致3死2伤奥运男篮美国塞尔维亚同组周杰伦一审败诉网易国标起草人:淀粉肠是低配版火腿肠外国人感慨凌晨的中国很安全男子被流浪猫绊倒 投喂者赔24万杨倩无缘巴黎奥运男子被猫抓伤后确诊“猫抓病”春分“立蛋”成功率更高?记者:伊万改变了国足氛围奥巴马现身唐宁街 黑色着装引猜测

4617作文网 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化