Skip to content

EventConsumer

hololinked.core.zmq.brokers.BaseEventConsumer

Bases: BaseZMQClient

Consumes events published at PUB sockets using SUB socket

Source code in hololinked/hololinked/core/zmq/brokers.py
class BaseEventConsumer(BaseZMQClient):
    """Consumes events published at PUB sockets using SUB socket"""

    def __init__(
        self,
        id: str,
        event_unique_identifier: str,
        access_point: str,
        context: zmq.Context | None = None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        id: str
            unique identity for the consumer
        event_unique_identifier: str
            unique identifier of the event registered at the PUB socket
        access_point: str
            socket address of the event publisher (`EventPublisher`), properly qualified with transport method
        context: zmq.Context
            ZMQ context to use, if None, a global context is used.
        **kwargs:
            additional arguments:

            - `logger`: `logging.Logger`, logger instance to use. If None, a default
            - `poll_timeout`: `int`, socket polling timeout in milliseconds greater than 0.
            - `server_id`: `str`, id of the PUB socket server, usually not necessary as `access_point` is sufficient.
        """

        if isinstance(self, BaseSyncZMQ):
            self.context = context or global_config.zmq_context()
            self.poller = zmq.Poller()
            socket_class = zmq.Socket
            self._poller_lock = threading.Lock()
        elif isinstance(self, BaseAsyncZMQ):
            self.context = context or global_config.zmq_context()
            self.poller = zmq.asyncio.Poller()
            socket_class = zmq.asyncio.Socket
            self._poller_lock = asyncio.Lock()
        else:
            raise TypeError("BaseEventConsumer must be subclassed by either BaseSyncZMQ or BaseAsyncZMQ")
        super().__init__(id=id, server_id=kwargs.get("server_id", None), **kwargs)
        logger = kwargs.get("logger", None)
        if not logger:
            logger = structlog.get_logger().bind(
                component="broker",
                impl=self.__class__.__name__,
                id=id,
                event_id=event_unique_identifier,
            )
        self.logger = logger  # type: structlog.stdlib.BoundLogger
        self.create_socket(
            server_id=id,
            socket_id=id,
            node_type="client",
            context=self.context,
            socket_type=zmq.SUB,
            access_point=access_point,
            **kwargs,
        )
        self.event_unique_identifier = bytes(event_unique_identifier, encoding="utf-8")
        short_uuid = uuid_hex()
        self.interruptor = self.context.socket(zmq.PAIR, socket_class=socket_class)
        self.interruptor.setsockopt_string(zmq.IDENTITY, f"interrupting-server-{short_uuid}")
        self.interrupting_peer = self.context.socket(zmq.PAIR, socket_class=socket_class)
        self.interrupting_peer.setsockopt_string(zmq.IDENTITY, f"interrupting-client-{short_uuid}")
        self.interruptor.bind(f"inproc://{self.id}-{short_uuid}/interruption")
        self.interrupting_peer.connect(f"inproc://{self.id}-{short_uuid}/interruption")
        self._stop = False

    def subscribe(self) -> None:
        """subscribe to the event at the PUB socket"""
        self.socket.setsockopt(zmq.SUBSCRIBE, self.event_unique_identifier)
        # pair sockets cannot be polled unforunately, so we use router
        # if self.socket in self.poller._map:
        #     self.poller.unregister(self.socket)
        # if self.interruptor in self.poller._map:
        #     self.poller.unregister(self.interruptor)
        self.poller.register(self.socket, zmq.POLLIN)
        self.poller.register(self.interruptor, zmq.POLLIN)

    def stop_polling(self) -> None:
        """stop polling for events when `receive()` is called"""
        self._stop = True

    @property
    def interrupt_message(self) -> EventMessage:
        """
        craft an interrupt message to be sent to the interruptor socket, if `stop_polling()` is not sufficient as
        the poll timeout is infinite. Used internally by `interrupt()` method.
        """
        return EventMessage.craft_from_arguments(
            event_id=f"{self.id}/interrupting-server",
            sender_id=self.id,
            payload=SerializableData("INTERRUPT", content_type="application/json"),
        )

    def exit(self):
        try:
            BaseZMQ.exit(self)
            self.poller.unregister(self.socket)
            self.poller.unregister(self.interruptor)
        except Exception as ex:  # noqa
            # TODO - log message and undo noqa
            self.logger.warning(f"could not unregister sockets from poller for event consumer - {str(ex)}")
        try:
            self.socket.close(0)
            self.interruptor.close(0)
            self.interrupting_peer.close(0)
            self.logger.info(f"terminated event consuming socket {self.socket_address}")
        except Exception as ex:
            self.logger.warning(f"could not terminate sockets. exception message - {str(ex)}")

Functions

__init__

__init__(id: str, event_unique_identifier: str, access_point: str, context: Context | None = None, **kwargs) -> None

Parameters:

Name Type Description Default
id
str

unique identity for the consumer

required
event_unique_identifier
str

unique identifier of the event registered at the PUB socket

required
access_point
str

socket address of the event publisher (EventPublisher), properly qualified with transport method

required
context
Context | None

ZMQ context to use, if None, a global context is used.

None
**kwargs

additional arguments:

  • logger: logging.Logger, logger instance to use. If None, a default
  • poll_timeout: int, socket polling timeout in milliseconds greater than 0.
  • server_id: str, id of the PUB socket server, usually not necessary as access_point is sufficient.
{}
Source code in hololinked/hololinked/core/zmq/brokers.py
def __init__(
    self,
    id: str,
    event_unique_identifier: str,
    access_point: str,
    context: zmq.Context | None = None,
    **kwargs,
) -> None:
    """
    Parameters
    ----------
    id: str
        unique identity for the consumer
    event_unique_identifier: str
        unique identifier of the event registered at the PUB socket
    access_point: str
        socket address of the event publisher (`EventPublisher`), properly qualified with transport method
    context: zmq.Context
        ZMQ context to use, if None, a global context is used.
    **kwargs:
        additional arguments:

        - `logger`: `logging.Logger`, logger instance to use. If None, a default
        - `poll_timeout`: `int`, socket polling timeout in milliseconds greater than 0.
        - `server_id`: `str`, id of the PUB socket server, usually not necessary as `access_point` is sufficient.
    """

    if isinstance(self, BaseSyncZMQ):
        self.context = context or global_config.zmq_context()
        self.poller = zmq.Poller()
        socket_class = zmq.Socket
        self._poller_lock = threading.Lock()
    elif isinstance(self, BaseAsyncZMQ):
        self.context = context or global_config.zmq_context()
        self.poller = zmq.asyncio.Poller()
        socket_class = zmq.asyncio.Socket
        self._poller_lock = asyncio.Lock()
    else:
        raise TypeError("BaseEventConsumer must be subclassed by either BaseSyncZMQ or BaseAsyncZMQ")
    super().__init__(id=id, server_id=kwargs.get("server_id", None), **kwargs)
    logger = kwargs.get("logger", None)
    if not logger:
        logger = structlog.get_logger().bind(
            component="broker",
            impl=self.__class__.__name__,
            id=id,
            event_id=event_unique_identifier,
        )
    self.logger = logger  # type: structlog.stdlib.BoundLogger
    self.create_socket(
        server_id=id,
        socket_id=id,
        node_type="client",
        context=self.context,
        socket_type=zmq.SUB,
        access_point=access_point,
        **kwargs,
    )
    self.event_unique_identifier = bytes(event_unique_identifier, encoding="utf-8")
    short_uuid = uuid_hex()
    self.interruptor = self.context.socket(zmq.PAIR, socket_class=socket_class)
    self.interruptor.setsockopt_string(zmq.IDENTITY, f"interrupting-server-{short_uuid}")
    self.interrupting_peer = self.context.socket(zmq.PAIR, socket_class=socket_class)
    self.interrupting_peer.setsockopt_string(zmq.IDENTITY, f"interrupting-client-{short_uuid}")
    self.interruptor.bind(f"inproc://{self.id}-{short_uuid}/interruption")
    self.interrupting_peer.connect(f"inproc://{self.id}-{short_uuid}/interruption")
    self._stop = False

subscribe

subscribe() -> None

subscribe to the event at the PUB socket

Source code in hololinked/hololinked/core/zmq/brokers.py
def subscribe(self) -> None:
    """subscribe to the event at the PUB socket"""
    self.socket.setsockopt(zmq.SUBSCRIBE, self.event_unique_identifier)
    # pair sockets cannot be polled unforunately, so we use router
    # if self.socket in self.poller._map:
    #     self.poller.unregister(self.socket)
    # if self.interruptor in self.poller._map:
    #     self.poller.unregister(self.interruptor)
    self.poller.register(self.socket, zmq.POLLIN)
    self.poller.register(self.interruptor, zmq.POLLIN)

stop_polling

stop_polling() -> None

stop polling for events when receive() is called

Source code in hololinked/hololinked/core/zmq/brokers.py
def stop_polling(self) -> None:
    """stop polling for events when `receive()` is called"""
    self._stop = True

interrupt_message

interrupt_message() -> EventMessage

craft an interrupt message to be sent to the interruptor socket, if stop_polling() is not sufficient as the poll timeout is infinite. Used internally by interrupt() method.

Source code in hololinked/hololinked/core/zmq/brokers.py
@property
def interrupt_message(self) -> EventMessage:
    """
    craft an interrupt message to be sent to the interruptor socket, if `stop_polling()` is not sufficient as
    the poll timeout is infinite. Used internally by `interrupt()` method.
    """
    return EventMessage.craft_from_arguments(
        event_id=f"{self.id}/interrupting-server",
        sender_id=self.id,
        payload=SerializableData("INTERRUPT", content_type="application/json"),
    )

hololinked.core.zmq.brokers.EventConsumer

Bases: BaseEventConsumer, BaseSyncZMQ

Sync Event Consumer to used outside of async loops

Source code in hololinked/hololinked/core/zmq/brokers.py
class EventConsumer(BaseEventConsumer, BaseSyncZMQ):
    """Sync Event Consumer to used outside of async loops"""

    def receive(self, timeout: float | None = 1000, raise_interrupt_as_exception: bool = False) -> EventMessage | None:
        """
        receive event with given timeout

        Parameters
        ----------
        timeout: float, int, None
            timeout in milliseconds, None for blocking
        raise_interrupt_as_exception: bool
            if True, raises BreakLoop exception when interrupted, otherwise returns None
        """
        self._stop = False
        while not self._stop:
            try:
                if not self._poller_lock.acquire(timeout=timeout / 1000 if timeout else -1):
                    continue
                sockets = self.poller.poll(timeout)  # list[tuple[zmq.Socket, int]]
                if len(sockets) > 1:
                    # if there is an interrupt message as well as an event,
                    # give preference to interrupt message.
                    if sockets[0][0] == self.interrupting_peer:
                        sockets = [sockets[0]]  # we still need the socket, poll event  tuple
                    elif sockets[1][0] == self.interrupting_peer:
                        sockets = [sockets[1]]
                for socket, _ in sockets:
                    try:
                        raw_message = socket.recv_multipart(zmq.NOBLOCK)
                        message = EventMessage(raw_message)
                        if socket == self.interrupting_peer:
                            if message.payload.deserialize() == "INTERRUPT":
                                self.stop_polling()
                                if raise_interrupt_as_exception:
                                    raise BreakLoop("event consumer interrupted")
                                return
                        return message
                    except zmq.Again:
                        pass
                    # if not self.handled_default_message_types(event_message):
            finally:
                try:
                    self._poller_lock.release()
                except Exception as ex:
                    self.logger.warning(f"could not release poller lock for event receive - {str(ex)}")

    def interrupt(self):
        """
        interrupts the event consumer. Generally should be used for exiting this object if there is no poll
        period/infinite polling. Otherwise please use stop_polling().
        """
        self.interrupting_peer.send_multipart(self.interrupt_message.byte_array)

Functions

receive

receive(timeout: float | None = 1000, raise_interrupt_as_exception: bool = False) -> EventMessage | None

receive event with given timeout

Parameters:

Name Type Description Default
timeout
float | None

timeout in milliseconds, None for blocking

1000
raise_interrupt_as_exception
bool

if True, raises BreakLoop exception when interrupted, otherwise returns None

False
Source code in hololinked/hololinked/core/zmq/brokers.py
def receive(self, timeout: float | None = 1000, raise_interrupt_as_exception: bool = False) -> EventMessage | None:
    """
    receive event with given timeout

    Parameters
    ----------
    timeout: float, int, None
        timeout in milliseconds, None for blocking
    raise_interrupt_as_exception: bool
        if True, raises BreakLoop exception when interrupted, otherwise returns None
    """
    self._stop = False
    while not self._stop:
        try:
            if not self._poller_lock.acquire(timeout=timeout / 1000 if timeout else -1):
                continue
            sockets = self.poller.poll(timeout)  # list[tuple[zmq.Socket, int]]
            if len(sockets) > 1:
                # if there is an interrupt message as well as an event,
                # give preference to interrupt message.
                if sockets[0][0] == self.interrupting_peer:
                    sockets = [sockets[0]]  # we still need the socket, poll event  tuple
                elif sockets[1][0] == self.interrupting_peer:
                    sockets = [sockets[1]]
            for socket, _ in sockets:
                try:
                    raw_message = socket.recv_multipart(zmq.NOBLOCK)
                    message = EventMessage(raw_message)
                    if socket == self.interrupting_peer:
                        if message.payload.deserialize() == "INTERRUPT":
                            self.stop_polling()
                            if raise_interrupt_as_exception:
                                raise BreakLoop("event consumer interrupted")
                            return
                    return message
                except zmq.Again:
                    pass
                # if not self.handled_default_message_types(event_message):
        finally:
            try:
                self._poller_lock.release()
            except Exception as ex:
                self.logger.warning(f"could not release poller lock for event receive - {str(ex)}")

interrupt

interrupt()

interrupts the event consumer. Generally should be used for exiting this object if there is no poll period/infinite polling. Otherwise please use stop_polling().

Source code in hololinked/hololinked/core/zmq/brokers.py
def interrupt(self):
    """
    interrupts the event consumer. Generally should be used for exiting this object if there is no poll
    period/infinite polling. Otherwise please use stop_polling().
    """
    self.interrupting_peer.send_multipart(self.interrupt_message.byte_array)

hololinked.core.zmq.brokers.AsyncEventConsumer

Bases: BaseEventConsumer, BaseAsyncZMQ

Async Event Consumer to be used inside async loops

Source code in hololinked/hololinked/core/zmq/brokers.py
class AsyncEventConsumer(BaseEventConsumer, BaseAsyncZMQ):
    """Async Event Consumer to be used inside async loops"""

    async def receive(
        self,
        timeout: float | None = 1000,
        raise_interrupt_as_exception: bool = False,
    ) -> EventMessage | None:
        """
        receive event with given timeout

        Parameters
        ----------
        timeout: float, int, None
            timeout in milliseconds, None for blocking
        raise_interrupt_as_exception: bool
            if True, raises BreakLoop exception when interrupted, otherwise returns None
        """
        # TODO - use raise_interrupt_as_exception
        self._stop = False
        while not self._stop:
            try:
                try:
                    await asyncio.wait_for(
                        self._poller_lock.acquire(),
                        timeout=timeout / 1000 if timeout else None,
                    )
                except TimeoutError:
                    continue
                sockets = await self.poller.poll(timeout)
                if len(sockets) > 1:
                    # if there is an interrupt message as well as an event,
                    # give preference to interrupt message.
                    if sockets[0][0] == self.interrupting_peer:
                        sockets = [sockets[0]]
                    elif sockets[1][0] == self.interrupting_peer:
                        sockets = [sockets[1]]
                for socket, _ in sockets:
                    try:
                        raw_message = await socket.recv_multipart(zmq.NOBLOCK)
                        message = EventMessage(raw_message)
                        if socket == self.interrupting_peer:
                            if message.payload.deserialize() == "INTERRUPT":
                                self.stop_polling()
                                if raise_interrupt_as_exception:
                                    raise BreakLoop("event consumer interrupted")
                                return
                        return message
                    except zmq.Again:
                        pass
            finally:
                try:
                    self._poller_lock.release()
                except Exception as ex:
                    self.logger.warning(f"could not release poller lock for event receive - {str(ex)}")

    async def interrupt(self):
        """
        interrupts the event consumer. Generally should be used for exiting this object if there is no poll
        period/infinite polling. Otherwise please use stop_polling().
        """
        await self.interrupting_peer.send_multipart(self.interrupt_message.byte_array)

Functions

receive async

receive(timeout: float | None = 1000, raise_interrupt_as_exception: bool = False) -> EventMessage | None

receive event with given timeout

Parameters:

Name Type Description Default
timeout
float | None

timeout in milliseconds, None for blocking

1000
raise_interrupt_as_exception
bool

if True, raises BreakLoop exception when interrupted, otherwise returns None

False
Source code in hololinked/hololinked/core/zmq/brokers.py
async def receive(
    self,
    timeout: float | None = 1000,
    raise_interrupt_as_exception: bool = False,
) -> EventMessage | None:
    """
    receive event with given timeout

    Parameters
    ----------
    timeout: float, int, None
        timeout in milliseconds, None for blocking
    raise_interrupt_as_exception: bool
        if True, raises BreakLoop exception when interrupted, otherwise returns None
    """
    # TODO - use raise_interrupt_as_exception
    self._stop = False
    while not self._stop:
        try:
            try:
                await asyncio.wait_for(
                    self._poller_lock.acquire(),
                    timeout=timeout / 1000 if timeout else None,
                )
            except TimeoutError:
                continue
            sockets = await self.poller.poll(timeout)
            if len(sockets) > 1:
                # if there is an interrupt message as well as an event,
                # give preference to interrupt message.
                if sockets[0][0] == self.interrupting_peer:
                    sockets = [sockets[0]]
                elif sockets[1][0] == self.interrupting_peer:
                    sockets = [sockets[1]]
            for socket, _ in sockets:
                try:
                    raw_message = await socket.recv_multipart(zmq.NOBLOCK)
                    message = EventMessage(raw_message)
                    if socket == self.interrupting_peer:
                        if message.payload.deserialize() == "INTERRUPT":
                            self.stop_polling()
                            if raise_interrupt_as_exception:
                                raise BreakLoop("event consumer interrupted")
                            return
                    return message
                except zmq.Again:
                    pass
        finally:
            try:
                self._poller_lock.release()
            except Exception as ex:
                self.logger.warning(f"could not release poller lock for event receive - {str(ex)}")

interrupt async

interrupt()

interrupts the event consumer. Generally should be used for exiting this object if there is no poll period/infinite polling. Otherwise please use stop_polling().

Source code in hololinked/hololinked/core/zmq/brokers.py
async def interrupt(self):
    """
    interrupts the event consumer. Generally should be used for exiting this object if there is no poll
    period/infinite polling. Otherwise please use stop_polling().
    """
    await self.interrupting_peer.send_multipart(self.interrupt_message.byte_array)