On this page
遥测
一、遥测数据规划
1.遥测规划
- 遥测数据上传ThingsBoard平台
- ThingsBoard通过规则引擎整合元数据
- ThingsBoard通过规则引擎按设备类型对数据分类
- ThingsBoard通过规则引擎把消息路由到消息队列(RabbitMQ或Kafka、EMQX),根据设备类型建topic
- 消息队列(RabbitMQ)根据topic消费数据,保存到数据库
- 根据微服务划分创建数据库,根据时序数据规则创建表
二、准备工作
1.上传遥测
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
| package com.iothub.telemetry;
import com.iothub.mqtt.EmqClient;
import com.iothub.mqtt.MqttProperties;
import com.iothub.mqtt.QosEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class Upload {
@Autowired
private EmqClient emqClient;
@Autowired
private MqttProperties properties;
@PostConstruct
public void init(){
//连接服务端
emqClient.connect(properties.getUsername(),properties.getPassword());
//订阅一个主题
//emqClient.subscribe("testtopic/#", QosEnum.QoS1);
}
@Scheduled(fixedRate = 2000)
public void publish(){
String data = getData(2);
emqClient.publish("v1/devices/me/telemetry",data,
QosEnum.QoS1,false);
}
private String getData(Integer type){
if (type == 1) {
// 携带时间戳
String data = "{\n" +
"\t\"ts\": 1451649600512,\n" +
"\t\"values\": {\n" +
"\t\t\"stringKey\": \"value1\",\n" +
"\t\t\"booleanKey\": true,\n" +
"\t\t\"doubleKey\": 42.0,\n" +
"\t\t\"longKey\": 73,\n" +
"\t\t\"jsonKey\": {\n" +
"\t\t\t\"someNumber\": 42,\n" +
"\t\t\t\"someArray\": [1, 2, 3],\n" +
"\t\t\t\"someNestedObject\": {\n" +
"\t\t\t\t\"key\": \"value\"\n" +
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
"}";
return data;
} else if (type == 2) {
// 不携带时间戳
String data = "{\n" +
" \"stringKey\": \"value1\",\n" +
" \"booleanKey\": true,\n" +
" \"doubleKey\": 42.0,\n" +
" \"longKey\": 73,\n" +
" \"jsonKey\": {\n" +
" \"someNumber\": 42,\n" +
" \"someArray\": [1,2,3],\n" +
" \"someNestedObject\": {\"key\": \"value\"}\n" +
" }\n" +
"}";
return data;
}else {
// 数组
String data = "[{\"key1\":\"value1\"}, {\"key2\":true}]";
return data;
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
| server:
port: 8999
spring:
application:
name: tb-telemetry
mqtt:
broker-url: tcp://82.157.166.86:1883
client-id: emq-client-telemetry
username: LuUoLXi3DMqOPqDjm20C
password:
|
遥测数据
1
2
3
4
5
6
7
8
9
10
11
12
13
| {
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"jsonKey": {
"someNumber": 42,
"someArray": [1, 2, 3],
"someNestedObject": {
"key": "value"
}
}
}
|
元数据
1
2
3
4
5
| {
"deviceName": "Test-telemetry",
"deviceType": "default",
"ts": "1696730983079"
}
|
2.RabbitMQ
2.1.Docker部署RabbitMQ
1
2
3
4
5
6
| docker run -d --network host --restart=always --name rabbitmq rabbitmq:management
# 访问地址
http://82.157.166.86:15672/
# 账户/密码:
guest/guest
|
2.2. 创建直连交换机
1.创建直连交换机
2.创建队列
3.绑定队列
2.3.创建主题交换机
1.创建topic交换机
2.创建队列
3.绑定交换机
3.创建规则链
3.1.创建遥测规则链
3.2.整合元数据
1
2
3
4
5
6
7
| # copy keys
{
"deviceName": "Test-telemetry",
"deviceType": "default",
"ts": "1696730983079"
}
|
3.3.按设备类型消息分类
1
2
3
4
5
6
7
| # 按设备类型消息分类
if(msg.deviceType === 'test-telemetry') {
return ['test-telemetry'];
} else {
return ['default'];
}
|
三、上传遥测
1.创建设备
2.上传遥测数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| {
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"jsonKey": {
"someNumber": 42,
"someArray": [1, 2, 3],
"someNestedObject": {
"key": "value"
}
}
}
{
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
| @Component
public class Upload {
@Autowired
private EmqClient emqClient;
@Autowired
private MqttProperties properties;
@PostConstruct
public void init(){
//连接服务端
emqClient.connect(properties.getUsername(),properties.getPassword());
//订阅一个主题
//emqClient.subscribe("testtopic/#", QosEnum.QoS1);
}
@Scheduled(fixedRate = 2000)
public void publish(){
String data = getData(2);
emqClient.publish("v1/devices/me/telemetry",data,
QosEnum.QoS1,false);
}
private String getData(Integer type){
if (type == 1) {
// 携带时间戳
String data = "{\n" +
"\t\"ts\": 1451649600512,\n" +
"\t\"values\": {\n" +
"\t\t\"stringKey\": \"value1\",\n" +
"\t\t\"booleanKey\": true,\n" +
"\t\t\"doubleKey\": 42.0,\n" +
"\t\t\"longKey\": 73,\n" +
"\t\t\"jsonKey\": {\n" +
"\t\t\t\"someNumber\": 42,\n" +
"\t\t\t\"someArray\": [1, 2, 3],\n" +
"\t\t\t\"someNestedObject\": {\n" +
"\t\t\t\t\"key\": \"value\"\n" +
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
"}";
return data;
} else if (type == 2) {
// 不携带时间戳
String data = "{\n" +
" \"stringKey\": \"value1\",\n" +
" \"booleanKey\": true,\n" +
" \"doubleKey\": 42.0,\n" +
" \"longKey\": 73,\n" +
" \"jsonKey\": {\n" +
" \"someNumber\": 42,\n" +
" \"someArray\": [1,2,3],\n" +
" \"someNestedObject\": {\"key\": \"value\"}\n" +
" }\n" +
"}";
return data;
}else {
// 数组
String data = "[{\"key1\":\"value1\"}, {\"key2\":true}]";
return data;
}
}
}
|
3.RabbitMQ
1.创建队列
telemetry-default-queue、telemetry-queue
2.创建交换机
4.配置规则引擎
4.1.整体配置
4.2.整个元数据
1
2
3
4
5
| {
"deviceName": "Test-telemetry",
"deviceType": "default",
"ts": "1696730983079"
}
|
4.3.按设备类型消息分类
1
2
3
4
5
| if(msg.deviceType === 'test-telemetry') {
return ['test-telemetry'];
} else {
return ['default'];
}
|
5.测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| {
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"jsonKey": {
"someNumber": 42,
"someArray": [1, 2, 3],
"someNestedObject": {
"key": "value"
}
},
"deviceType": "default",
"deviceName": "Test-telemetry",
"ts": "1451649600512"
}
|
1
2
3
4
5
6
7
8
9
| {
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"deviceType": "test-telemetry",
"deviceName": "Test-telemetry1",
"ts": "1696824491527"
}
|
四、REST API
1.遥测API