On this page
网关(Gateway API)
一、网关规划
1.网关规划
- Gateway API可以定制开发网关
- 能够实现:设备链接、遥测、属性、服务端RPC(单向、双向)
- 平台端设备可以认为网关是透明的,不用关注网关
二、准备工作
1.RPC
1.1.官网文档
1
2
3
| # MQTT Gateway API Reference
https://thingsboard.io/docs/reference/gateway-mqtt-api/
|
1.2.Server-side RPC
In order to subscribe to RPC commands from the server, send SUBSCRIBE message to the following topic:
and expect messages with individual commands in the following format:
1
| {"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}}
|
Once command is processed by device, gateway can send commands back using following format:
1
| {"device": "Device A", "id": $request_id, "data": {"success": true}}
|
where $request_id is your integer request identifier, Device A is your device name and method is your RPC method name.
1.3.RPC开发
-
设备端(网关)订阅:v1/gateway/rpc
-
设备端配置网关访问令牌
-
应用端调用REST API接口
-
REST API数据格式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| # 发送数据
{
"method": "getValue",
"params": {
"pin": 7,
"value": 1
},
"persistent": false,
"timeout": 5000
}
# 接收数据
{
"device": "Device A",
"data": {
"id": 21,
"method": "setValue",
"params": {
"pin": 7,
"value": 1
}
}
}
|
-
双向RPC返回数据格式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| # 发送数据
{
"method": "getValue",
"params": {
"pin": 7,
"value": 1
},
"persistent": false,
"timeout": 5000
}
# 接收数据
{
"device": "Device A",
"data": {
"id": 21,
"method": "setValue",
"params": {
"pin": 7,
"value": 1
}
}
}
#返回值
{
"device": "Device A",
"id": 21,
"data": {
"success": true
}
}
|
三、服务端RPC
1.工程说明
1
2
3
4
5
6
7
8
9
10
11
| server:
port: 8999
spring:
application:
name: tb-gateway-api
mqtt:
broker-url: tcp://82.157.166.86:1883
client-id: emq-client-gateway-api
username: mC81JBPikrtw82P2pn9Q #网关访问令牌
password:
|
2.设备端(MQTT)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| package com.iothub.gateway;
import com.iothub.mqtt.EmqClient;
import com.iothub.mqtt.MqttProperties;
import com.iothub.mqtt.QosEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class Rpc {
@Autowired
private EmqClient emqClient;
@Autowired
private MqttProperties properties;
@PostConstruct
public void init(){
//连接服务端
emqClient.connect(properties.getUsername(),properties.getPassword());
//订阅一个主题
emqClient.subscribe("v1/gateway/rpc", QosEnum.QoS1);
}
@Scheduled(fixedRate = 2000)
public void publish(){
String data = getData();
//emqClient.publish("v1/gateway/attributes",data, QosEnum.QoS1,false);
}
private String getData(){
String data = "";
return data;
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| /**
* 应用收到消息后触发的回调
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("订阅者订阅到了消息,topic={},messageid={},qos={},payload={}",
topic,
message.getId(),
message.getQos(),
new String(message.getPayload()));
// 双向RPC
/////////////////////////////////////////////////////////
//payload={"device":"Device A","data":{"id":6,"method":"getValue","params":{"pin":88,"value":99}}}
String payload = new String(message.getPayload());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(payload);
JsonNode data = jsonNode.get("data");
String id = data.get("id").toString();
String retData = "{\"device\": \"Device A\", \"id\": $request_id, \"data\": {\"success\": true}}";
String newData = retData.replace("$request_id", id);
emqClient.publish("v1/gateway/rpc", newData, QosEnum.QoS1,false);
}
|
3.单向RPC(REST API)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| package com.iothub.rest;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;
public class OneRPC {
public void sendRpc() throws JsonProcessingException {
String baseURL = "http://82.157.166.86:8080";
String token = Login.getToken();
// 地址:/api/rpc/oneway/{deviceId}
// 设备ID: eb9cc990-6711-11ee-afb9-c790163a721a
String apiUrl = "/api/rpc/oneway/{deviceId}";
String deviceId = "9ae2b110-68ae-11ee-afb9-c790163a721a";
HttpHeaders headers = new HttpHeaders();
//headers.add("Authorization", "Bearer "+ mainToken);
headers.set("X-Authorization", "Bearer " + token);
//headers.set("Content-Type", "application/json");
headers.setContentType(MediaType.APPLICATION_JSON);
String body = "{\n" +
" \"method\": \"setValue\",\n" +
" \"params\": {\n" +
" \"pin\": 7,\n" +
" \"value\": 1\n" +
" },\n" +
" \"persistent\": false,\n" +
" \"timeout\": 5000\n" +
"}";
HttpEntity entity = new HttpEntity<>(body, headers);
RestTemplate restTemplate = new RestTemplate();
ResponseEntity response = restTemplate.exchange(baseURL + apiUrl, HttpMethod.POST, entity, String.class, new Object[]{deviceId});
System.out.println(response);
if (response.getStatusCodeValue() == 200) {
String device = (String) response.getBody();
System.out.println(response.getBody());
}
}
}
|
1
2
3
4
5
6
7
8
9
| {
"method": "getValue",
"params": {
"pin": 7,
"value": 1
},
"persistent": false,
"timeout": 5000
}
|
4.双向RPC (REST API)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| package com.iothub.rest;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;
public class TwoRPC {
public void sendRpc() throws JsonProcessingException {
String baseURL = "http://82.157.166.86:8080";
String token = Login.getToken();
// 地址:/api/rpc/oneway/{deviceId}
// 设备ID: eb9cc990-6711-11ee-afb9-c790163a721a
String apiUrl = "/api/rpc/twoway/{deviceId}";
String deviceId = "9ae2b110-68ae-11ee-afb9-c790163a721a";
HttpHeaders headers = new HttpHeaders();
//headers.add("Authorization", "Bearer "+ mainToken);
headers.set("X-Authorization", "Bearer " + token);
//headers.set("Content-Type", "application/json");
headers.setContentType(MediaType.APPLICATION_JSON);
String body = "{\n" +
" \"method\": \"getValue\",\n" +
" \"params\": {\n" +
" \"pin\": 88,\n" +
" \"value\": 99\n" +
" },\n" +
" \"persistent\": false,\n" +
" \"timeout\": 5000\n" +
"}";
HttpEntity entity = new HttpEntity<>(body, headers);
RestTemplate restTemplate = new RestTemplate();
ResponseEntity response = restTemplate.exchange(baseURL + apiUrl, HttpMethod.POST, entity, String.class, new Object[]{deviceId});
System.out.println(response);
if (response.getStatusCodeValue() == 200) {
String device = (String) response.getBody();
System.out.println(response.getBody());
}
}
}
|
1
2
3
4
5
6
7
8
9
| {
"method": "getValue",
"params": {
"pin": 88,
"value": 99
},
"persistent": false,
"timeout": 5000
}
|
5.测试
5.1.单向RPC
1
2
3
4
5
6
7
8
9
10
11
| http://localhost:8999/rpc/oneway
{
"method": "getValue",
"params": {
"pin": 7,
"value": 1
},
"persistent": false,
"timeout": 5000
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # 设备端输出
2023-10-13 09:58:34.184 INFO 5728 --- [ent-gateway-api] com.iothub.mqtt.MessageCallback : 订阅者订阅到了消息,topic=v1/gateway/rpc,messageid=1,qos=1,payload={"device":"Device A","data":{"id":21,"method":"setValue","params":{"pin":7,"value":1}}}
{
"device": "Device A",
"data": {
"id": 21,
"method": "setValue",
"params": {
"pin": 7,
"value": 1
}
}
}
# REST API端
<200,[Vary:"Origin", "Access-Control-Request-Method", "Access-Control-Request-Headers", X-Content-Type-Options:"nosniff", X-XSS-Protection:"1; mode=block", Cache-Control:"no-cache, no-store, max-age=0, must-revalidate", Pragma:"no-cache", Expires:"0", Content-Length:"0", Date:"Fri, 13 Oct 2023 01:58:33 GMT", Keep-Alive:"timeout=60", Connection:"keep-alive"]>
null
|
5.2.双向RPC
1
2
3
4
5
6
7
8
9
10
11
| http://localhost:8999/rpc/twoway
{
"method": "getValue",
"params": {
"pin": 88,
"value": 99
},
"persistent": false,
"timeout": 5000
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| # 设备端输出
: 订阅者订阅到了消息,topic=v1/gateway/rpc,messageid=2,qos=1,payload={"device":"Device A","data":{"id":22,"method":"getValue","params":{"pin":88,"value":99}}}
{
"device": "Device A",
"data": {
"id": 22,
"method": "getValue",
"params": {
"pin": 88,
"value": 99
}
}
}
#返回值
{
"device": "Device A",
"id": 22,
"data": {
"success": true
}
}
# REST API端
<200,{"success":true},[Vary:"Origin", "Access-Control-Request-Method", "Access-Control-Request-Headers", X-Content-Type-Options:"nosniff", X-XSS-Protection:"1; mode=block", Cache-Control:"no-cache, no-store, max-age=0, must-revalidate", Pragma:"no-cache", Expires:"0", Content-Type:"application/json", Transfer-Encoding:"chunked", Date:"Fri, 13 Oct 2023 02:11:27 GMT", Keep-Alive:"timeout=60", Connection:"keep-alive"]>
{"success":true}
|