Source code for pyopendds.DomainParticipant

from typing import Dict, Any, List

from .Topic import Topic
from .Subscriber import Subscriber
from .Publisher import Publisher


[docs]class DomainParticipant:
[docs] def __init__(self, domain: int, qos=None, listener=None): self.domain = int(domain) self.qos = qos self.listener = listener self.topics: Dict[str, Topic] = {} self.subscribers: List[Subscriber] = [] self.publishers: List[Publisher] = [] self._registered_typesupport: List[Any] = [] from _pyopendds import create_participant create_participant(self, domain)
def __del__(self): from _pyopendds import participant_cleanup participant_cleanup(self) def create_topic(self, name: str, topic_type: type, qos=None, listener=None) -> Topic: return Topic(self, name, topic_type, qos, listener) def create_subscriber(self, qos=None, listener=None) -> Subscriber: return Subscriber(self, qos, listener) def create_publisher(self, qos=None, listener=None) -> Publisher: return Publisher(self, qos, listener)