第十七章 数据集成

article/2025/6/26 6:33:49

系列文章目录

第一章 总体概述
第二章 在实体机上安装ubuntu
第三章 Windows远程连接ubuntu
第四章 使用Docker安装和运行EMQX
第五章 Docker卸载EMQX
第六章 EMQX客户端MQTTX Desktop的安装与使用
第七章 EMQX客户端MQTTX CLI的安装与使用
第八章 Wireshark工具的安装与使用
第九章 MQTT报文
第十章 MQTT消息质量等级QoS
第十一章 MQTT主题
第十二章 MQTT会话
第十三章 MQTT消息
第十四章 MQTT订阅
第十五章 EMQX访问控制
第十六章 EMQX黑名单与连接抖动检测
第十七章 数据集成


文章目录

  • 系列文章目录
  • 前言
  • 1 数据集成概述
  • 2 工作原理介绍
  • 3 数据集成入门
  • 4 连接器使用
    • 4.1 案例一
    • 4.2 案例二
  • 5 SQL语法介绍
    • 5.1 FROM、SELECT 和 WHERE 子句
    • 5.2 FOREACH、DO 和 INCASE 子句
      • 5.2.1 语法介绍
      • 5.2.2 案例演示
    • 5.3 CASE-WHEN 语法示例
    • 5.4 内置SQL函数
  • 6 Webhook
    • 6.1 Webhook简介
    • 6.2 Webhook演示
  • 总结


前言


1 数据集成概述

思考问题:如何将一个物联网设备产生的数据传输到业务系统中?
在这里插入图片描述
上述方案的弊端:较为麻烦

数据集成:为 EMQX 引入了与外部数据系统的连接,从而以实现设备与其他业务系统的无缝集成。
在这里插入图片描述
EMQX的数据集成功能不单单可以快速的将物联网设备产生的数据传递到业务系统中,也可以和其他的外部数据系统进行集成,实现数据的快速传输。比如:从Kafka某一个主题中获取数据,然后将数据写入到Redis中。

2 工作原理介绍

sink和source组件

数据集成使用 SinkSource 组件与外部数据系统对接。

1、Sink 用于将消息发送到外部数据系统,例如 MySQL、Kafka 或 HTTP 服务等。

2、Source 则用于从外部数据系统接收消息,例如 MQTT、Kafka 或 GCP PubSub。

连接器

连接器负责与外部数据系统的连接,用户可以为不同的外部数据系统创建不同的连接器,一个连接器可以为多个 Sink/Source 提供连接。
在这里插入图片描述
规则引擎

规则引擎是 EMQX 内置基于 SQL 的数据处理组件,搭配数据集成无需编写代码即可实现一站式的 IoT 数据提取、过滤、转换、存储与处理,以加速应用集成和业务创新。

在这里插入图片描述
规则的组成:规则描述了 数据来源数据处理过程处理结果去向 三个方面:
在这里插入图片描述
1、数据来源:规则的数据源可以是消息或事件,也可以是外部的数据系统 (source)。规则通过 SQL 的 FROM 子句指定数据的来源;

2、数据处理过程:规则通过 SQL 语句和函数来描述数据的处理过程。SQL 的 WHERE 子句用于过滤数据,SELECT 子句以及 SQL 函数用于提取和转换数据;

3、处理结果去向:规则可以定义一个或多个动作来处理 SQL 的输出结果。如果 SQL 执行通过,规则将按顺序执行相应的动作,比如将处理结果存储到数据库、或者重新

发布到另一个 MQTT 主题等。支持的动作如下:

  • 消息重发布:将结果发布到指定 MQTT 主题
  • 控制台输出:将结果输出到控制台或日志中
  • 发送到各类 Sink:将结果发送到外部数据系统中,如 MQTT 服务,Kafka,PostgreSQL 等

3 数据集成入门

需求:将客户端发往’t/a’主题中的消息输出到EMQX的控制台

具体步骤:

