Rpc
Protocol
The protocol determines the message format exchanged between client and server in RPC. Protocols are generally described in an IDL (Interface Description Language) that is independent of any specific programming language, and then converted into code for each language by a tool. For RPC, two steps are required:
First, as described in the Channel chapter, developers use the official tools for the IDL to generate code for the message types in the protocol file for the target programming language;
Then, developers use the tools provided by AimRT to generate code for the service definitions in the protocol file for the target programming language.
Protobuf
Protobuf is a lightweight, efficient data exchange format for serializing structured data, developed by Google, and is a widely used IDL. It not only describes message structures but also provides the service statement to define RPC services.
When using it, developers first define a .proto file in which the message structures and RPC services are declared. For example, rpc.proto:
syntax = "proto3";

package example;

message ExampleReq {
  string msg = 1;
  int32 num = 2;
}

message ExampleRsp {
  uint64 code = 1;
  string msg = 2;
}

service ExampleService {
  rpc ExampleFunc(ExampleReq) returns (ExampleRsp);
}
Then use the official protoc tool provided by Protobuf to generate the Python code for the message structures, for example:
protoc --python_out=. rpc.proto
This will generate the rpc_pb2.py file, which contains the Python interfaces generated from the defined message types.
After that, the protoc plugin provided by AimRT is also needed to generate the Python stub code for the service definition part, for example:
protoc --aimrt_rpc_out=. --plugin=protoc-gen-aimrt_rpc=./protoc_plugin_py_gen_aimrt_py_rpc.py rpc.proto
This will generate the rpc_aimrt_rpc_pb2.py file, which contains the Python interfaces generated from the defined services. Business code needs to import this file.
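The two generation steps can be scripted. The sketch below only assembles the two protoc command lines shown above and the output file names they are described as producing; the helper names build_protoc_commands and expected_output_names are hypothetical and not part of AimRT or Protobuf.

```python
from pathlib import Path


def build_protoc_commands(proto_file, plugin_path, out_dir="."):
    """Return the two protoc invocations described above as argv lists."""
    # Step 1: message types, using protoc's built-in Python generator.
    messages_cmd = ["protoc", f"--python_out={out_dir}", proto_file]
    # Step 2: service stubs, using the protoc plugin provided by AimRT.
    services_cmd = [
        "protoc",
        f"--aimrt_rpc_out={out_dir}",
        f"--plugin=protoc-gen-aimrt_rpc={plugin_path}",
        proto_file,
    ]
    return messages_cmd, services_cmd


def expected_output_names(proto_file):
    """File names the two steps are described as generating."""
    stem = Path(proto_file).stem
    return [f"{stem}_pb2.py", f"{stem}_aimrt_rpc_pb2.py"]


messages_cmd, services_cmd = build_protoc_commands(
    "rpc.proto", "./protoc_plugin_py_gen_aimrt_py_rpc.py"
)
print(" ".join(messages_cmd))
print(" ".join(services_cmd))
print(expected_output_names("rpc.proto"))
```

Each returned list can be passed to subprocess.run(cmd, check=True) on a machine where protoc and the AimRT plugin are available.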
ROS2 Srv
ROS2 Srv is the format used to define RPC in ROS2. When using it, developers first define a ROS2 Package and, within it, a .srv file such as example.srv:
byte[] data
---
int64 code
Here, --- separates the Req and Rsp definitions.
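To make the role of the separator concrete, the following minimal sketch splits the text of a .srv file into its Req and Rsp field lists. It is an illustration of the file layout only, not how the ROS2 or AimRT tooling parses these files; the function name split_srv is hypothetical, and constants and default values are not handled.

```python
def split_srv(text):
    """Split .srv text into (request_fields, response_fields).

    Each field is returned as a (type, name) tuple.
    """
    request, response = [], []
    current = request
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line:
            continue
        if line == "---":
            current = response  # everything after the separator is the Rsp
            continue
        field_type, field_name = line.split()[:2]
        current.append((field_type, field_name))
    return request, response


EXAMPLE_SRV = """\
byte[] data
---
int64 code
"""

req_fields, rsp_fields = split_srv(EXAMPLE_SRV)
print(req_fields)  # [('byte[]', 'data')]
print(rsp_fields)  # [('int64', 'code')]
```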
When aimrt_py is installed, the command-line tool aimrt_py-gen-ros2-rpc is installed automatically. It generates AimRT Python RPC stub code from ROS2 Srv files:
aimrt_py-gen-ros2-rpc --pkg_name=example_pkg --srv_file=./example.srv --output_path=./
Here, pkg_name is the name of the ROS2 Package, srv_file is the path to the ROS2 Srv file, and output_path is the output path for the generated stub code. This will generate an example_aimrt_rpc_ros2.py file, which contains the Python interfaces generated from the defined services. Business code needs to import this file.
Note that aimrt_py-gen-ros2-rpc only generates the definitions for Req and Rsp; it does not generate the code for the message structure part. That code still has to be generated by the user by building the ROS2 Package; for details, refer to the official ROS2 documentation.
RpcHandle
A module can obtain the RpcHandleRef handle by calling the GetRpcHandle() interface of the CoreRef handle. Generally, developers do not use the interfaces provided by RpcHandleRef directly. Instead, they generate stub code from the RPC IDL file, which wraps the RpcHandleRef handle, and use the wrapped interfaces in business code.
The specific form of these wrapped interfaces is introduced in later sections of this document. When using RPC functionality, developers need to follow these steps:
Client side:
During the Initialize phase, call the interface to register the RPC Client method;
During the Start phase, call the RPC Invoke interface to perform the RPC call;
Server side:
During the Initialize phase, call the interface to register the RPC Server service;
RpcStatus
During RPC invocation or processing, users can obtain error information about the RPC process through a variable of type RpcStatus, which provides the following interfaces:
OK()->bool: whether the call succeeded;
Code()->int: the error code;
ToString()->str: convert to a string;
The RpcStatus type is very lightweight and contains only an error code field. Users can set this code through the constructor or the Set method, and read it through the Get method. The enumeration values for the error codes can be found in the rpc_status_base.h file.
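As a minimal sketch of how the three documented interfaces might be used together, the helper below turns a status object into a log-friendly line. StubStatus is a hypothetical stand-in so the snippet runs without AimRT installed; its choice of 0 as the success code is an assumption of the stand-in, and the real values are the ones defined in rpc_status_base.h.

```python
class StubStatus:
    """Hypothetical stand-in exposing the three documented RpcStatus methods."""

    def __init__(self, code=0):
        self._code = code  # assumption of this stand-in: 0 means success

    def OK(self):
        return self._code == 0

    def Code(self):
        return self._code

    def ToString(self):
        return "OK" if self.OK() else f"error code {self._code}"


def describe_status(status):
    """Return a log-friendly line using only OK(), Code(), and ToString()."""
    if status.OK():
        return "rpc succeeded"
    return f"rpc failed, code={status.Code()}, detail={status.ToString()}"


print(describe_status(StubStatus()))
print(describe_status(StubStatus(7)))
```

Because describe_status relies only on the documented method names, it should accept a real status object in the same way, although that has not been verified here.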
Note that the error information in RpcStatus generally indicates only framework-level errors, such as service not found, network errors, or serialization errors, and is intended to help developers troubleshoot framework-level issues. If developers need to return business-level errors, it is recommended to add corresponding fields in the business package.
RpcContext
RpcContext is the context information during an RPC call. Developers can set some context information during the RPC call, such as timeout duration and Meta information. The specific interfaces are as follows:
CheckUsed()->bool: Check whether the Context has been used;
SetUsed()->None: Mark the Context as used;
Reset()->None: Reset the Context;
GetType()->aimrt_rpc_context_type_t: Get the Context type;
Timeout()->datetime.timedelta: Get the timeout duration;
SetTimeout(timeout: datetime.timedelta)->None: Set the timeout duration;
SetMetaValue(key: str, value: str)->None: Set metadata;
GetMetaValue(key: str)->str: Get metadata;
GetMetaKeys()->List[str]: Get the list of keys in all metadata key-value pairs;
SetToAddr(addr: str)->None: Set the target address;
GetToAddr()->str: Get the target address;
SetSerializationType(serialization_type: str)->None: Set the serialization type;
GetSerializationType()->str: Get the serialization type;
GetFunctionName()->str: Get the function name;
SetFunctionName(func_name: str)->None: Set the function name;
ToString()->str: Get the context information as a human-readable string;
RpcContextRef is a reference type of RpcContext. Except for not having the Reset interface, all of its interfaces are identical to those of RpcContext.
aimrt_rpc_context_type_t is an enumeration type that defines the context type. Its value is either AIMRT_RPC_CLIENT_CONTEXT or AIMRT_RPC_SERVER_CONTEXT, indicating whether this is a client or a server context.
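The setters listed above can be wrapped in a small helper so that every call site applies its timeout and metadata in the same way. The sketch below uses only the documented SetTimeout and SetMetaValue signatures; apply_call_options and RecordingContext are hypothetical names, and RecordingContext is a stand-in that merely records what was set so the snippet runs without AimRT.

```python
import datetime


def apply_call_options(ctx, timeout_s=None, meta=None):
    """Apply a timeout (in seconds) and metadata pairs to a context-like object."""
    if timeout_s is not None:
        ctx.SetTimeout(datetime.timedelta(seconds=timeout_s))
    for key, value in (meta or {}).items():
        # The documented signature takes str keys and str values.
        ctx.SetMetaValue(str(key), str(value))
    return ctx


class RecordingContext:
    """Hypothetical stand-in that records what was set, for demonstration only."""

    def __init__(self):
        self.timeout = None
        self.meta = {}

    def SetTimeout(self, timeout):
        self.timeout = timeout

    def SetMetaValue(self, key, value):
        self.meta[key] = value


ctx = apply_call_options(RecordingContext(), timeout_s=3, meta={"trace_id": 42})
print(ctx.timeout)  # 0:00:03
print(ctx.meta)     # {'trace_id': '42'}
```

Converting values with str() keeps callers from passing non-string metadata by accident, which the documented signature does not accept.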
Client
In the code generated by the AimRT Python RPC stub tool, such as the xxx_aimrt_rpc_pb2.py file, the XXXProxy type is provided. Developers use this Proxy interface to initiate RPC calls. The interface is synchronous: initiating an RPC call through it blocks the current thread until a response is received or the request times out.
Using this Proxy to initiate an RPC call is very simple and generally involves the following steps:
Step 0: Import the stub code package generated according to the protobuf protocol, for example xxx_aimrt_rpc_pb2.py;
Step 1: During the Initialize phase, call the Proxy's static method RegisterClientFunc to register the RPC Client;
Step 2: In a business function during the Start phase, initiate the RPC call:
Step 2-1: Create a Proxy instance, with the constructor parameter being RpcHandleRef;
Step 2-2: Create the Req and populate its content;
Step 2-3: [Optional] Create ctx and set the timeout and other information;
Step 2-4: Through the proxy, pass in ctx and Req, initiate the RPC call, and synchronously wait for it to complete. Ensure that ctx and Req remain valid and unchanged throughout the entire call cycle, and finally obtain the returned status and Rsp;
Step 2-5: Parse the status and Rsp;
Below is an example of using AimRT Python for an RPC Client call based on the protobuf protocol, obtaining the CoreRef handle through the Create Module method. If the Module pattern is used and the CoreRef handle is obtained in the Initialize method, the usage is similar:
import aimrt_py
import threading
import time
import datetime

from google.protobuf.json_format import MessageToJson

import rpc_pb2
import rpc_aimrt_rpc_pb2


def main():
    aimrt_core = aimrt_py.Core()

    # Initialize
    core_options = aimrt_py.CoreOptions()
    core_options.cfg_file_path = "path/to/cfg/xxx_cfg.yaml"
    aimrt_core.Initialize(core_options)

    # Create Module
    module_handle = aimrt_core.CreateModule("NormalRpcClientPyModule")

    # Register rpc client
    rpc_handle = module_handle.GetRpcHandle()
    ret = rpc_aimrt_rpc_pb2.ExampleServiceProxy.RegisterClientFunc(rpc_handle)
    assert ret, "Register client failed."

    # Start the core in a background thread, since Start() blocks
    thread = threading.Thread(target=aimrt_core.Start)
    thread.start()

    # Give the core a moment to finish starting
    time.sleep(1)

    # Call rpc (message and method names follow rpc.proto above)
    proxy = rpc_aimrt_rpc_pb2.ExampleServiceProxy(rpc_handle)

    req = rpc_pb2.ExampleReq()
    req.msg = "example msg"

    ctx = aimrt_py.RpcContext()
    ctx.SetTimeout(datetime.timedelta(seconds=30))
    ctx.SetMetaValue("key1", "value1")

    status, rsp = proxy.ExampleFunc(ctx, req)

    aimrt_py.info(module_handle.GetLogger(),
                  f"Call rpc done, "
                  f"status: {status.ToString()}, "
                  f"req: {MessageToJson(req)}, "
                  f"rsp: {MessageToJson(rsp)}")

    # Shutdown
    aimrt_core.Shutdown()
    thread.join()


if __name__ == '__main__':
    main()
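Step 2-5 (parsing the status and Rsp) can be factored into a helper that separates framework-level failures, reported through the status, from business-level failures carried in the response. The sketch below assumes a response shaped like ExampleRsp from rpc.proto (fields code and msg) and assumes that a code of 0 means business-level success; unwrap and both exception classes are hypothetical names, and the stand-in objects exist only so the snippet runs without AimRT.

```python
from types import SimpleNamespace


class RpcFrameworkError(RuntimeError):
    """Raised when the returned status reports a framework-level failure."""


class RpcBusinessError(RuntimeError):
    """Raised when the response itself carries a non-zero business code."""


def unwrap(status, rsp):
    """Return rsp if both layers succeeded, otherwise raise a specific error."""
    if not status.OK():
        raise RpcFrameworkError(status.ToString())
    if rsp.code != 0:  # assumption: 0 means business-level success
        raise RpcBusinessError(f"code={rsp.code}, msg={rsp.msg}")
    return rsp


# Hypothetical stand-ins for a returned status and response.
ok_status = SimpleNamespace(OK=lambda: True, ToString=lambda: "OK")
bad_status = SimpleNamespace(OK=lambda: False, ToString=lambda: "timeout")
rsp = SimpleNamespace(code=0, msg="echo example msg")

print(unwrap(ok_status, rsp).msg)  # echo example msg
```

Keeping the two error types distinct lets callers retry or alert on framework failures without confusing them with results the service produced deliberately.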
Server
In the code generated by the AimRT Python RPC stub tool, such as the xxx_aimrt_rpc_pb2.py file, a Service base class that inherits from aimrt_py.ServiceBase is provided. Developers need to inherit from this Service base class and implement its virtual interfaces. The Service interface is synchronous: developers must complete all operations inside the handler, blocking if necessary, and then return the response.
Using this interface to provide RPC services generally involves the following steps:
Step 0: Import the stub code package generated from the protobuf protocol, such as xxx_aimrt_rpc_pb2.py;
Step 1: Implement an Impl class that inherits from XXXService in the package and implement its virtual interfaces, whose form is (ctx, req)->status, rsp;
Step 1-1: Parse Ctx and Req, and fill in Rsp;
Step 1-2: Return RpcStatus and Rsp;
Step 2: During the Initialize phase, call the RegisterService method of RpcHandleRef to register the RPC Service;
The following is an example of using AimRT Python for RPC Service processing based on the protobuf protocol, obtaining the CoreRef handle through the Create Module method. If the Module pattern is used and the CoreRef handle is obtained in the Initialize method, the usage is similar:
import aimrt_py
import threading
import signal
import sys

from google.protobuf.json_format import MessageToJson

import rpc_pb2
import rpc_aimrt_rpc_pb2

global_aimrt_core = None


def signal_handler(sig, frame):
    global global_aimrt_core

    # Shut the core down cleanly on SIGINT/SIGTERM if it has been created
    if (global_aimrt_core and (sig == signal.SIGINT or sig == signal.SIGTERM)):
        global_aimrt_core.Shutdown()
        return

    sys.exit(0)


class ExampleServiceImpl(rpc_aimrt_rpc_pb2.ExampleService):
    def __init__(self, logger):
        super().__init__()
        self.logger = logger

    @staticmethod
    def PrintMetaInfo(logger, ctx_ref):
        meta_keys = ctx_ref.GetMetaKeys()
        for key in meta_keys:
            aimrt_py.info(logger, f"meta key: {key}, value: {ctx_ref.GetMetaValue(key)}")

    # Method and message names follow rpc.proto above
    def ExampleFunc(self, ctx_ref, req):
        rsp = rpc_pb2.ExampleRsp()
        rsp.msg = "echo " + req.msg

        ExampleServiceImpl.PrintMetaInfo(self.logger, ctx_ref)
        aimrt_py.info(self.logger,
                      f"Server handle new rpc call. "
                      f"context: {ctx_ref.ToString()}, "
                      f"req: {MessageToJson(req)}, "
                      f"return rsp: {MessageToJson(rsp)}")

        return aimrt_py.RpcStatus(), rsp


def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    aimrt_core = aimrt_py.Core()

    global global_aimrt_core
    global_aimrt_core = aimrt_core

    # Initialize
    core_options = aimrt_py.CoreOptions()
    core_options.cfg_file_path = "path/to/cfg/xxx_cfg.yaml"
    aimrt_core.Initialize(core_options)

    # Create Module
    module_handle = aimrt_core.CreateModule("NormalRpcServerPymodule")

    # Register rpc service
    service = ExampleServiceImpl(module_handle.GetLogger())
    ret = module_handle.GetRpcHandle().RegisterService(service)
    assert ret, "Register service failed."

    # Start the core in a background thread, since Start() blocks
    thread = threading.Thread(target=aimrt_core.Start)
    thread.start()

    # Join with a timeout so the main thread can still receive signals
    while thread.is_alive():
        thread.join(1.0)


if __name__ == '__main__':
    main()
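One way to keep a service handler easy to test is to move the business logic into a plain function and let the (ctx, req)->status, rsp handler only translate between messages and that function. The sketch below is an assumption-laden illustration: build_echo_reply, handle, and the business code 1 are hypothetical, and the message and status types are replaced by stand-ins so the snippet runs without AimRT. It also reflects the earlier recommendation that business-level errors travel in fields of the response rather than in RpcStatus.

```python
from types import SimpleNamespace


def build_echo_reply(msg, num):
    """Pure business logic: returns (code, msg) without touching any AimRT type."""
    if num < 0:
        return 1, "num must be non-negative"  # hypothetical business error code
    return 0, "echo " + msg


def handle(ctx_ref, req, make_rsp, make_status):
    """Handler in the (ctx, req) -> (status, rsp) form, with injected factories."""
    rsp = make_rsp()
    rsp.code, rsp.msg = build_echo_reply(req.msg, req.num)
    # The framework-level status stays at its default even for business errors.
    return make_status(), rsp


# Stand-ins for the generated message type and the status type.
req = SimpleNamespace(msg="hi", num=-1)
status, rsp = handle(
    None,
    req,
    make_rsp=lambda: SimpleNamespace(code=0, msg=""),
    make_status=lambda: "default-status",
)
print(rsp.code, rsp.msg)  # 1 num must be non-negative
```

In a real service, the factories would presumably be the generated response class and aimrt_py.RpcStatus, and the Impl class method would simply delegate to handle.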
RPC calls and processing based on the ROS2 Srv protocol are basically the same as RPC calls and processing based on the protobuf protocol, except for the different data types.
For complete examples, please refer to: