ThingsBoard
开源文化 ThingsBoard 开源中间件 Kubernetes DevOps KubeEdge EdgeX Foundry Node-RED
Documentation > 二次开发 > 遥测

On this page

遥测

一、遥测数据规划

1.遥测规划

  1. 遥测数据上传ThingsBoard平台
  2. ThingsBoard通过规则引擎整合元数据
  3. ThingsBoard通过规则引擎按设备类型对数据分类
  4. ThingsBoard通过规则引擎把消息路由到消息队列(RabbitMQ或Kafka、EMQX),根据设备类型建topic
  5. 消息队列(RabbitMQ)根据topic消费数据,保存到数据库
  6. 根据微服务划分创建数据库,根据时序数据规则创建表

二、准备工作

1.上传遥测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.iothub.telemetry;
import com.iothub.mqtt.EmqClient;
import com.iothub.mqtt.MqttProperties;
import com.iothub.mqtt.QosEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Component
public class Upload {

    @Autowired
    private EmqClient emqClient;

    @Autowired
    private MqttProperties properties;

    @PostConstruct
    public void init(){
        //连接服务端
        emqClient.connect(properties.getUsername(),properties.getPassword());
        //订阅一个主题
        //emqClient.subscribe("testtopic/#", QosEnum.QoS1);
    }

    @Scheduled(fixedRate = 2000)
    public void publish(){

        String data = getData(2);

        emqClient.publish("v1/devices/me/telemetry",data,
                QosEnum.QoS1,false);
    }

