一、概述
1.遥测数据
ThingsBoard提供了一组与时序列数据相关的功能:
- 采集 基于各种协议和集成采集数据
- 存储 基于SQL(PostgreSQL)或NoSQL(Cassandra或Timescale)数据库存储时间序列数据
- 查询 基于API查询最新时序数据或指定时间范围内的所有数据
- 订阅 基于WebSockets订阅数据进行可视化或实时分析
- 可视化 基于仪表板和部件可视化时序数据
- 过滤和分析 基于规则引擎过滤和分析数据
- 生成 基于采集数据产生警报
- 转发 基于External规则节点将数据发送到外部系统(例如:Kafka或RabbitMQ规则节点)
2.数据点
ThingsBoard 将遥测(时序)数据视为带时间戳的键值对,将一个带时间戳的键值对称为数据点,键值对的数据结构具有简单灵活的特点可以无缝与物联网设备集;key始终是一个字符串而值可以是string、boolean、double、integer或者JSON。
以下示例使用内部数据格式设备本身可以使用各种协议和数据格式上传数据,更多详细信息请参见时间序列数据上传API 。
以下数据包含5个数据点: 温度 (double), 湿度 (integer), hvac启用(boolean), hvac状态(string)和配置(JSON):
1
2
3
4
5
6
7
8
9
10
11
{
"temperature": 42.2,
"humidity": 70,
"hvacEnabled": true,
"hvacState": "IDLE",
"configuration": {
"someNumber": 42,
"someArray": [1,2,3],
"someNestedObject": {"key": "value"}
}
}
你可能会注意到上面列出的JSON没有时间戳在这种情况下ThingsBoard使用当前服务器时间戳但是你可以在消息中包含时间戳信息。
请参见下面的例:
1
2
3
4
5
6
7
{
"ts": 1527863043000,
"values": {
"temperature": 42.2,
"humidity": 70
}
}
3.时序数据API
你可以使用内置的传输协议:
上面的数协议都支持JSON、Protobuf或自己的数据格式, 对于其他协议请查看“如何连接设备”指南。
4.MQTT协议
4.1.MQTT基础
MQTT是一种轻量级的发布-订阅消息传递协议,它可能最适合各种物联网设备。
你可以在此处找到有关MQTT的更多信息,ThingsBoard服务器支持QoS级别0(最多一次)和QoS级别1(至少一次)以及一组预定义主题的MQTT代理。
客户端
你可以在网上找到大量的MQTT客户端库,本文中的示例将基于Mosquitto和MQTT.js您可以使用我们的Hello World指南中的说明。
MQTT连接
我们将在本文中使用令牌凭据对进行设备访问,这些凭证稍后将称为$ACCESS_TOKEN应用程序需要发送用户名包含$ACCESS_TOKEN的MQTT CONNECT消息。
连接状态码说明:
- 0x00 连接成功 - 成功连接
- 0x04 连接失败 - 用户名或密码错误。
- 0x05 连接未授权 - -用户名包含无效的 $ACCESS_TOKEN。
Key-value格式
ThingsBoard支持以JSON格式的key-value字符串值可以是string、bool、float、long或者二进制格式的序列化字符串;有关更多详细信息请参见协议自定义。
1
2
3
4
5
6
7
8
9
10
11
12
13
{
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"jsonKey": {
"someNumber": 42,
"someArray": [1, 2, 3],
"someNestedObject": {
"key": "value"
}
}
}
4.2.遥测上传API
发布遥测数据到ThingsBoard服务端必须PUBLISH消息发送到下面主题:
1
v1/devices/me/telemetry
支持的最简单的数据格式是:
1
{"key1":"value1", "key2":"value2"}
或者
1
[{"key1":"value1"}, {"key2":"value2"}]
请注意 在这种情况下服务端时间戳将分配给上传的数据!
如果您的设备能够获得客户端时间戳,则可以使用以下格式:
1
{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
在上面的示例中我们假设“1451649600512”是具有毫秒精度的Unix时间戳。 例如:值’1451649600512’对应于’2016年1月1日 星期五 12:00:00.512 GMT’
4.3.遥测数据格式
1.telemetry-data-as-object.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"ts": 1451649600512,
"values": {
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"jsonKey": {
"someNumber": 42,
"someArray": [1, 2, 3],
"someNestedObject": {
"key": "value"
}
}
}
}
2.telemetry-data-with-ts.json
1
2
3
4
5
6
7
8
9
10
11
{
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"jsonKey": {
"someNumber": 42,
"someArray": [1,2,3],
"someNestedObject": {"key": "value"}
}
}
3.telemetry-data-as-array.json
1
[{"key1":"value1"}, {"key2":true}]
5.时序数据表
5.1.时序数据表
- ts_kv:时序数据历史数据表
- ts_kv_dictionary:键值映射表
- ts_kv_latest:最新时序数据
1.ts_kv_dictionary
2.ts_kv
3.ts_kv_latest
5.2.数据表关系
- ts_kv_dictionary:定义key和key_id的关系
- 通过key_id到ts_kv表查询历史数据
- 通过key_id到ts_kv_latest表查询最新数据
二、MQTT上传遥测数据
1.环境准备
- 创建测试设备 test-3
- 创建测试工程 tb-telemetry
1.程序配置
1
2
3
4
5
mqtt:
broker-url: tcp://192.168.202.188:1883
client-id: emq-client-xxxxxx
username: FO3JsWBgdukOr9pZjhab
password:
2.程序源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.iiotos.telemetry;
import com.iiotos.mqtt.EmqClient;
import com.iiotos.mqtt.MqttProperties;
import com.iiotos.mqtt.QosEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class Upload {
@Autowired
private EmqClient emqClient;
@Autowired
private MqttProperties properties;
@PostConstruct
public void init(){
//连接服务端
emqClient.connect(properties.getUsername(),properties.getPassword());
//订阅一个主题
//emqClient.subscribe("testtopic/#", QosEnum.QoS1);
}
@Scheduled(fixedRate = 2000)
public void publish(){
String data = getData(1);
emqClient.publish("v1/devices/me/telemetry",data,
QosEnum.QoS1,false);
}
private String getData(Integer type){
if (type == 1) {
// 携带时间戳
String data = "{\n" +
"\t\"ts\": 1451649600512,\n" +
"\t\"values\": {\n" +
"\t\t\"stringKey\": \"value1\",\n" +
"\t\t\"booleanKey\": true,\n" +
"\t\t\"doubleKey\": 42.0,\n" +
"\t\t\"longKey\": 73,\n" +
"\t\t\"jsonKey\": {\n" +
"\t\t\t\"someNumber\": 42,\n" +
"\t\t\t\"someArray\": [1, 2, 3],\n" +
"\t\t\t\"someNestedObject\": {\n" +
"\t\t\t\t\"key\": \"value\"\n" +
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
"}";
return data;
} else if (type == 2) {
// 不携带时间戳
String data = "{\n" +
" \"stringKey\": \"value1\",\n" +
" \"booleanKey\": true,\n" +
" \"doubleKey\": 42.0,\n" +
" \"longKey\": 73,\n" +
" \"jsonKey\": {\n" +
" \"someNumber\": 42,\n" +
" \"someArray\": [1,2,3],\n" +
" \"someNestedObject\": {\"key\": \"value\"}\n" +
" }\n" +
"}";
return data;
}else {
// 数组
String data = "[{\"key1\":\"value1\"}, {\"key2\":true}]";
return data;
}
}
}
2.测试
2.1.携带时间戳数据
1.telemetry-data-as-object.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"ts": 1451649600512,
"values": {
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"jsonKey": {
"someNumber": 42,
"someArray": [1, 2, 3],
"someNestedObject": {
"key": "value"
}
}
}
}
历史数据相同时间戳覆盖
1
SELECT * from ts_kv where key > 56;
1
SELECT * from ts_kv_latest where key > 56;
2.2.不携带时间戳数据
2.telemetry-data-with-ts.json
1
2
3
4
5
6
7
8
9
10
11
{
"stringKey": "value1",
"booleanKey": true,
"doubleKey": 42.0,
"longKey": 73,
"jsonKey": {
"someNumber": 42,
"someArray": [1,2,3],
"someNestedObject": {"key": "value"}
}
}
历史数据
1
SELECT * from ts_kv where key > 56;
1
SELECT * from ts_kv_latest where key > 56;
2.3.数组数据
3.telemetry-data-as-array.json
1
[{"key1":"value1"}, {"key2":true}]