系列文章目录
第一章 总体概述
第二章 在实体机上安装ubuntu
第三章 Windows远程连接ubuntu
第四章 使用Docker安装和运行EMQX
第五章 Docker卸载EMQX
第六章 EMQX客户端MQTTX Desktop的安装与使用
第七章 EMQX客户端MQTTX CLI的安装与使用
第八章 Wireshark工具的安装与使用
第九章 MQTT报文
第十章 MQTT消息质量等级QoS
第十一章 MQTT主题
第十二章 MQTT会话
第十三章 MQTT消息
第十四章 MQTT订阅
第十五章 EMQX访问控制
第十六章 EMQX黑名单与连接抖动检测
第十七章 数据集成
文章目录
- 系列文章目录
- 前言
- 1 数据集成概述
- 2 工作原理介绍
- 3 数据集成入门
- 4 连接器使用
- 4.1 案例一
- 4.2 案例二
- 5 SQL语法介绍
- 5.1 FROM、SELECT 和 WHERE 子句
- 5.2 FOREACH、DO 和 INCASE 子句
- 5.2.1 语法介绍
- 5.2.2 案例演示
- 5.3 CASE-WHEN 语法示例
- 5.4 内置SQL函数
- 6 Webhook
- 6.1 Webhook简介
- 6.2 Webhook演示
- 总结
前言
1 数据集成概述
思考问题:如何将一个物联网设备产生的数据传输到业务系统中?
上述方案的弊端:较为麻烦
数据集成:为 EMQX 引入了与外部数据系统的连接,从而以实现设备与其他业务系统的无缝集成。
EMQX的数据集成功能不单单可以快速的将物联网设备产生的数据传递到业务系统中,也可以和其他的外部数据系统进行集成,实现数据的快速传输。比如:从Kafka某一个主题中获取数据,然后将数据写入到Redis中。
2 工作原理介绍
sink和source组件:
数据集成使用 Sink
与 Source
组件与外部数据系统对接。
1、Sink 用于将消息发送到外部数据系统,例如 MySQL、Kafka 或 HTTP 服务等。
2、Source 则用于从外部数据系统接收消息,例如 MQTT、Kafka 或 GCP PubSub。
连接器:
连接器负责与外部数据系统的连接,用户可以为不同的外部数据系统创建不同的连接器,一个连接器可以为多个 Sink/Source 提供连接。
规则引擎:
规则引擎是 EMQX 内置基于 SQL 的数据处理组件,搭配数据集成无需编写代码即可实现一站式的 IoT 数据提取、过滤、转换、存储与处理,以加速应用集成和业务创新。
规则的组成:规则描述了 数据来源、数据处理过程、处理结果去向 三个方面:
1、数据来源:规则的数据源可以是消息或事件,也可以是外部的数据系统 (source)。规则通过 SQL 的 FROM 子句指定数据的来源;
2、数据处理过程:规则通过 SQL
语句和函数
来描述数据的处理过程。SQL 的 WHERE 子句用于过滤数据,SELECT 子句以及 SQL 函数用于提取和转换数据;
3、处理结果去向:规则可以定义一个或多个动作来处理 SQL 的输出结果。如果 SQL 执行通过,规则将按顺序执行相应的动作,比如将处理结果存储到数据库、或者重新
发布到另一个 MQTT 主题等。支持的动作如下:
- 消息重发布:将结果发布到指定 MQTT 主题
- 控制台输出:将结果输出到控制台或日志中
- 发送到各类 Sink:将结果发送到外部数据系统中,如 MQTT 服务,Kafka,PostgreSQL 等
3 数据集成入门
需求:将客户端发往’t/a’主题中的消息输出到EMQX的控制台
具体步骤:
1、进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面
2、添加动作输出
3、启动调试。
4、测试结果
5、开始创建
6、打开MQTTX,建立连接。
7、监听broker控制台日志
sudo docker ps -a //查看容器名称
sudo docker logs -f <容器名称> //监听当前容器控制台
8、MQTTX客户端开始发送。
9、控制台日志如下
4 连接器使用
4.1 案例一
需求:将客户端发往’t/b’主题中的消息输出到EMQX的控制台和Redis中
在ubuntu中使用docker部署Redis参考我的另一篇文章:Docker 安装 Redis 容器
具体步骤:
1、创建连接器
2、选择Redis,下一步。
3、填写配置信息,点击测试连接,并创建。
4、创建规则
5、点击创建
6、添加动作输出
这里添加两个动作输出:
一个控制台,一个redis
其中命令模板:
HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}
7、使用MQTTX客户端测试
向t/b主题发送消息:
查看Tiny RDM:
控制台输出:
查看连接器拓扑图:
4.2 案例二
需求:将发往Kafka中的test_mqtt_topic
主题中的消息输出到EMQX的控制台和Redis中
注意服务器端需要在安全组中开放对应的端口号:2181、8048、9092
具体步骤:
1、在Kafka中创建test_mqtt_topic
主题
2、进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面,点击数据输入
添加一个source
3、在添加source动作的页面,添加连接器"+"号,添加kafka的连接器
4、在创建规则的表单页面,点击动作输出
添加控制台输出和Redis输出的sink
Redis的命令模板如下所示:
HSET kafka_mqtt:${topic} offset ${offset} value ${value}
命令参数可以通过控制台输出进行确定。
5、向Kafka中的test_mqtt_topic
发送消息
具体步骤:
5.1、创建一个基于spring boot 3.0.5构建的web项目,加入如下依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
5.2、在application.yml文件中添加如下配置
spring:kafka:producer:bootstrap-servers: 192.168.136.147:9092
5.3、编写启动类
@SpringBootApplication
public class MqttKafkaApplication {public static void main(String[] args) {SpringApplication.run(MqttKafkaApplication.class , args) ;}}
5.4 编写测试类,发送消息
@SpringBootTest(classes = MqttKafkaApplication.class)
public class MqttKafkaProducerTest {@Autowiredprivate KafkaTemplate<String , String> kafkaTemplate;@Testpublic void sendMsg() {kafkaTemplate.send("test_mqtt_topic" , "mqtt kafka producer msg....") ;}}
发送消息进行测试,观察Redis中的数据状态:
5 SQL语法介绍
SQL 处理结果将以 JSON 形式呈现在输出结果部分。SQL 处理结果中的所有字段都可以通过后续操作(内置操作或 Sink)以 ${key}
的形式进行引用。
5.1 FROM、SELECT 和 WHERE 子句
规则的 SQL 语句基本格式为:
SELECT <字段名> FROM <主题> [WHERE <条件>]
举例:
## SELECT 语句用于决定最终的输出结果里的字段。比如:
## 下面 SQL 的输出结果中将只有两个字段 "a" 和 "b":
SELECT a, b FROM "t/#"# 选取 username 为 'abc' 的终端发来的消息,输出结果为所有可用字段:
SELECT * FROM "#" WHERE username = 'abc'## 选取 clientid 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意 cid 变量是在 SELECT 语句中定义的,故可在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE cid = 'abc'## 选取 username 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意虽然 SELECT 语句中只选取了 cid 一个字段,所有消息发布事件中的可用字段 (比如 clientid、username 等) 仍然可以在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE username = 'abc'## 但下面这个 SQL 语句就不能工作了,因为变量 xyz 既不是消息发布事件中的可用字段,又没有在 SELECT 语句中定义:
SELECT clientid as cid FROM "#" WHERE xyz = 'abc'
FROM 语句用于选择事件来源。如果是消息发布则填写消息的主题,如果是事件则填写对应的事件主题。
5.2 FOREACH、DO 和 INCASE 子句
5.2.1 语法介绍
如果对于一个数组数据,想针对数组中的每个元素分别执行一些操作并执行 Actions,需要使用 FOREACH-DO-INCASE
语法。其基本格式为:
FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]FOREACH 子句用于选择需要做 foreach 操作的字段,注意选择出的字段必须为数组类型
DO 子句用于对 FOREACH 选择出来的数组中的每个元素进行变换,并选择出感兴趣的字段
INCASE 子句用于对 DO 选择出来的某个字段施加条件过滤FOREACHpayload.sensors as e ## 选择出的字段必须为数组类型
DO ## DO 相当于针对当前循环中对象的 SELECT 子句,决定最终的输出结果里的字段clientid,e.name as name,e.idx as idx
INCASE ## INCASE 相当于针对当前循环中对象的 WHERE 语句e.idx >= 1 ## 对DO选择出来的某个字段施加条件过滤
FROM "t/#" ## 子句将规则挂载到某个主题上
5.2.2 案例演示
假设有 ClientID 为 c_steve
、主题为 t/1
的消息,消息体为 JSON 格式,其中 sensors 字段为包含多个 Object 的数组:
{"date": "2024-07-05","sensors": [{"name": "a", "idx":0},{"name": "b", "idx":1},{"name": "c", "idx":2}]
}
示例 1:要求将 sensors 里的各个对象,分别作为数据输入重新发布消息到 sensors/${idx}
主题,内容为 ${name}
。即最终规则将会发出 3 条消息:
- 主题:sensors/0 内容:a
- 主题:sensors/1 内容:b
- 主题:sensors/2 内容:c
要完成这个规则,我们需要配置如下动作:
1、动作类型:消息重新发布 (republish)
2、目的主题:sensors/${idx}
3、目的 QoS:0
4、消息内容模板:${name}
以及如下 SQL 语句:
FOREACHpayload.sensors
FROM "t/#"
示例解析:
这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors,则选取结果为(json):
[{"name": "a","idx": 0},{"name": "b","idx": 1},{"name": "c","idx": 2}
]
FOREACH 语句将会对于结果数组里的每个对象分别执行 消息重新发布
动作,所以将会执行重新发布动作 3 次。
输出动作添加如下所示:
也可以对上述的案例进行改造,使用DO字句指定选择出感兴趣的字段,如下所示:
FOREACHpayload.sensors as e
DO e.name as name , e.idx as idx
FROM "t/1"
此时在指定输出动作的时候可以省略item:
示例 2:要求将 sensors 里的 idx
值大于或等于 1 的对象,分别作为数据输入重新发布消息到 sensors/${idx}
主题,内容为
clientid=${clientid},name=${name},date=${date}
。即最终规则将会发出 2 条消息:
- 主题:sensors/1 内容:clientid=c_steve,name=b,date=2020-04-24
- 主题:sensors/2 内容:clientid=c_steve,name=c,date=2020-04-24
要完成这个规则,我们需要配置如下动作:
1、动作类型:消息重新发布 (republish)
2、目的主题:sensors/${idx}
3、目的 QoS:0
4、消息内容模板:clientid=${clientid},name=${name},date=${date}
以及如下 SQL 语句:
FOREACHpayload.sensors as e
DOclientid , payload.date as date,e.idx as idx ,e.name as name
INCASEe.idx >= 1
FROM "t/#"
此时在指定输出动作的时候可以省略item:
5.3 CASE-WHEN 语法示例
CASE-WHEN语法和MySQL中的很类似,当满足某一个条件的时候,取指定的数据值,如下所示:
示例:将消息中 x 字段的值范围限定在 0~7 之间。
SELECTCASE WHEN payload.x < 0 THEN 0WHEN payload.x > 7 THEN 7ELSE payload.xEND as x
FROM "t/#"
假设消息为:
{"x": 8}
则上面的 SQL 输出为:
{"x": 7}
5.4 内置SQL函数
规则引擎提供了各种内置函数,您可以在 SQL 中使用这些函数实现基本的数据处理,包括 数学运算、数据类型判断、数据类型转换、字符串操作、映射操作、数组操作、
哈希、压缩与解压缩、位操作、位序列操作、编解码 以及 日期与时间转换。
官网地址:https://docs.emqx.com/zh/emqx/v5.6/data-integration/rule-sql-builtin-functions.html
举例说明:
FOREACHpayload.sensors as e
DO abs(-1) as abs,concat(e.name , 'xian') as address ,clientid ,e.name as name , e.idx as idx
INCASEe.idx >= 1
FROM "t/1"
向主题’t/1’发送如下消息:
{"date": "2024-07-05","sensors": [{"name": "a", "idx":0},{"name": "b", "idx":1},{"name": "c", "idx":2}]
}
观察控制台日志输出:
6 Webhook
6.1 Webhook简介
Webhook 提供了一种将 EMQX 客户端消息和事件集成到外部 HTTP 服务器的方法。
Webhook 是 EMQX 中开箱即用的功能。当客户端向特定主题发布消息,或执行特定操作时就会触发 Webhook,将事件数据和消息数据转发到预设的 HTTP 服务器中。
6.2 Webhook演示
具体步骤:
1、定义http的请求接口
@RestController
@RequestMapping(value = "/webHook")
public class WebHookController {@PostMapping(value = "/notify")public void notify(@RequestBody Map<Object , Object> body) {System.out.println(body);}}
2、在Dashboard中创建Webhook
3、通过MQTTX向a/1
主题发布消息,观察http服务控制台输出
总结
以上,就是数据集成的介绍。