1、进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面
在这里插入图片描述

2、添加动作输出
在这里插入图片描述

3、启动调试。
在这里插入图片描述
4、测试结果
在这里插入图片描述
5、开始创建
在这里插入图片描述
在这里插入图片描述

6、打开MQTTX,建立连接。
在这里插入图片描述
7、监听broker控制台日志

sudo docker ps -a	//查看容器名称
sudo docker logs -f <容器名称>	//监听当前容器控制台

在这里插入图片描述
8、MQTTX客户端开始发送。
在这里插入图片描述
9、控制台日志如下
在这里插入图片描述


4 连接器使用

4.1 案例一

需求:将客户端发往’t/b’主题中的消息输出到EMQX的控制台和Redis中

在ubuntu中使用docker部署Redis参考我的另一篇文章:Docker 安装 Redis 容器

具体步骤:

1、创建连接器
在这里插入图片描述

2、选择Redis,下一步。
在这里插入图片描述
3、填写配置信息,点击测试连接,并创建。
在这里插入图片描述

在这里插入图片描述

4、创建规则
在这里插入图片描述
5、点击创建
在这里插入图片描述
6、添加动作输出
这里添加两个动作输出:
一个控制台,一个redis

其中命令模板:

HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
7、使用MQTTX客户端测试
向t/b主题发送消息:
在这里插入图片描述
查看Tiny RDM:
在这里插入图片描述
控制台输出:
在这里插入图片描述

查看连接器拓扑图:
在这里插入图片描述

4.2 案例二

需求:将发往Kafka中的test_mqtt_topic主题中的消息输出到EMQX的控制台和Redis中

注意服务器端需要在安全组中开放对应的端口号:218180489092

具体步骤:

1、在Kafka中创建test_mqtt_topic主题

在这里插入图片描述
2、进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面,点击数据输入添加一个source
在这里插入图片描述
3、在添加source动作的页面,添加连接器"+"号,添加kafka的连接器
在这里插入图片描述
4、在创建规则的表单页面,点击动作输出添加控制台输出和Redis输出的sink

Redis的命令模板如下所示:

HSET kafka_mqtt:${topic} offset ${offset} value ${value}

命令参数可以通过控制台输出进行确定。

5、向Kafka中的test_mqtt_topic发送消息

具体步骤:

5.1、创建一个基于spring boot 3.0.5构建的web项目,加入如下依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

5.2、在application.yml文件中添加如下配置

spring:kafka:producer:bootstrap-servers: 192.168.136.147:9092

5.3、编写启动类

@SpringBootApplication
public class MqttKafkaApplication {public static void main(String[] args) {SpringApplication.run(MqttKafkaApplication.class , args) ;}}

5.4 编写测试类,发送消息

@SpringBootTest(classes = MqttKafkaApplication.class)
public class MqttKafkaProducerTest {@Autowiredprivate KafkaTemplate<String , String> kafkaTemplate;@Testpublic void sendMsg() {kafkaTemplate.send("test_mqtt_topic" , "mqtt kafka producer msg....") ;}}

发送消息进行测试,观察Redis中的数据状态:

在这里插入图片描述

5 SQL语法介绍

SQL 处理结果将以 JSON 形式呈现在输出结果部分。SQL 处理结果中的所有字段都可以通过后续操作(内置操作或 Sink)以 ${key}的形式进行引用。

5.1 FROM、SELECT 和 WHERE 子句

规则的 SQL 语句基本格式为:

SELECT <字段名> FROM <主题> [WHERE <条件>]

举例:

## SELECT 语句用于决定最终的输出结果里的字段。比如:
## 下面 SQL 的输出结果中将只有两个字段 "a" 和 "b":
SELECT a, b FROM "t/#"# 选取 username 为 'abc' 的终端发来的消息,输出结果为所有可用字段:
SELECT * FROM "#" WHERE username = 'abc'## 选取 clientid 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意 cid 变量是在 SELECT 语句中定义的,故可在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE cid = 'abc'## 选取 username 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意虽然 SELECT 语句中只选取了 cid 一个字段,所有消息发布事件中的可用字段 (比如 clientid、username 等) 仍然可以在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE username = 'abc'## 但下面这个 SQL 语句就不能工作了,因为变量 xyz 既不是消息发布事件中的可用字段,又没有在 SELECT 语句中定义:
SELECT clientid as cid FROM "#" WHERE xyz = 'abc'

FROM 语句用于选择事件来源。如果是消息发布则填写消息的主题,如果是事件则填写对应的事件主题。

5.2 FOREACH、DO 和 INCASE 子句

5.2.1 语法介绍

如果对于一个数组数据,想针对数组中的每个元素分别执行一些操作并执行 Actions,需要使用 FOREACH-DO-INCASE 语法。其基本格式为:

FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]FOREACH 子句用于选择需要做 foreach 操作的字段,注意选择出的字段必须为数组类型
DO 子句用于对 FOREACH 选择出来的数组中的每个元素进行变换,并选择出感兴趣的字段
INCASE 子句用于对 DO 选择出来的某个字段施加条件过滤FOREACHpayload.sensors as e ## 选择出的字段必须为数组类型
DO                       ## DO 相当于针对当前循环中对象的 SELECT 子句,决定最终的输出结果里的字段clientid,e.name as name,e.idx as idx
INCASE                  ## INCASE 相当于针对当前循环中对象的 WHERE 语句e.idx >= 1          ## 对DO选择出来的某个字段施加条件过滤
FROM "t/#"              ## 子句将规则挂载到某个主题上

5.2.2 案例演示

假设有 ClientID 为 c_steve、主题为 t/1 的消息,消息体为 JSON 格式,其中 sensors 字段为包含多个 Object 的数组:

{"date": "2024-07-05","sensors": [{"name": "a", "idx":0},{"name": "b", "idx":1},{"name": "c", "idx":2}]
}

示例 1:要求将 sensors 里的各个对象,分别作为数据输入重新发布消息到 sensors/${idx} 主题,内容为 ${name}。即最终规则将会发出 3 条消息:

  1. 主题:sensors/0 内容:a
  2. 主题:sensors/1 内容:b
  3. 主题:sensors/2 内容:c

要完成这个规则,我们需要配置如下动作:

1、动作类型:消息重新发布 (republish)

2、目的主题:sensors/${idx}

3、目的 QoS:0

4、消息内容模板:${name}

以及如下 SQL 语句:

FOREACHpayload.sensors
FROM "t/#"

示例解析:

这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors,则选取结果为(json):

[{"name": "a","idx": 0},{"name": "b","idx": 1},{"name": "c","idx": 2}
]

FOREACH 语句将会对于结果数组里的每个对象分别执行 消息重新发布 动作,所以将会执行重新发布动作 3 次。

输出动作添加如下所示:

在这里插入图片描述
也可以对上述的案例进行改造,使用DO字句指定选择出感兴趣的字段,如下所示:

FOREACHpayload.sensors as e
DO e.name as name , e.idx as idx
FROM "t/1"

此时在指定输出动作的时候可以省略item:
在这里插入图片描述
示例 2:要求将 sensors 里的 idx 值大于或等于 1 的对象,分别作为数据输入重新发布消息到 sensors/${idx} 主题,内容为

clientid=${clientid},name=${name},date=${date}。即最终规则将会发出 2 条消息:

  1. 主题:sensors/1 内容:clientid=c_steve,name=b,date=2020-04-24
  2. 主题:sensors/2 内容:clientid=c_steve,name=c,date=2020-04-24

要完成这个规则,我们需要配置如下动作:

1、动作类型:消息重新发布 (republish)

2、目的主题:sensors/${idx}

3、目的 QoS:0

4、消息内容模板:clientid=${clientid},name=${name},date=${date}

以及如下 SQL 语句:

FOREACHpayload.sensors as e
DOclientid , payload.date as date,e.idx as idx ,e.name as name
INCASEe.idx >= 1  
FROM "t/#"

此时在指定输出动作的时候可以省略item:
在这里插入图片描述

5.3 CASE-WHEN 语法示例

CASE-WHEN语法和MySQL中的很类似,当满足某一个条件的时候,取指定的数据值,如下所示:

示例:将消息中 x 字段的值范围限定在 0~7 之间。

SELECTCASE WHEN payload.x < 0 THEN 0WHEN payload.x > 7 THEN 7ELSE payload.xEND as x
FROM "t/#"

假设消息为:

{"x": 8}

则上面的 SQL 输出为:

{"x": 7}

5.4 内置SQL函数

规则引擎提供了各种内置函数,您可以在 SQL 中使用这些函数实现基本的数据处理,包括 数学运算、数据类型判断、数据类型转换、字符串操作、映射操作、数组操作、

哈希、压缩与解压缩、位操作、位序列操作、编解码 以及 日期与时间转换。

官网地址:https://docs.emqx.com/zh/emqx/v5.6/data-integration/rule-sql-builtin-functions.html

举例说明:

FOREACHpayload.sensors as e
DO abs(-1) as abs,concat(e.name , 'xian') as address ,clientid ,e.name as name , e.idx as idx
INCASEe.idx >= 1
FROM "t/1"

向主题’t/1’发送如下消息:

{"date": "2024-07-05","sensors": [{"name": "a", "idx":0},{"name": "b", "idx":1},{"name": "c", "idx":2}]
}

观察控制台日志输出:
在这里插入图片描述

6 Webhook

6.1 Webhook简介

Webhook 提供了一种将 EMQX 客户端消息和事件集成到外部 HTTP 服务器的方法。

Webhook 是 EMQX 中开箱即用的功能。当客户端向特定主题发布消息,或执行特定操作时就会触发 Webhook,将事件数据和消息数据转发到预设的 HTTP 服务器中。
在这里插入图片描述

6.2 Webhook演示

具体步骤:

1、定义http的请求接口

@RestController
@RequestMapping(value = "/webHook")
public class WebHookController {@PostMapping(value = "/notify")public void notify(@RequestBody Map<Object , Object> body) {System.out.println(body);}}

2、在Dashboard中创建Webhook
在这里插入图片描述
3、通过MQTTX向a/1主题发布消息,观察http服务控制台输出


总结

以上,就是数据集成的介绍。


http://www.hkcw.cn/article/EKMirWVbmL.shtml

相关文章

榴莲能从奢侈果变成亲民果吗 供应增加价格下降

夏季是各类水果集中上市的季节,榴莲爱好者们最近有口福了,市场上榴莲价格大幅下降,甚至出现“腰斩”的情况。这背后的原因是什么?榴莲是否会从“奢侈果”变成“亲民果”?走进浙江湖州的一家榴莲批发店,可以看到榴莲成堆地摆放在货架上,老板肖女士正在通过直播销售榴莲。…

育碧“夭折”的沙盒游戏概念图曝光:类似《我的世界》风格 创意总监离职

育碧在经历长期财务困境导致的大规模重组背景下,陆续叫停了多个项目,包括《全境封锁:郊野行动》《XDefiant》以及“Project Q”。当地时间1日,外媒MP1ST披露了育碧另一个尚未官宣但已夭折的计划——“Project Renaissance”。早在2023年,Kotaku曾报道该项目的存在。据报道…

菲律宾拉欧盟搞“安全与防务对话” 深化防务合作应对挑战

在南海紧张局势不断升级之际,菲律宾外长马纳罗与欧盟外交与安全政策高级代表卡拉斯在马尼拉召开联合记者会,宣布双方同意建立安全与防务对话机制。该机制旨在应对网络攻击、外来干预和信息操纵等跨境新兴安全威胁和挑战。安全与防务对话将以现有机制为基础,重点关注海域意识…

