Channel
Protocol
A protocol is used to determine the message format for all communicating parties. Generally, protocols are described using a language-agnostic IDL (Interface Description Language) and then converted into code for each language by some tool.
Protobuf
Protobuf is a lightweight, efficient data exchange format for serializing structured data, developed by Google and widely used as an IDL.
The current AimRT Python release only supports the protobuf protocol. Before using AimRT Python to send/subscribe to messages, users must first generate some Python stub code based on the protobuf protocol.
To use it, developers first write a .proto file that defines the message structure. For example, example.proto:
syntax = "proto3";

message ExampleMsg {
  string msg = 1;
  int32 num = 2;
}
Then use the official protoc tool provided by Protobuf to convert it and generate Python code, for example:
protoc --python_out=. example.proto
This will produce the example_pb2.py file, which contains Python interfaces generated from the defined message types. Our business code needs to import this file.
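When stubs have to be regenerated often, the protoc call above can be wrapped in a small script. The following is a minimal sketch, assuming protoc is available on PATH. The helper names build_protoc_command and generate_stubs are hypothetical, and only the --proto_path and --python_out flags of protoc are used.

```python
import shutil
import subprocess
from pathlib import Path


def build_protoc_command(proto_file, out_dir="."):
    """Build the protoc argument list that generates Python stubs."""
    proto_path = Path(proto_file)
    return [
        "protoc",
        # Search for the .proto file in its own directory.
        f"--proto_path={proto_path.parent}",
        # Write the generated *_pb2.py file into out_dir.
        f"--python_out={out_dir}",
        str(proto_path),
    ]


def generate_stubs(proto_file, out_dir="."):
    """Run protoc and return the expected path of the generated module."""
    if shutil.which("protoc") is None:
        raise RuntimeError("protoc not found on PATH")
    subprocess.run(build_protoc_command(proto_file, out_dir), check=True)
    return Path(out_dir) / (Path(proto_file).stem + "_pb2.py")
```

Calling generate_stubs("example.proto") would run the same command as shown above and return the path example_pb2.py.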
ROS2 Message
ROS2 Message is a structured data format used for communication and data exchange in ROS2. To use it, developers first create a ROS2 Package and define a .msg file within it, such as example.msg:
int32 num
float32 num2
char data
Then directly use the CMake method provided by ROS2, rosidl_generate_interfaces, to generate C++ code and CMake Targets for the message, for example:
rosidl_generate_interfaces(
  example_ros2_msg_gencode
  "msg/example.msg"
)
After building, you also need to set the corresponding environment variables before the generated message types can be used in Python. In the aimrt build directory, execute:
source install/share/example_ros2/setup.bash
For more detailed information on generating custom ROS2 Messages, please refer to the ROS2 official documentation.
ChannelHandle
A module can obtain a ChannelHandleRef handle by calling the GetChannelHandle() interface of the CoreRef handle to use Channel functionality. Its core interfaces are:
- GetPublisher(str)->PublisherRef
- GetSubscriber(str)->SubscriberRef
Developers can call the GetPublisher and GetSubscriber methods in ChannelHandleRef to obtain PublisherRef and SubscriberRef type handles for a specified Topic name, used for Channel publishing and subscribing respectively. Usage notes for these two methods are as follows:
- These two interfaces are thread-safe.
- These two interfaces can be used during the Initialize phase and the Start phase.
Publish
If a user needs to publish a Msg, the main interfaces involved are the following two:
- aimrt_py.RegisterPublishType(publisher, msg_type)->bool: Used to register this message type;
  - The first parameter publisher is a PublisherRef handle representing a certain Topic;
  - The second parameter msg_type is a Protobuf type;
  - The return value is a bool indicating whether the registration was successful;
- aimrt_py.Publish(publisher, msg, ctx | serialization_type): Used to publish a message;
  - The first parameter publisher is a PublisherRef handle representing a certain Topic;
  - The second parameter msg is an instance of a Protobuf type, which must correspond to the message type registered;
  - The third parameter can be a Context type instance, a ContextRef handle, or a serialization_type string to specify the message's context or serialization type. serialization_type can only be set to pb or json. Both ctx and serialization_type can be empty; when empty, pb serialization is used by default;
  - This function also has the following overload: aimrt_py.Publish(publisher, ctx | serialization_type, msg)
Users need two steps to implement message publishing at the logical level:
- Step 1: Use the aimrt_py.RegisterPublishType method to register the protocol type;
  - Registration can only be done during the Initialize phase;
  - Re-registering the same type in a PublisherRef is not allowed;
  - If registration fails, it returns false;
- Step 2: Use the aimrt_py.Publish method to publish data;
  - Data can only be published after the Start phase;
  - When calling the Publish interface, the developer must ensure that the Msg passed in does not change until the Publish interface returns; otherwise, the behavior is undefined;
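The phase rules above can be summarised with a toy model. The sketch below is not AimRT code: Phase and ToyPublisher are hypothetical stand-ins that only mimic the documented rules (registration only during Initialize, no duplicate registration, publishing only once Start has been reached). Raising an exception on an early publish is a choice made for illustration and is not a statement about how AimRT itself reacts.

```python
from enum import Enum


class Phase(Enum):
    INITIALIZE = 1
    START = 2


class ToyPublisher:
    """Hypothetical stand-in that mimics the documented publish rules."""

    def __init__(self):
        self.phase = Phase.INITIALIZE
        self.registered_types = set()
        self.sent = []

    def register_publish_type(self, msg_type):
        # Rule: registration is only allowed during Initialize.
        if self.phase is not Phase.INITIALIZE:
            return False
        # Rule: the same type cannot be registered twice.
        if msg_type in self.registered_types:
            return False
        self.registered_types.add(msg_type)
        return True

    def publish(self, msg):
        # Rule: data can only be published once Start has been reached.
        # Raising here is an assumption of this toy model only.
        if self.phase is not Phase.START:
            raise RuntimeError("publish called before Start")
        if type(msg) not in self.registered_types:
            raise TypeError("message type was not registered")
        self.sent.append(msg)
```

In real code the same order applies: call aimrt_py.RegisterPublishType during Initialize, check its bool return value, and call aimrt_py.Publish only after Start.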
After a user Publishes a message, a specific Channel backend will handle the actual message publishing request. Depending on the backend implementation, this may block for a period of time, so the time consumed by the Publish method is undefined. However, generally, Channel backends do not block the Publish method for long. For detailed information, please refer to the documentation of the corresponding backend.
Subscribe
If a user needs to subscribe to a Msg, the following interface must be used:
- aimrt_py.Subscribe(subscriber, msg_type, handle)->bool: Used to subscribe to a message type;
  - The first parameter subscriber is a SubscriberRef handle representing a certain Topic;
  - The second parameter msg_type is a Protobuf type;
  - The third parameter handle is a message processing callback with the signature (msg)->void or (ctx_ref, msg)->void. The type of msg is the msg_type type passed during subscription, and ctx_ref is the message's context handle;
  - The return value is a bool indicating whether the subscription was successful;

Usage notes:
- The subscription interface can only be called during the Initialize phase;
- Re-subscribing to the same type in a SubscriberRef is not allowed;
- If subscription fails, it returns false;
Additionally, note that which executor runs the subscription callback depends on the specific Channel backend implementation and can only be determined at runtime through configuration. Users should not make any assumptions when writing logical code. For detailed information, please refer to the documentation of the corresponding backend.
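Because the executor that runs the callback is not known when the code is written, one defensive pattern is to keep the callback itself short and hand heavier work to a worker pool owned by the application. The sketch below uses Python's standard concurrent.futures.ThreadPoolExecutor. The names make_offloading_handler and heavy_task are hypothetical, and the returned function only follows the (msg)->void callback signature described above.

```python
import logging
from concurrent.futures import ThreadPoolExecutor


def make_offloading_handler(pool, heavy_task):
    """Return a (msg) callback that only schedules heavy_task on the pool."""

    def _log_failure(future):
        # Runs once the task has finished; report errors instead of losing them.
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logging.error("heavy_task failed: %r", exc)

    def handle(msg):
        # Only enqueue the work, so the calling thread returns quickly.
        future = pool.submit(heavy_task, msg)
        future.add_done_callback(_log_failure)

    return handle
```

A handler built this way, for example handler = make_offloading_handler(ThreadPoolExecutor(max_workers=2), process_event) with a user-defined process_event function, could then be passed as the third argument of aimrt_py.Subscribe. The pool should be shut down when the application exits.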
Best practice: If the task in the callback is very lightweight, such as just setting a variable, it can be handled directly in the callback; but if the task in the callback is relatively heavy, it is best to schedule it to another executor dedicated to handling tasks.
## Context
Context is a data structure in AimRT used to pass contextual information. The interfaces it supports are as follows:
- Reset()->void: Resets the context; after resetting, the context can be reused;
- CheckUsed()->bool: Checks whether the context has been used;
- SetUsed()->void: Sets the context as used;
- GetType()->aimrt_channel_context_type_t: Gets the context type;
- SetMetaValue(key: str, value: str)->void: Sets metadata;
- GetMetaValue(key: str)->str: Gets metadata;
- GetMetaKeys()->List[str]: Gets a list of keys from all metadata key-value pairs;
- SetSerializationType(serialization_type: str)->void: Sets the serialization type;
- GetSerializationType()->str: Gets the serialization type;
- ToString()->str: Gets context information, returning human-readable information in string form;
ContextRef is the reference type of Context. Except for not having the Reset interface, all other interfaces are identical to Context.
aimrt_channel_context_type_t is an enumeration type that defines the context type. The specific values are AIMRT_CHANNEL_PUBLISHER_CONTEXT or AIMRT_CHANNEL_SUBSCRIBER_CONTEXT, indicating whether this is a publisher or subscriber context.
## Usage Examples
Below is an example of using AimRT Python to Publish, obtaining the CoreRef handle via the Create Module approach. If you are using the Module pattern and obtaining the CoreRef handle in the Initialize method, the usage is similar:
import threading
import time

from google.protobuf.json_format import MessageToJson

import aimrt_py
import event_pb2


def main():
    aimrt_core = aimrt_py.Core()

    # Initialize
    core_options = aimrt_py.CoreOptions()
    core_options.cfg_file_path = "path/to/cfg/xxx_cfg.yaml"
    aimrt_core.Initialize(core_options)

    # Create Module
    module_handle = aimrt_core.CreateModule("NormalPublisherPyModule")

    # Register publish type (only allowed during the Initialize phase)
    topic_name = "test_topic"
    publisher = module_handle.GetChannelHandle().GetPublisher(topic_name)
    assert publisher, f"Get publisher for topic '{topic_name}' failed."

    aimrt_py.RegisterPublishType(publisher, event_pb2.ExampleEventMsg)

    # Start the core in a background thread
    thread = threading.Thread(target=aimrt_core.Start)
    thread.start()

    # Wait briefly so that the core has entered the Start phase
    time.sleep(1)

    # Publish event with the default (pb) serialization
    event_msg = event_pb2.ExampleEventMsg()
    event_msg.msg = "Publish without ctx or serialization_type"
    event_msg.num = 1
    aimrt_py.Publish(publisher, event_msg)
    aimrt_py.info(module_handle.GetLogger(),
                  f"Publish new pb event, data: {MessageToJson(event_msg)}")

    # Publish event with json serialization
    event_msg.msg = "Publish with json serialization"
    event_msg.num = 2
    aimrt_py.Publish(publisher, "json", event_msg)
    aimrt_py.info(module_handle.GetLogger(),
                  f"Publish new pb event, data: {MessageToJson(event_msg)}")

    # Publish event with context
    ctx = aimrt_py.Context()
    ctx.SetMetaValue("key1", "value1")
    event_msg.msg = "Publish with context"
    event_msg.num = 3
    aimrt_py.Publish(publisher, ctx, event_msg)
    aimrt_py.info(module_handle.GetLogger(),
                  f"Publish new pb event, data: {MessageToJson(event_msg)}")

    # Publish event with context ref
    ctx.Reset()  # Reset context, then it can be used again
    ctx_ref = aimrt_py.ContextRef(ctx)
    ctx_ref.SetMetaValue("key2", "value2")
    ctx_ref.SetSerializationType("json")
    event_msg.msg = "Publish with context ref"
    event_msg.num = 4
    aimrt_py.Publish(publisher, ctx_ref, event_msg)
    aimrt_py.info(module_handle.GetLogger(),
                  f"Publish new pb event, data: {MessageToJson(event_msg)}")

    # Give the backend a moment to deliver the messages
    time.sleep(1)

    # Shutdown
    aimrt_core.Shutdown()
    thread.join()


if __name__ == '__main__':
    main()
Below is an example of using AimRT Python to Subscribe, obtaining the CoreRef handle via the Create Module approach. If you are using the Module pattern and obtaining the CoreRef handle in the Initialize method, the usage is similar:
import signal
import sys
import threading

from google.protobuf.json_format import MessageToJson

import aimrt_py
import event_pb2

global_aimrt_core = None


def signal_handler(sig, frame):
    global global_aimrt_core

    # Shut the core down gracefully on SIGINT / SIGTERM
    if global_aimrt_core and sig in (signal.SIGINT, signal.SIGTERM):
        global_aimrt_core.Shutdown()
        return

    sys.exit(0)


def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    aimrt_core = aimrt_py.Core()

    global global_aimrt_core
    global_aimrt_core = aimrt_core

    # Initialize
    core_options = aimrt_py.CoreOptions()
    core_options.cfg_file_path = "path/to/cfg/xxx_cfg.yaml"
    aimrt_core.Initialize(core_options)

    # Create Module
    module_handle = aimrt_core.CreateModule("NormalSubscriberPyModule")

    # Subscribe (only allowed during the Initialize phase)
    topic_name = "test_topic"
    subscriber = module_handle.GetChannelHandle().GetSubscriber(topic_name)
    assert subscriber, f"Get subscriber for topic '{topic_name}' failed."

    def EventHandle(ctx_ref, msg):
        aimrt_py.info(module_handle.GetLogger(),
                      f"Get new pb event, ctx: {ctx_ref.ToString()}, data: {MessageToJson(msg)}")

    aimrt_py.Subscribe(subscriber, event_pb2.ExampleEventMsg, EventHandle)

    # Start the core in a background thread
    thread = threading.Thread(target=aimrt_core.Start)
    thread.start()

    # Join with a timeout so that signals can still be handled in the main thread
    while thread.is_alive():
        thread.join(1.0)


if __name__ == '__main__':
    main()