推广

Flink SQL Client综合实战

iseeyu2年前 (2024-02-21)推广127

在这里插入图片描述

  1. 执行以下命令即可创建kafka表,请按照自己的信息调整参数:
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- 处理时间列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    'connector.type' = 'kafka',  -- kafka connector
    'connector.version' = 'universal',  -- universal 支持 0.11 以上的版本
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
    'connector.properties.zookeeper.connect' = '192.168.50.43:2181',  -- zk 地址
    'connector.properties.bootstrap.servers' = '192.168.50.43:9092',  -- broker 地址
    'format.type' = 'json'  -- 数据源格式为 json
);
  1. 执行<font color=”blue”>SELECT * FROM user_behavior;</font>看看原始数据,如果消息正常应该和下图类似:

    6.

窗口统计

  1. 下面的SQL是以每十分钟为窗口,统计每个窗口内的总浏览数,TUMBLE_START返回的数据格式是timestamp,这里再调用DATE_FORMAT函数将其格式化成了字符串:
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
DATE_FORMAT(TUMBLE_END(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
COUNT(*)
FROM user_behavior
WHERE behavior = 'pv'
GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);
  1. 得到数据如下所示:

在这里插入图片描述

数据写入ElasticSearch

  1. 确保elasticsearch已部署好;
  2. 执行以下语句即可创建es表,请按照您自己的es信息调整下面的参数:
CREATE TABLE pv_per_minute ( 
    start_time STRING,
    end_time STRING,
    pv_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', -- 类型
    'connector.version' = '6',  -- elasticsearch版本
    'connector.hosts' = 'http://192.168.133.173:9200',  -- elasticsearch地址
    'connector.index' = 'pv_per_minute',  -- 索引名,相当于数据库表名
    'connector.document-type' = 'user_behavior', -- type,相当于数据库库名
    'connector.bulk-flush.max-actions' = '1',  -- 每条数据都刷新
    'format.type' = 'json',  -- 输出数据格式json
    'update-mode' = 'append'
);
  1. 执行以下语句,就会将每分钟的pv总数写入es的pv_per_minute索引:
INSERT INTO pv_per_minute
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time, 
DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time, 
COUNT(*) AS pv_cnt
FROM user_behavior
WHERE behavior = 'pv'
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
  1. 用es-head查看,发现数据已成功写入:

    在这里插入图片描述

联表操作

  1. 当前user_behavior表的category_id表示商品类目,例如<font color=”blue”>11120</font>表示计算机书籍,<font color=”blue”>61626</font>表示牛仔裤,本次实战的数据集中,这样的类目共有五千多种;
  2. 如果我们将这五千多种类目分成6个大类,例如<font color=”blue”>11120</font>属于教育类,<font color=”blue”>61626</font>属于服装类,那么应该有个大类和类目的关系表;
  3. 这个大类和类目的关系表在MySQL创建,表名叫<font color=”blue”>category_info</font>,建表语句如下:
CREATE TABLE `category_info`(
   `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
   `parent_id` bigint ,
   `category_id` bigint ,
   PRIMARY KEY ( `id` )
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
  1. 表<font color=”blue”>category_info</font>所有数据来自对原始数据中<font color=”blue”>category_id</font>字段的提取,并且随机将它们划分为6个大类,该表的数据请在我的GitHub下载:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql
  2. 请在MySQL上建表<font color=”blue”>category_info</font>,并将上述数据全部写进去;
  3. 在Flink SQL Client执行以下语句创建这个维表,mysql信息请按您自己配置调整:
CREATE TABLE category_info (
    parent_id BIGINT, -- 商品大类
    category_id BIGINT  -- 商品详细类目
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo',
    'connector.table' = 'category_info',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);
  1. 尝试联表查询:
SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id;
  1. 如下图,联表查询成功,每条记录都能对应大类:

    在这里插入图片描述

  2. 再试试联表统计,每个大类的总浏览量:
SELECT C.parent_id, COUNT(*) AS pv_count
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id
WHERE behavior = 'pv'
GROUP BY C.parent_id;
  1. 如下图,数据是动态更新的:

    在这里插入图片描述

  2. 执行以下语句,可以在统计时将大类ID转成中文名:
SELECT CASE C.parent_id
    WHEN 1 THEN '服饰鞋包'
    WHEN 2 THEN '家装家饰'
    WHEN 3 THEN '家电'
    WHEN 4 THEN '美妆'
    WHEN 5 THEN '母婴'
    WHEN 6 THEN '3C数码'
    ELSE '其他'
  END AS category_name,
COUNT(*) AS pv_count
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id
WHERE behavior = 'pv'
GROUP BY C.parent_id;
  1. 效果如下图:

在这里插入图片描述

至此,我们借助Flink SQL Client体验了Flink SQL丰富的功能,如果您也在学习Flink SQL,希望本文能给您一些参考;

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…
https://github.com/zq2599/blog_demos

扫描二维码推送至手机访问。

版权声明:本文由西安泽虎代运营发布,如需转载请注明出处。

转载请注明出处https://www.0291.com.cn/post/57033.html

相关文章

淘宝店铺单品宝打折怎么改,店铺打折怎么设置(淘宝店铺会员收费吗)

淘宝店铺单品宝打折怎么改,店铺打折怎么设置(淘宝店铺会员收费吗)

因此每个店铺均可以设置一个店铺最低折扣。若设置单品宝折扣出现以下提示“最终优惠力度应高于店铺最低折扣”,那就要进入店铺工具修改最低折扣数值了。...

天天说的能量,到底是什么?

提到和健康有关的事儿,人们总会提及能量! 对于的人,能量这个词更是不陌生。 可是,能量到底是什么呢? 问了问无所不能的度娘, 她说“能量是物质做功的能力,是人的活动能力”。 听起来还是很费解! 能量具体是干什么的? 从哪儿来? 打哪儿去? 干什...

APP如何实现产品的冷启动?

APP如何实现产品的冷启动?

本文以片刻APP为例。 1.  产品简述 1.1 产品背景 现代社会因为信息化的到来,给人们的生活带来了很多的变化。一方面,生活工作节奏越来越快,人们的时间、信息来源也都越来越碎片化;另一方面有些隐匿的情感表达,在那些社交场所里,不可说也不能说。很多人迫切想逃离微博、微...

iOS文件管理

iOS文件管理

– (void)_getSandBoxPath {     NSArray *pathArray = NSSearchPathForDirectoriesInDomains(NSCachesDirectory, NS...

高质量的营销型网站需满足以下三个条件。

高质量的营销型网站需满足以下三个条件。

随着互联网不断的发展,其中营销型网站建设的技术也是不断成熟,而用户的审美要求也愈发高端,这也就给企业网站制作提出了更高的要求。只有更高质量的网站,才更易被用户接受,从而实现更好的营销和转化,企业才能获取更多利润。所以怎样制作出更加优质,效果更好的网站,就成为了当下企业普遍关注的话题。对此网站建设点瑞...

SEO优化人员必备的优化工具集合。

SEO优化人员必备的优化工具集合。

以下seo优化工具是王泽鹏博客呕心沥血数年优化经验总结 seo从业人员必不可少的优化利器 看看有没有你没用过的? 一、综合查询工具 1、站长工具 2、5118 工具 3、爱站工具 4、去查网工具 5、web3389 站长工具 二、站长平台 1、百度站长 2、搜狗站长 3、...

现在,非常期待与您的又一次邂逅

我们努力让每一部企业宣传片和抖音短视频成为商业大片