谈判现场曝光 俄团长冷眼审视乌代表表情严肃!

谈判现场曝光 俄团长冷眼审视乌代表。当地时间6月2日,俄罗斯代表团与乌克兰代表团,抵达土耳其伊斯坦布尔举行第二轮俄乌谈判。随后俄曝光了俄乌谈判现场!谈判现场曝光 俄团长冷眼审视乌代表谈判现场曝光 俄团长冷眼审视乌代表谈判现场曝光 俄团长冷眼审视乌代表谈判现场曝光…

ubuntu24.04 查看时区并设置Asia/Shanghai时区

一、查看当前系统设置的时区 timedatectl 二、修改为Asia/Shanghai sudo timedatectl set-timezone Asia/Shanghai sudo nano /etc/timezone #修改内容为&#xff1a;Asia/Shanghai sudo dpkg-reconfigure --frontend noninteractive tzdata

贾冰瘦了 网友:压力给到沈腾 减肥热潮席卷娱乐圈

5月31日,演员贾冰的妻子发布了一段视频,祝福大家端午节快乐,并配文“从此我家多了个瘦子”。在两人合影中,贾冰明显瘦了很多。评论区里,网友们纷纷询问他如何瘦这么多,甚至有人表示他瘦得都脱相了。贾冰妻子回复说,主要是通过少吃(一天一顿的那种)和运动来达到减肥效果…

5月100个城市新房均价上涨 政策利好支撑需求释放

5月100个城市新房均价上涨 政策利好支撑需求释放!6月1日,中指研究院发布《中国房地产指数系统百城价格指数报告》。报告显示,5月全国100个城市新建住宅平均价格环比上涨0.30%,同比上涨2.56%。从涨跌城市来看,33个城市环比上涨,54个城市环比下跌,13个城市持平。二手房价格…

没有假球 全是世仇 苏超火出圈:比赛第一,友谊第十四

“友谊第一,比赛第二。” “等等!重来!” “友谊第一,比赛第十四!” “不对!再来!” “比赛第一,友谊第十四。”近日,被称为“苏超”的江苏省首届城市足球联赛爆火出圈。盐城现场观众达到22613人,网友感叹上座率堪比世界杯。据闪电新闻报道,“苏超”观众数量甚至超过…

这种凉鞋易致孩子性早熟还有毒 邻苯超标严重

这种凉鞋易致孩子性早熟还有毒!目前正值儿童凉鞋购买高峰期,各种款式的儿童凉拖鞋如水晶鞋、洞洞鞋、果冻鞋等因其外观漂亮可爱且穿着方便而受到小朋友和家长们的喜爱。然而,在孩子们穿着这些漂亮的凉鞋奔跑嬉戏时,潜在的安全隐患也悄然存在。由于材质差异,如果不仔细甄别…

中小学生攀比起"体考神器"碳板鞋 校园内的新潮流

最近,碳板鞋在校园里变得非常流行。这类鞋子外观炫酷、科技感十足,受到很多学生的喜爱。一些家长认为这类专业跑步鞋能帮助孩子提高体育成绩,因此不惜重金购买高端碳板鞋。然而,这种现象也引发了一些问题。碳板跑鞋是一种在中底嵌入碳纤维板的跑步鞋,设计初衷是为了提高精…

金价大涨!金饰价格重回1000元/克 国际金价反弹带动

美东时间5月29日,国际金价出现反弹。现货黄金价格上涨0.96%,达到3317.8美元/盎司;COMEX黄金期货上涨0.61%,报3342.6美元/盎司;COMEX白银期货则上涨0.84%,报33.44美元/盎司。次日早间,金价再度下跌。现货黄金价格微跌0.02%,报3316.6美元/盎司;COMEX黄金期货下跌0.17%,…

一步一步配置 Ubuntu Server 的 NodeJS 服务器详细实录——4. 配置服务器终端环境 zsh , oh my zsh, vim