    private String getData(Integer type){

        if (type == 1) {
            // 携带时间戳
            String data = "{\n" +
                    "\t\"ts\": 1451649600512,\n" +
                    "\t\"values\": {\n" +
                    "\t\t\"stringKey\": \"value1\",\n" +
                    "\t\t\"booleanKey\": true,\n" +
                    "\t\t\"doubleKey\": 42.0,\n" +
                    "\t\t\"longKey\": 73,\n" +
                    "\t\t\"jsonKey\": {\n" +
                    "\t\t\t\"someNumber\": 42,\n" +
                    "\t\t\t\"someArray\": [1, 2, 3],\n" +
                    "\t\t\t\"someNestedObject\": {\n" +
                    "\t\t\t\t\"key\": \"value\"\n" +
                    "\t\t\t}\n" +
                    "\t\t}\n" +
                    "\t}\n" +
                    "}";

            return data;

        } else if (type == 2) {
            // 不携带时间戳
            String data = "{\n" +
                    "  \"stringKey\": \"value1\",\n" +
                    "  \"booleanKey\": true,\n" +
                    "  \"doubleKey\": 42.0,\n" +
                    "  \"longKey\": 73,\n" +
                    "  \"jsonKey\": {\n" +
                    "    \"someNumber\": 42,\n" +
                    "    \"someArray\": [1,2,3],\n" +
                    "    \"someNestedObject\": {\"key\": \"value\"}\n" +
                    "  }\n" +
                    "}";

            return data;
        }else {
            // 数组
            String data = "[{\"key1\":\"value1\"}, {\"key2\":true}]";
            return data;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
server:
  port: 8999
spring:
  application:
    name: tb-telemetry

mqtt:
  broker-url: tcp://82.157.166.86:1883
  client-id: emq-client-telemetry
  username: LuUoLXi3DMqOPqDjm20C
  password:

遥测数据

1
2
3
4
5
6
7
8
9
10
11
12
13
{
    "stringKey": "value1",
    "booleanKey": true,
    "doubleKey": 42.0,
    "longKey": 73,
    "jsonKey": {
        "someNumber": 42,
        "someArray": [1, 2, 3],
        "someNestedObject": {
            "key": "value"
        }
    }
}

元数据

1
2
3
4
5
{
    "deviceName": "Test-telemetry",
    "deviceType": "default",
    "ts": "1696730983079"
}

2.RabbitMQ

2.1.Docker部署RabbitMQ

1
2
3
4
5
6
docker run -d  --network host --restart=always --name rabbitmq rabbitmq:management

# 访问地址
http://82.157.166.86:15672/ 
# 账户/密码:
guest/guest 

2.2. 创建直连交换机

1.创建直连交换机

2.创建队列

3.绑定队列

2.3.创建主题交换机

1.创建topic交换机

2.创建队列

3.绑定交换机

3.创建规则链

3.1.创建遥测规则链

3.2.整合元数据

1
2
3
4
5
6
7
# copy keys

{
    "deviceName": "Test-telemetry",
    "deviceType": "default",
    "ts": "1696730983079"
}

3.3.按设备类型消息分类

1
2
3
4
5
6
7
# 按设备类型消息分类

if(msg.deviceType === 'test-telemetry') {
    return ['test-telemetry'];
} else {
    return ['default'];
}

三、上传遥测

1.创建设备

2.上传遥测数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
    "stringKey": "value1",
    "booleanKey": true,
    "doubleKey": 42.0,
    "longKey": 73,
    "jsonKey": {
        "someNumber": 42,
        "someArray": [1, 2, 3],
        "someNestedObject": {
            "key": "value"
        }
    }
}


{
	"stringKey": "value1",
	"booleanKey": true,
	"doubleKey": 42.0,
	"longKey": 73
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Component
public class Upload {

    @Autowired
    private EmqClient emqClient;

    @Autowired
    private MqttProperties properties;

    @PostConstruct
    public void init(){
        //连接服务端
        emqClient.connect(properties.getUsername(),properties.getPassword());
        //订阅一个主题
        //emqClient.subscribe("testtopic/#", QosEnum.QoS1);
    }

    @Scheduled(fixedRate = 2000)
    public void publish(){

        String data = getData(2);

        emqClient.publish("v1/devices/me/telemetry",data,
                QosEnum.QoS1,false);
    }

    private String getData(Integer type){

        if (type == 1) {
            // 携带时间戳
            String data = "{\n" +
                    "\t\"ts\": 1451649600512,\n" +
                    "\t\"values\": {\n" +
                    "\t\t\"stringKey\": \"value1\",\n" +
                    "\t\t\"booleanKey\": true,\n" +
                    "\t\t\"doubleKey\": 42.0,\n" +
                    "\t\t\"longKey\": 73,\n" +
                    "\t\t\"jsonKey\": {\n" +
                    "\t\t\t\"someNumber\": 42,\n" +
                    "\t\t\t\"someArray\": [1, 2, 3],\n" +
                    "\t\t\t\"someNestedObject\": {\n" +
                    "\t\t\t\t\"key\": \"value\"\n" +
                    "\t\t\t}\n" +
                    "\t\t}\n" +
                    "\t}\n" +
                    "}";

            return data;
        } else if (type == 2) {
            // 不携带时间戳
            String data = "{\n" +
                    "  \"stringKey\": \"value1\",\n" +
                    "  \"booleanKey\": true,\n" +
                    "  \"doubleKey\": 42.0,\n" +
                    "  \"longKey\": 73,\n" +
                    "  \"jsonKey\": {\n" +
                    "    \"someNumber\": 42,\n" +
                    "    \"someArray\": [1,2,3],\n" +
                    "    \"someNestedObject\": {\"key\": \"value\"}\n" +
                    "  }\n" +
                    "}";

            return data;
        }else {
            // 数组
            String data = "[{\"key1\":\"value1\"}, {\"key2\":true}]";
            return data;
        }
    }
}

3.RabbitMQ

1.创建队列

telemetry-default-queue、telemetry-queue

2.创建交换机

4.配置规则引擎

4.1.整体配置

4.2.整个元数据

1
2
3
4
5
{
    "deviceName": "Test-telemetry",
    "deviceType": "default",
    "ts": "1696730983079"
}

4.3.按设备类型消息分类

1
2
3
4
5
if(msg.deviceType === 'test-telemetry') {
    return ['test-telemetry'];
} else {
    return ['default'];
}

5.测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
	"stringKey": "value1",
	"booleanKey": true,
	"doubleKey": 42.0,
	"longKey": 73,
	"jsonKey": {
		"someNumber": 42,
		"someArray": [1, 2, 3],
		"someNestedObject": {
			"key": "value"
		}
	},
	"deviceType": "default",
	"deviceName": "Test-telemetry",
	"ts": "1451649600512"
}
1
2
3
4
5
6
7
8
9
{
	"stringKey": "value1",
	"booleanKey": true,
	"doubleKey": 42.0,
	"longKey": 73,
	"deviceType": "test-telemetry",
	"deviceName": "Test-telemetry1",
	"ts": "1696824491527"
}

四、REST API

1.遥测API