Skip to content

hololinked.core.zmq.brokers.EventPublisher

Bases: BaseZMQServer, BaseSyncZMQ

Event publisher for broadcasting messages to all connected clients. Implements PUB-SUB pattern.

Source code in hololinked/hololinked/core/zmq/brokers.py
class EventPublisher(BaseZMQServer, BaseSyncZMQ):
    """Event publisher for broadcasting messages to all connected clients. Implements PUB-SUB pattern."""

    _standard_address_suffix = "/event-publisher"
    _standard_address_suffix_filename_replacement = "event-publisher"

    def __init__(
        self,
        id: str,
        context: zmq.Context | None = None,
        access_point: str = ZMQ_TRANSPORTS.IPC,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        id: str
            unique identifier of the publisher
        context: zmq.Context
            ZMQ context to use, if None, a global context is used.
        access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
            access point for the publisher to bind to, usually `INPROC`
        """
        super().__init__(id=id, **kwargs)
        self.create_socket(
            server_id=id,
            socket_id=id,
            node_type="server",
            context=context,
            access_point=access_point,
            socket_type=zmq.PUB,
            **kwargs,
        )
        self.events = set()  # type is set[EventDispatcher]
        self.event_ids = set()  # type: set[str]
        self._send_lock = threading.Lock()

    def register(self, event: "EventDispatcher") -> None:
        """
        register event with a specific (unique) name

        Parameters
        ----------
        event: `EventDispatcher`
            `Event` object that needs to be registered. Events created at `__init__()` of `Thing` are
            automatically registered.
        """
        if event._unique_identifier in self.events and event not in self.events:
            raise AttributeError(f"event {event._unique_identifier} already registered, please use another name.")
        self.event_ids.add(event._unique_identifier)
        self.events.add(event)

    def unregister(self, event: "EventDispatcher") -> None:
        """
        unregister event with a specific (unique) name

        Parameters
        ----------
        event: `EventDispatcher`
            `Event` object that needs to be unregistered.
        """
        if event in self.events:
            self.events.remove(event)
            self.event_ids.remove(event._unique_identifier)
        else:
            warnings.warn(
                f"event {event._unique_identifier} not found, did you mean to unregister another event?",
                UserWarning,
            )

    def publish(self, event, data: Any) -> None:
        """
        publish an event with given unique name.

        Parameters
        ----------
        event: `EventDispatcher`
            `Event` object that needs to be published.
        data: Any
            data to be sent as payload of the event
        """
        # uncomment for type definitions
        # from ...core.events import EventDispatcher
        # assert isinstance(event, EventDispatcher), "event must be an instance of EventDispatcher"

        try:
            self._send_lock.acquire()
            if event._unique_identifier in self.event_ids:
                serializer = Serializers.for_object(
                    event._owner_inst.id,
                    event._owner_inst.__class__.__name__,
                    event._descriptor.name,
                )
                content_type_if_no_serializer = Serializers.get_content_type_for_object(
                    event._owner_inst.id,
                    event._owner_inst.__class__.__name__,
                    event._descriptor.name,
                )
                if not isinstance(data, bytes):
                    payload = SerializableData(data, serializer=serializer)
                    preserialized_payload = PreserializedEmptyByte
                else:
                    payload = SerializableNone
                    preserialized_payload = PreserializedData(data, content_type=content_type_if_no_serializer)

                event_message = EventMessage.craft_from_arguments(
                    event._unique_identifier,
                    self.id,
                    payload=payload,
                    preserialized_payload=preserialized_payload,
                )
                self.socket.send_multipart(event_message.byte_array)
                self.logger.debug(f"published event with unique identifier {event._unique_identifier}")
                # print("published event with unique identifier {}".format(event._unique_identifier))
                return
            raise AttributeError(f"event name {event._unique_identifier} not registered")
        finally:
            try:
                self._send_lock.release()
            except Exception as ex:
                self.logger.warning(f"could not release publish lock for event publisher - {str(ex)}")

    def exit(self):
        try:
            BaseZMQ.exit(self)
            self.socket.close(0)
            self.logger.info("terminated event publishing socket")
        except Exception as ex:
            self.logger.warning(
                "could not properly terminate context or attempted to terminate an already terminated context."
                + f" Exception message: {str(ex)}"
            )

Functions

__init__

__init__(id: str, context: Context | None = None, access_point: str = ZMQ_TRANSPORTS.IPC, **kwargs) -> None

Parameters:

Name Type Description Default

id

str

unique identifier of the publisher

required

context

Context | None

ZMQ context to use, if None, a global context is used.

None

access_point

str

access point for the publisher to bind to, usually INPROC

IPC
Source code in hololinked/hololinked/core/zmq/brokers.py
def __init__(
    self,
    id: str,
    context: zmq.Context | None = None,
    access_point: str = ZMQ_TRANSPORTS.IPC,
    **kwargs,
) -> None:
    """
    Parameters
    ----------
    id: str
        unique identifier of the publisher
    context: zmq.Context
        ZMQ context to use, if None, a global context is used.
    access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
        access point for the publisher to bind to, usually `INPROC`
    """
    super().__init__(id=id, **kwargs)
    self.create_socket(
        server_id=id,
        socket_id=id,
        node_type="server",
        context=context,
        access_point=access_point,
        socket_type=zmq.PUB,
        **kwargs,
    )
    self.events = set()  # type is set[EventDispatcher]
    self.event_ids = set()  # type: set[str]
    self._send_lock = threading.Lock()

publish

publish(event, data: Any) -> None

publish an event with given unique name.

Parameters:

Name Type Description Default

event

Event object that needs to be published.

required

data

Any

data to be sent as payload of the event

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def publish(self, event, data: Any) -> None:
    """
    publish an event with given unique name.

    Parameters
    ----------
    event: `EventDispatcher`
        `Event` object that needs to be published.
    data: Any
        data to be sent as payload of the event
    """
    # uncomment for type definitions
    # from ...core.events import EventDispatcher
    # assert isinstance(event, EventDispatcher), "event must be an instance of EventDispatcher"

    try:
        self._send_lock.acquire()
        if event._unique_identifier in self.event_ids:
            serializer = Serializers.for_object(
                event._owner_inst.id,
                event._owner_inst.__class__.__name__,
                event._descriptor.name,
            )
            content_type_if_no_serializer = Serializers.get_content_type_for_object(
                event._owner_inst.id,
                event._owner_inst.__class__.__name__,
                event._descriptor.name,
            )
            if not isinstance(data, bytes):
                payload = SerializableData(data, serializer=serializer)
                preserialized_payload = PreserializedEmptyByte
            else:
                payload = SerializableNone
                preserialized_payload = PreserializedData(data, content_type=content_type_if_no_serializer)

            event_message = EventMessage.craft_from_arguments(
                event._unique_identifier,
                self.id,
                payload=payload,
                preserialized_payload=preserialized_payload,
            )
            self.socket.send_multipart(event_message.byte_array)
            self.logger.debug(f"published event with unique identifier {event._unique_identifier}")
            # print("published event with unique identifier {}".format(event._unique_identifier))
            return
        raise AttributeError(f"event name {event._unique_identifier} not registered")
    finally:
        try:
            self._send_lock.release()
        except Exception as ex:
            self.logger.warning(f"could not release publish lock for event publisher - {str(ex)}")

register

register(event: 'EventDispatcher') -> None

register event with a specific (unique) name

Parameters:

Name Type Description Default

event

'EventDispatcher'

Event object that needs to be registered. Events created at __init__() of Thing are automatically registered.

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def register(self, event: "EventDispatcher") -> None:
    """
    register event with a specific (unique) name

    Parameters
    ----------
    event: `EventDispatcher`
        `Event` object that needs to be registered. Events created at `__init__()` of `Thing` are
        automatically registered.
    """
    if event._unique_identifier in self.events and event not in self.events:
        raise AttributeError(f"event {event._unique_identifier} already registered, please use another name.")
    self.event_ids.add(event._unique_identifier)
    self.events.add(event)

unregister

unregister(event: 'EventDispatcher') -> None

unregister event with a specific (unique) name

Parameters:

Name Type Description Default

event

'EventDispatcher'

Event object that needs to be unregistered.

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def unregister(self, event: "EventDispatcher") -> None:
    """
    unregister event with a specific (unique) name

    Parameters
    ----------
    event: `EventDispatcher`
        `Event` object that needs to be unregistered.
    """
    if event in self.events:
        self.events.remove(event)
        self.event_ids.remove(event._unique_identifier)
    else:
        warnings.warn(
            f"event {event._unique_identifier} not found, did you mean to unregister another event?",
            UserWarning,
        )