前言 通过前面几篇文章&#xff0c;我们顺利的 安装了 ubuntu server 服务器&#xff0c;并且配置好了 ssh 免密登录服务器&#xff0c;也安装好了 服务器常用软件安装,接下来&#xff0c;我们要仔细的配置一下我们的终端环境&#xff0c;让服务器的终端更加好用。 一般情况下…

AI大模型开发架构设计(21)——LLM大模型构建AI Agents案例实战

文章目录 1 AI Agent 智能体以及应用场景剖析什么是 AI Agent 智能体?什么是大语音模型?LLM 大模型存在的局限性LLM Agent是什么?2 基于 LLM 大模型的 AI Agent 技术架构深度剖析规划能力是什么?记忆能力是什么?工具使用能力是什么?1 AI Agent 智能体以及应用场景剖析 什…

(Python)列表的操作(增删改查、排序)

一、增 append()【整体添加&#xff0c;将一个元素整体添加】 2.extend()【分散添加&#xff0c;将元素逐一添加】 insert()【在制定位置插入元素】 二、删 del【根据下标删除】 pop【根据下标删除】 remove【根据值进行删除&#xff0c;默认指定删除第一个出现的元素】 三、…

【更正补全】edu教育申请通过方案

见字如面&#xff0c;竹相左边 只分享验证可行的前沿技术。明年还要做设计 端午出差前我申请了3个谷歌账号&#xff0c;用来测试北卡莱纳州立大学申请edu教育邮箱。很可惜直到儿童节当天都没有收到后续的邮件。 但是经过我的反复对比研究&#xff0c;我找到了更正的方案。特…

pikachu靶场通关笔记10 XSS关卡06-XSS之盲打

目录 一、XSS盲打 二、源码分析 1、进入靶场 2、源码分析 3、渗透思路 三、渗透实战 1、探测是否有过滤 2、管理员端查看输出 3、盲打页面注入Payload 4、管理员查看攻击效果 本系列为通过《pikachu靶场通关笔记》的XSS关卡(共10关&#xff09;渗透集合&#xff0c…

抛砖引玉:RadarDet4D,NuScenes数据集Radar模态目标检测第二名(即将开源)

这几年一直在关注自动驾驶3D目标检测相关的研究。在NuScenes数据集上有很多经典的模型被提出并得到了验证&#xff0c;纯视觉3D目标检测经典的方法有BEVFormer、BEVDet系列、DETR3D、Sparse4D等工作&#xff0c;基于LiDAR的有CenterPoint、多模态有BEVFusion、DAL、UniTR等。 …

《多状态DP:状态设计与状态转移方程速成指南》​

1.按摩师 题目链接&#xff1a;面试题 17.16. 按摩师 - 力扣&#xff08;LeetCode&#xff09; 题目描述&#xff1a;从一个预约请求队列中&#xff0c;找出一个总预约时间最长的预约集合&#xff0c;不能选择相邻位置的预约 算法讲解&#xff1a;动态规划 1.状态表示&#…

Spring Cloud 开发入门:环境搭建与微服务项目实战(上)

一、开发环境搭建 1. JDK 安装与版本选择 版本选择解析 Java 是 Spring Cloud 微服务开发的基础&#xff0c;选择合适的 JDK 版本至关重要&#xff0c;特别是在框架兼容性和生产环境稳定性方面。 &#xff08;1&#xff09;主流 JDK 版本对比 版本发布年份支持状态特点简述J…

PINN for PDE(偏微分方程)3 - 正向问题 - Burgers’ equation

PINN for PDE(偏微分方程)3 - 正向问题 - Burgers’ equation 目录 PINN for PDE(偏微分方程)3 - 正向问题 - Burgers’ equation一、什么是PINN的正问题二、求解的实际例子 - Burgers’ equation2.1 Burgers方程2.2 无解析解解决办法 - 龙哥库达&#xff08;Runge-Kutta 4th O…