DDS Overview
DDS (Data Distribution Service) is a middleware standard published by the Object Management Group (OMG) for distributed communication. It uses a publish/subscribe model and provides multiple QoS (Quality of Service) policies for real-time, efficient, and flexible data distribution, which makes it suitable for a wide range of distributed real-time applications.
pnd_sdk_python is a wrapper on top of DDS that supports QoS configuration of DDS components. It provides simple interfaces for application development and implements an RPC mechanism based on DDS Topics. It is suitable for inter-process communication within Adam and between external processes and Adam, offering both publish/subscribe and request/response modes for different data communication scenarios.
API Description
pndbotics_sdk_py.core.channel.ChannelPublisher
The ChannelPublisher class implements message publishing for a specified message type.
| Class | Constructor |
| ChannelPublisher | ChannelPublisher(self, name: str, type: Any) |
| Function | Init |
| Prototype | def Init(self) |
| Description | Initialize the channel and prepare to send messages. |
| Parameters | None |
| Return | None |
| Note | Initializes the channel writer. Call Init before Write so that the channel is ready to send messages. |
| Function | Close |
| Prototype | def Close(self) |
| Description | Close the channel writer and stop publishing messages. |
| Parameters | None |
| Return | None |
| Note | After calling Close, the publisher cannot write messages until re-initialized. |
| Function | Write |
| Prototype | def Write(self, sample: Any, timeout: float = None) |
| Description | Publish a message of the specified type (sample). |
| Parameters | sample: Any (data to write, can be any Python object); timeout: float = None (optional, maximum time to wait for the write operation to complete) |
| Return | The return value of the underlying self.__channel.Write call |
| Note | If a timeout is provided, it specifies the maximum time to wait for the write operation to complete. |
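The Init, Write, Close lifecycle described above can be sketched as follows. This is a minimal illustration, not the SDK's own sample code: `publish_all` and `run_with_sdk` are hypothetical helpers, and the topic name and message type passed to `run_with_sdk` are placeholders the caller must supply. The helper accepts any object exposing the documented `Init`, `Write`, and `Close` methods, so the SDK import is confined to `run_with_sdk`.

```python
from typing import Any, Iterable, List, Optional


def publish_all(publisher: Any, samples: Iterable[Any],
                timeout: Optional[float] = None) -> List[Any]:
    """Init the publisher, write every sample, and always Close it.

    Returns whatever each Write call returned. The documentation does not
    specify that value, so it is passed through untouched.
    """
    publisher.Init()
    try:
        return [publisher.Write(sample, timeout) for sample in samples]
    finally:
        # Close even if a Write raises, so the writer is not left open.
        publisher.Close()


def run_with_sdk(topic: str, msg_type: Any, samples: Iterable[Any]) -> List[Any]:
    """Wire the helper to the SDK (requires pndbotics_sdk_py to be installed).

    topic and msg_type are placeholders: use a topic name and message
    class that exist in your installation.
    """
    from pndbotics_sdk_py.core.channel import (
        ChannelFactoryInitialize,
        ChannelPublisher,
    )

    ChannelFactoryInitialize(0)  # domain ID 0 is the documented default
    return publish_all(ChannelPublisher(topic, msg_type), samples)
```

Wrapping the writes in `try`/`finally` is a design choice of this sketch: it guarantees that Close runs, after which the publisher must be re-initialized before it can write again.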
pndbotics_sdk_py.core.channel.ChannelSubscriber
ChannelSubscriber implements subscription to messages of a specified type.
| Class | Constructor |
| ChannelSubscriber | ChannelSubscriber(self, name: str, type: Any) |
| Function | Init |
| Prototype | def Init(self, handler: Callable = None, queueLen: int = 0) |
| Description | Initialize the channel, set the reader and handler, and optionally specify the queue length. |
| Parameters | handler: Callable = None (optional, callback function to handle messages); queueLen: int = 0 (optional, integer specifying the queue length) |
| Return | None |
| Note | The channel is initialized only once; the internal self.__inited flag prevents re-initialization. If handler is None, a default handler is used. |
| Function | Close |
| Prototype | def Close(self) |
| Description | Close the reader associated with the channel and mark it as uninitialized. |
| Parameters | None |
| Return | None |
| Note | Stops reading from the channel and cleans up state. After calling Close, the channel must be re-initialized before reuse. |
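A callback-driven subscription built from Init and Close might look like the sketch below. `collect_with_handler` is a hypothetical helper, and the handler signature (a single argument carrying the received message) is an assumption, since the documentation only states that the handler is a Callable. The thread on which the SDK invokes the handler is also not documented here.

```python
import time
from typing import Any, List


def collect_with_handler(subscriber: Any, duration_s: float,
                         queue_len: int = 10) -> List[Any]:
    """Register a handler, let it run for duration_s seconds, then Close."""
    received: List[Any] = []

    def handler(msg: Any) -> None:
        # Assumption: the SDK calls the handler with one received message.
        received.append(msg)

    subscriber.Init(handler, queue_len)
    try:
        time.sleep(duration_s)  # messages arrive through the handler meanwhile
    finally:
        # After Close the subscriber must be re-initialized before reuse.
        subscriber.Close()
    return received
```

With the SDK installed, the helper would be called with a `ChannelSubscriber(name, type)` instance after `ChannelFactoryInitialize` has run; the name and type are whatever topic and message class your system defines.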
| Function | Read |
| Prototype | def Read(self, timeout: int = None) |
| Description | Read data from the channel, optionally with a timeout. |
| Parameters | timeout: int = None (optional, timeout in seconds for the read operation) |
| Return | Depends on the channel's Read implementation, usually the read data or None if timed out. |
| Note | If timeout is not provided, the read operation typically has no time limit. |
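As an alternative to a handler, messages can be polled with Read. The sketch below assumes that Read is usable when Init is called without a handler and that a timed-out Read returns None, which the table above describes only as the usual behavior. `poll_messages` is a hypothetical helper.

```python
from typing import Any, List, Optional


def poll_messages(subscriber: Any, max_messages: int,
                  timeout_s: Optional[int] = 1) -> List[Any]:
    """Read up to max_messages, stopping at the first timeout (None)."""
    messages: List[Any] = []
    subscriber.Init()  # no handler: messages are fetched with Read
    try:
        while len(messages) < max_messages:
            msg = subscriber.Read(timeout_s)
            if msg is None:  # described as the usual result of a timeout
                break
            messages.append(msg)
    finally:
        subscriber.Close()
    return messages
```

Passing an explicit timeout keeps the loop from blocking indefinitely, since a Read without a timeout typically has no time limit.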
pndbotics_sdk_py.core.channel.ChannelFactoryInitialize
This function initializes the channel environment.
| Function | ChannelFactoryInitialize |
| Prototype | def ChannelFactoryInitialize(id: int = 0, networkInterface: str = None) |
| Description | Initialize the ChannelFactory to create and manage channels in a specified domain. |
| Parameters | id: int = 0 (DDS participant domain ID); networkInterface: str = None (optional, network interface name for the channels) |
| Return | None. Raises an exception if initialization fails. |
| Note | Call this function before using any channels so that the DDS domain and participant have been created. |
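Because initialization signals failure by raising, a caller may want to handle that once at startup. The sketch below is one possible pattern, not part of the SDK: `init_channels` and `main` are hypothetical, the exception type is not documented and is therefore caught broadly, and the interface name `"eth0"` is a placeholder.

```python
from typing import Callable, Optional


def init_channels(initialize: Callable[..., None], domain_id: int = 0,
                  interface: Optional[str] = None) -> bool:
    """Call the initializer once and report failure instead of propagating it."""
    try:
        initialize(domain_id, interface)
    except Exception as exc:  # the exception type is not documented
        print(f"channel initialization failed: {exc}")
        return False
    return True


def main() -> None:
    """Requires pndbotics_sdk_py; "eth0" is a placeholder interface name."""
    from pndbotics_sdk_py.core.channel import ChannelFactoryInitialize

    if not init_channels(ChannelFactoryInitialize, 0, "eth0"):
        raise SystemExit(1)
    # Publishers and subscribers can be created from this point on.
```

Passing the initializer as an argument keeps the helper independent of the SDK import, which also makes it easy to exercise with a stand-in function.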