Mqtt Plugin
Plugin Overview
mqtt_plugin is a network transport plugin implemented based on the mqtt protocol. This plugin provides the following components:
mqtt
type RPC backendmqtt
type Channel backend
The plugin configuration items are as follows:
Node |
Type |
Optional |
Default |
Purpose |
---|---|---|---|---|
broker_addr |
string |
Required |
“” |
Address of the mqtt broker |
client_id |
string |
Required |
“” |
This node’s mqtt client id |
max_pkg_size_k |
int |
Optional |
1024 |
Maximum packet size, unit: KB |
reconnect_interval_ms |
int |
Optional |
1000 |
Reconnection interval to broker, unit: ms |
truststore |
string |
Optional |
“” |
Path to CA certificate |
client_cert |
string |
Optional |
“” |
Path to client certificate |
client_key |
string |
Optional |
“” |
Path to client private key |
client_key_password |
string |
Optional |
“” |
Password set for client private key |
Regarding the configuration of mqtt_plugin, the following points should be noted:
broker_addr
indicates the address of the mqtt broker. Users must ensure that an mqtt broker is running at this address, otherwise startup will fail.client_id
indicates the client id used when this node connects to the mqtt broker.max_pkg_size_k
indicates the maximum packet size during data transmission, default 1 MB. Note that the broker must also support this size.reconnect_interval_ms
indicates the reconnection interval to the broker, default 1 second.truststore
indicates the path to the broker’s CA certificate, e.g./etc/emqx/certs/cacert.pem
. This option takes effect when the protocol ofbroker_addr
is configured asssl
ormqtts
, used to specify the CA certificate path; otherwise, this option is automatically ignored. Please note that configuring only this option is considered one-way authentication.client_cert
indicates the path to the client certificate, e.g./etc/emqx/certs/client-cert.pem
. Used when mutual authentication is required, in conjunction withclient_key
. If broker_addr uses an unencrypted protocol, this option will be ignored.client_key
indicates the path to the client private key, e.g./etc/emqx/certs/client-key.pem
. Used when mutual authentication is required, in conjunction withclient_cert
. If broker_addr uses an unencrypted protocol, this option will be ignored.client_key_password
indicates the password set for the client private key. If the private key is password-protected, this option needs to be set. If broker_addr uses an unencrypted protocol, this option will be ignored.
The mqtt_plugin plugin is encapsulated based on paho.mqtt.c. When in use, the threads provided by paho.mqtt.c are used for Channel subscription callbacks, RPC Server processing methods, and RPC Client returns. When users block threads in callbacks, it may prevent continued message reception/transmission. As mentioned in the Module interface documentation, generally, if the task in the callback is very lightweight, it can be processed directly in the callback; but if the task in the callback is relatively heavy, it’s best to schedule it to another dedicated executor for processing.
Below is a simple example:
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_mqtt_client
max_pkg_size_k: 1024
mqtt Type RPC Backend
The mqtt
type RPC backend is an RPC backend provided by mqtt_plugin, used to invoke and handle AimRT RPC requests via MQTT. All its configuration items are as follows:
Node |
Type |
Optional |
Default |
Description |
---|---|---|---|---|
timeout_executor |
string |
Optional |
“” |
Executor used on the client side when an RPC times out |
clients_options |
array |
Optional |
[] |
Rules for the client side when initiating RPC requests |
clients_options[i].func_name |
string |
Required |
“” |
RPC Func name, supports regular expressions |
clients_options[i].server_mqtt_id |
string |
Optional |
“” |
MQTT server id requested when the RPC Func is invoked |
clients_options[i].qos |
int |
Optional |
2 |
MQTT QoS on the RPC client side, allowed values: 0/1/2 |
servers_options |
array |
Optional |
[] |
Rules for the server side when processing RPC requests |
servers_options[i].func_name |
string |
Required |
“” |
RPC Func name, supports regular expressions |
servers_options[i].allow_share |
bool |
Optional |
true |
Whether this RPC service allows shared subscriptions; if not, the service can only be invoked via a specified server id |
servers_options[i].qos |
int |
Optional |
2 |
MQTT QoS on the RPC server side, allowed values: 0/1/2 |
Below is a simple client example:
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_client
max_pkg_size_k: 1024
executor:
executors:
- name: timeout_handle
type: time_wheel
rpc:
backends:
- type: mqtt
options:
timeout_executor: timeout_handle
clients_options:
- func_name: "(.*)"
qos: 0
clients_options:
- func_name: "(.*)"
enable_backends: [mqtt]
Below is a simple server example:
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_server
max_pkg_size_k: 1024
rpc:
backends:
- type: mqtt
options:
servers_options:
- func_name: "(.*)"
allow_share: true
qos: 0
servers_options:
- func_name: "(.*)"
enable_backends: [mqtt]
In the examples above, both the client and server connect to an MQTT broker at tcp://127.0.0.1:1883
, and the client is configured to handle all RPC requests via the mqtt backend, thus completing the RPC call loop.
If multiple servers register the same RPC service, the client will randomly select one server to send the request. To specify a particular server, you can set ToAddr in the client’s ctx as follows:
auto ctx_ptr = proxy->NewContextSharedPtr();
// mqtt://{{target server mqtt id}}
ctx_ptr->SetToAddr("mqtt://target_server_mqtt_id");
auto status = proxy->Foo(ctx_ptr, req, rsp);
Throughout the RPC process, the underlying MQTT topic names are formatted as follows:
Server side
Topics subscribed for Req (both are subscribed):
$share/aimrt/aimrt_rpc_req/${func_name}
aimrt_rpc_req/${server_id}/${func_name}
Topic published for Rsp:
aimrt_rpc_rsp/${client_id}/${func_name}
Client side
Topics published for Req (choose one):
aimrt_rpc_req/${func_name}
aimrt_rpc_req/${server_id}/${func_name}
Topic subscribed for Rsp:
aimrt_rpc_rsp/${client_id}/${func_name}
${client_id}
and ${server_id}
must be globally unique within the same MQTT broker environment for the client and server, typically using the id registered with the MQTT broker. ${func_name}
is the URL-encoded AimRT RPC method name. The server subscribes using shared subscriptions to ensure only one server handles the request. This feature requires an MQTT 5.0-compliant broker.
For example, if the client registers with the MQTT broker using id example_client
and the func name is /aimrt.protocols.example.ExampleService/GetBarData
, then ${client_id}
is example_client
and ${func_name}
is %2Faimrt.protocols.example.ExampleService%2FGetBarData
.The overall Mqtt packet format from Client -> Server is divided into 5 segments:
Serialization type, usually
pb
orjson
The mqtt topic name that the client wants the server to reply rsp to. The client itself needs to subscribe to this mqtt topic
msg id, 4 bytes, the server will encapsulate it unchanged into the rsp packet for the client to locate which req the rsp corresponds to
context area
number of contexts, 1 byte, maximum 255 contexts
context_1 key, 2-byte length + data area
context_2 key, 2-byte length + data area
…
msg data
| n(0~255) [1 byte] | content type [n byte]
| m(0~255) [1 byte] | rsp topic name [m byte]
| msg id [4 byte]
| context num [1 byte]
| context_1 key size [2 byte] | context_1 key data [key_1_size byte]
| context_1 val size [2 byte] | context_1 val data [val_1_size byte]
| context_2 key size [2 byte] | context_2 key data [key_2_size byte]
| context_2 val size [2 byte] | context_2 val data [val_2_size byte]
| ...
| msg data [remaining byte]
The overall Mqtt packet format from Server -> Client is divided into 4 segments:
Serialization type, usually
pb
orjson
msg id, 4 bytes, the msg id from the req
status code, 4 bytes, framework error code, if this part is non-zero, it means an error occurred on the server side, and the data segment will have no content
msg data
| n(0~255) [1 byte] | content type [n byte]
| msg id [4 byte]
| status code [4 byte]
| msg data [remaining byte]
mqtt Type Channel Backend
The mqtt
type Channel backend is a Channel backend provided by mqtt_plugin, used to publish and subscribe to messages via mqtt. All its configuration items are as follows:
Node |
Type |
Optional |
Default |
Purpose |
---|---|---|---|---|
pub_topics_options |
array |
Optional |
[] |
Rules when publishing Topic |
pub_topics_options[i].topic_name |
string |
Required |
“” |
Topic name, supports regular expressions |
pub_topics_options[i].qos |
int |
Required |
2 |
Publish side mqtt qos, range: 0/1/2 |
sub_topics_options |
array |
Optional |
[] |
Rules when publishing Topic |
sub_topics_options[i].topic_name |
string |
Required |
“” |
Topic name, supports regular expressions |
sub_topics_options[i].qos |
int |
Required |
2 |
Subscribe side mqtt qos, range: 0/1/2 |
Below is a simple publisher example:
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_publisher
max_pkg_size_k: 1024
channel:
backends:
- type: mqtt
options:
pub_topics_options:
- topic_name: "(.*)"
qos: 2
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
And here is a simple subscriber example:
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_subscriber
max_pkg_size_k: 1024
channel:
backends:
- type: mqtt
sub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
In the above examples, both the publisher and subscriber connect to an Mqtt broker at address tcp://127.0.0.1:1883
. The publisher is configured to process all messages through the mqtt backend, and the subscriber is configured to trigger callbacks for all messages from the mqtt backend, thereby establishing the message publish-subscribe link.
During this process, the underlying Mqtt Topic name format is: /channel/${topic_name}/${message_type}
. Here, ${topic_name}
is the AimRT Topic name, and ${message_type}
is the url-encoded AimRT message name.
For example, if the AimRT Topic name is test_topic
and the message type is pb:aimrt.protocols.example.ExampleEventMsg
, then the final Mqtt topic name will be: /channel/test_topic/pb%3Aaimrt.protocols.example.ExampleEventMsg
.
In the chain from AimRT publisher to subscriber, the Mqtt packet format is divided into 3 segments:
Serialization type, generally
pb
orjson
context section
context count, 1 byte, maximum 255 contexts
context_1 key, 2-byte length + data section
context_2 key, 2-byte length + data section
…
data
| n(0~255) [1 byte] | content type [n byte]
| context num [1 byte]
| context_1 key size [2 byte] | context_1 key data [key_1_size byte]
| context_1 val size [2 byte] | context_1 val data [val_1_size byte]
| context_2 key size [2 byte] | context_2 key data [key_2_size byte]
| context_2 val size [2 byte] | context_2 val data [val_2_size byte]
| ...
| msg data [len - 1 - n byte] |