Skip to content

hololinked.core.zmq.brokers.AsyncZMQServer

Bases: BaseZMQServer, BaseAsyncZMQ

An async ZMQ server that can handle multiple clients. Implements both blocking (non-polled) and non-blocking/polling form of receiving messages and sending replies. This server can be stopped from server side by calling stop_polling().

Source code in hololinked/hololinked/core/zmq/brokers.py
class AsyncZMQServer(BaseZMQServer, BaseAsyncZMQ):
    """
    An async ZMQ server that can handle multiple clients. Implements both blocking (non-polled) and
    non-blocking/polling form of receiving messages and sending replies. This server can be stopped from server side
    by calling `stop_polling()`.
    """

    def __init__(
        self,
        *,
        id: str,
        context: zmq.asyncio.Context | None = None,
        socket_type: zmq.SocketType = zmq.ROUTER,
        access_point: str = ZMQ_TRANSPORTS.IPC,
        poll_timeout: int = 25,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        id: str
            `id` of the server, also used as the identity of the ZMQ socket
        context: Optional, zmq.asyncio.Context
            ZeroMQ Context object to use. If None, a global context is used.
        socket_type: zmq.SocketType, default zmq.ROUTER
            socket type of ZMQ socket, default is ROUTER (enables address based routing of messages)
        access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
            Use `TCP` or `tcp://*:<port>` for network access, `IPC` for multi-process applications,
            and `INPROC` for multi-threaded applications.
        poll_timeout: int, default 25
            time in milliseconds to poll the sockets specified under `procotols`. Useful for calling `stop_polling()`
            where the max delay to stop polling will be `poll_timeout`.
        kwargs: dict
            Additional arguments for `BaseZMQ` and `BaseAsyncZMQ` classes.

            - `logger`: logger instance to use. If None, a default logger is created.
        """

        super().__init__(id=id, **kwargs)
        self.create_socket(
            server_id=id,
            socket_id=id,
            node_type="server",
            context=context,
            access_point=access_point,
            socket_type=socket_type,
            **kwargs,
        )  # for server the server ID and socket ID is the same, only for clients they differ
        self.poller = zmq.asyncio.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        self.poll_timeout = poll_timeout

    @property
    def poll_timeout(self) -> int:
        """socket polling timeout in milliseconds greater than 0"""
        return self._poll_timeout

    @poll_timeout.setter
    def poll_timeout(self, value) -> None:
        if not isinstance(value, int) or value < 0:
            raise ValueError(
                f"polling period must be an integer greater than 0, not {value}. "
                + "Value is considered in milliseconds."
            )
        self._poll_timeout = value

    async def async_recv_request(self) -> RequestMessage:
        """
        Receive one message in a blocking form. There is no polling, therefore this method blocks until a message is
        received.

        Returns
        -------
        message: RequestMessage
            received message
        """
        while True:
            raw_message = await self.socket.recv_multipart()
            request_message = RequestMessage(raw_message)
            if not self.handled_default_message_types(request_message) and raw_message:
                self.logger.debug(
                    "received message from client",
                    client_id=request_message.sender_id,
                    msg_id=request_message.id,
                    message_type=request_message.type,
                )
                return request_message

    async def async_recv_requests(self) -> list[RequestMessage]:
        """
        Receive all currently available messages in blocking form. There is no polling, therefore this method
        blocks until at least one message is received.

        Returns
        -------
        messages: list[RequestMessage]
            list of received messages
        """
        messages = [await self.async_recv_request()]
        while True:
            try:
                raw_message = await self.socket.recv_multipart(zmq.NOBLOCK)
                request_message = RequestMessage(raw_message)
                if not self.handled_default_message_types(request_message) and raw_message:
                    self.logger.debug(
                        "received message from client",
                        client_id=request_message.sender_id,
                        msg_id=request_message.id,
                        message_type=request_message.type,
                    )
                    messages.append(request_message)
            except zmq.Again:
                break
        return messages

    async def async_send_response(
        self,
        request_message: RequestMessage,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
    ) -> None:
        """
        Send response message for a request message.

        Parameters
        ----------
        request_message: RequestMessage
            original message based on which the response message can be automatically crafted and routed
        payload: SerializableData
            serializable data to be sent as response
        preserialized_payload: PreserializedData
            pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
        """
        response_message = ResponseMessage.craft_reply_from_request(
            request_message=request_message,
            payload=payload,
            preserialized_payload=preserialized_payload,
        )
        await self.socket.send_multipart(response_message.byte_array)
        self.logger.debug(
            "sent response to client",
            receiver_id=response_message.receiver_id,
            msg_id=response_message.id,
            message_type=response_message.type,
        )

    async def async_send_response_with_message_type(
        self,
        request_message: RequestMessage,
        message_type: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
    ) -> None:
        """
        Send a specific response message apart from `REPLY` for a request message.

        Parameters
        ----------
        request_message: RequestMessage
            original message based on which the response message can be automatically crafted and routed
        payload: SerializableData
            serializable data to be sent as response
        preserialized_payload: PreserializedData
            pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
        """
        response_message = ResponseMessage.craft_from_arguments(
            receiver_id=request_message.sender_id,
            sender_id=self.id,
            message_type=message_type or REPLY,
            message_id=request_message.id,
            payload=payload,
            preserialized_payload=preserialized_payload,
        )
        await self.socket.send_multipart(response_message.byte_array)
        self.logger.debug(
            "sent response to client",
            receiver_id=response_message.receiver_id,
            msg_id=response_message.id,
            message_type=response_message.type,
        )

    async def poll_requests(self) -> list[RequestMessage]:
        """
        poll for messages with specified timeout (`poll_timeout`) and return if any messages are available.
        This method can be stopped from another method in a different thread or asyncio task (not in the same thread though).

        Returns
        -------
        messages: list[RequestMessage]
            list of received messages
        """
        self.stop_poll = False
        messages = []
        while not self.stop_poll:
            sockets = await self.poller.poll(self._poll_timeout)  # type hints dont work in this line
            for socket, _ in sockets:
                while True:
                    try:
                        raw_message = await socket.recv_multipart(zmq.NOBLOCK)
                    except zmq.Again:
                        break
                    else:
                        request_message = RequestMessage(raw_message)
                        if not self.handled_default_message_types(request_message) and raw_message:
                            self.logger.debug(
                                "received message from client",
                                sender_id=request_message.sender_id,
                                receiver_id=request_message.receiver_id,
                                msg_id=request_message.id,
                                message_type=request_message.type,
                            )
                            messages.append(request_message)
            if len(messages) > 0:
                break
        return messages

    def stop_polling(self) -> None:
        """stop polling and unblock `poll_messages()` method"""
        self.stop_poll = True

    async def _handshake(self, request_message: RequestMessage) -> None:
        """
        Inner method that handles handshake. Scheduled by `handshake()` method, signature same as `handshake()`.
        """
        # Note that for ROUTER sockets, once the message goes through the sending socket, the address of the receiver
        # is replaced by the address of the sender once received
        handshake_message = ResponseMessage.craft_from_arguments(
            receiver_id=request_message.sender_id,
            sender_id=self.id,
            message_type=HANDSHAKE,
            message_id=request_message.id,
        )
        await self.socket.send_multipart(handshake_message.byte_array)
        self.logger.info(
            "sent handshake to client",
            receiver_id=handshake_message.receiver_id,
            msg_id=handshake_message.id,
        )

    async def _handle_timeout(self, request_message: RequestMessage, timeout_type: str) -> None:
        """
        Inner method that handles timeout. Scheduled by `handle_timeout()`, signature same as `handle_timeout`.
        """
        timeout_message = ResponseMessage.craft_from_arguments(
            receiver_id=request_message.sender_id,
            sender_id=self.id,
            message_type=TIMEOUT,
            message_id=request_message.id,
            payload=SerializableData(timeout_type, content_type="application/json"),
        )
        await self.socket.send_multipart(timeout_message.byte_array)
        self.logger.warning(
            f"sent {timeout_type} timeout to client",
            receiver_id=timeout_message.receiver_id,
            msg_id=timeout_message.id,
        )

    async def _handle_invalid_message(self, request_message: RequestMessage, exception: Exception) -> None:
        """
        Inner method that handles invalid messages. Scheduled by `handle_invalid_message()`,
        signature same as `handle_invalid_message()`.
        """
        invalid_message = ResponseMessage.craft_from_arguments(
            receiver_id=request_message.sender_id,
            sender_id=self.id,
            message_type=INVALID_MESSAGE,
            message_id=request_message.id,
            payload=SerializableData(
                dict(exception=format_exception_as_json(exception)),
                content_type="application/json",
            ),
        )
        await self.socket.send_multipart(invalid_message.byte_array)
        self.logger.warning(
            f"informed client about invalid message due to exception - {str(exception)}",
            receiver_id=invalid_message.receiver_id,
            msg_id=invalid_message.id,
        )

    async def _handle_error_message(self, request_message: RequestMessage, exception: Exception) -> None:
        error_message = ResponseMessage.craft_with_message_type(
            request_message=request_message,
            message_type=ERROR,
            payload=SerializableData(
                dict(exception=format_exception_as_json(exception)),
                content_type="application/json",
            ),
        )
        await self.socket.send_multipart(error_message.byte_array)
        self.logger.warning(
            f"sent error message to client for exception - {str(exception)}",
            receiver_id=error_message.receiver_id,
            msg_id=error_message.id,
        )

    def exit(self) -> None:
        """unregister socket from poller and terminate socket. context is not terminated as it may be shared."""
        try:
            BaseZMQ.exit(self)
            self.poller.unregister(self.socket)
            self.socket.close(0)
            self.logger.info("terminated socket of server")
        except Exception as ex:
            self.logger.warning(f"error while closing socket - {str(ex)}")

Functions

__init__

__init__(*, id: str, context: Context | None = None, socket_type: SocketType = zmq.ROUTER, access_point: str = ZMQ_TRANSPORTS.IPC, poll_timeout: int = 25, **kwargs) -> None

Parameters:

Name Type Description Default

id

str

id of the server, also used as the identity of the ZMQ socket

required

context

Context | None

ZeroMQ Context object to use. If None, a global context is used.

None

socket_type

SocketType

socket type of ZMQ socket, default is ROUTER (enables address based routing of messages)

ROUTER

access_point

str

Use TCP or tcp://*:<port> for network access, IPC for multi-process applications, and INPROC for multi-threaded applications.

IPC

poll_timeout

int

time in milliseconds to poll the sockets specified under procotols. Useful for calling stop_polling() where the max delay to stop polling will be poll_timeout.

25

kwargs

Additional arguments for BaseZMQ and BaseAsyncZMQ classes.

  • logger: logger instance to use. If None, a default logger is created.
{}
Source code in hololinked/hololinked/core/zmq/brokers.py
def __init__(
    self,
    *,
    id: str,
    context: zmq.asyncio.Context | None = None,
    socket_type: zmq.SocketType = zmq.ROUTER,
    access_point: str = ZMQ_TRANSPORTS.IPC,
    poll_timeout: int = 25,
    **kwargs,
) -> None:
    """
    Parameters
    ----------
    id: str
        `id` of the server, also used as the identity of the ZMQ socket
    context: Optional, zmq.asyncio.Context
        ZeroMQ Context object to use. If None, a global context is used.
    socket_type: zmq.SocketType, default zmq.ROUTER
        socket type of ZMQ socket, default is ROUTER (enables address based routing of messages)
    access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
        Use `TCP` or `tcp://*:<port>` for network access, `IPC` for multi-process applications,
        and `INPROC` for multi-threaded applications.
    poll_timeout: int, default 25
        time in milliseconds to poll the sockets specified under `procotols`. Useful for calling `stop_polling()`
        where the max delay to stop polling will be `poll_timeout`.
    kwargs: dict
        Additional arguments for `BaseZMQ` and `BaseAsyncZMQ` classes.

        - `logger`: logger instance to use. If None, a default logger is created.
    """

    super().__init__(id=id, **kwargs)
    self.create_socket(
        server_id=id,
        socket_id=id,
        node_type="server",
        context=context,
        access_point=access_point,
        socket_type=socket_type,
        **kwargs,
    )  # for server the server ID and socket ID is the same, only for clients they differ
    self.poller = zmq.asyncio.Poller()
    self.poller.register(self.socket, zmq.POLLIN)
    self.poll_timeout = poll_timeout

async_recv_request async

async_recv_request() -> RequestMessage

Receive one message in a blocking form. There is no polling, therefore this method blocks until a message is received.

Returns:

Name Type Description
message RequestMessage

received message

Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_recv_request(self) -> RequestMessage:
    """
    Receive one message in a blocking form. There is no polling, therefore this method blocks until a message is
    received.

    Returns
    -------
    message: RequestMessage
        received message
    """
    while True:
        raw_message = await self.socket.recv_multipart()
        request_message = RequestMessage(raw_message)
        if not self.handled_default_message_types(request_message) and raw_message:
            self.logger.debug(
                "received message from client",
                client_id=request_message.sender_id,
                msg_id=request_message.id,
                message_type=request_message.type,
            )
            return request_message

async_recv_requests async

async_recv_requests() -> list[RequestMessage]

Receive all currently available messages in blocking form. There is no polling, therefore this method blocks until at least one message is received.

Returns:

Name Type Description
messages list[RequestMessage]

list of received messages

Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_recv_requests(self) -> list[RequestMessage]:
    """
    Receive all currently available messages in blocking form. There is no polling, therefore this method
    blocks until at least one message is received.

    Returns
    -------
    messages: list[RequestMessage]
        list of received messages
    """
    messages = [await self.async_recv_request()]
    while True:
        try:
            raw_message = await self.socket.recv_multipart(zmq.NOBLOCK)
            request_message = RequestMessage(raw_message)
            if not self.handled_default_message_types(request_message) and raw_message:
                self.logger.debug(
                    "received message from client",
                    client_id=request_message.sender_id,
                    msg_id=request_message.id,
                    message_type=request_message.type,
                )
                messages.append(request_message)
        except zmq.Again:
            break
    return messages

async_send_response async

async_send_response(request_message: RequestMessage, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte) -> None

Send response message for a request message.

Parameters:

Name Type Description Default

request_message

RequestMessage

original message based on which the response message can be automatically crafted and routed

required

payload

SerializableData

serializable data to be sent as response

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized

PreserializedEmptyByte
Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_send_response(
    self,
    request_message: RequestMessage,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
) -> None:
    """
    Send response message for a request message.

    Parameters
    ----------
    request_message: RequestMessage
        original message based on which the response message can be automatically crafted and routed
    payload: SerializableData
        serializable data to be sent as response
    preserialized_payload: PreserializedData
        pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
    """
    response_message = ResponseMessage.craft_reply_from_request(
        request_message=request_message,
        payload=payload,
        preserialized_payload=preserialized_payload,
    )
    await self.socket.send_multipart(response_message.byte_array)
    self.logger.debug(
        "sent response to client",
        receiver_id=response_message.receiver_id,
        msg_id=response_message.id,
        message_type=response_message.type,
    )

async_send_response_with_message_type async

async_send_response_with_message_type(request_message: RequestMessage, message_type: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte) -> None

Send a specific response message apart from REPLY for a request message.

Parameters:

Name Type Description Default

request_message

RequestMessage

original message based on which the response message can be automatically crafted and routed

required

payload

SerializableData

serializable data to be sent as response

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized

PreserializedEmptyByte
Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_send_response_with_message_type(
    self,
    request_message: RequestMessage,
    message_type: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
) -> None:
    """
    Send a specific response message apart from `REPLY` for a request message.

    Parameters
    ----------
    request_message: RequestMessage
        original message based on which the response message can be automatically crafted and routed
    payload: SerializableData
        serializable data to be sent as response
    preserialized_payload: PreserializedData
        pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
    """
    response_message = ResponseMessage.craft_from_arguments(
        receiver_id=request_message.sender_id,
        sender_id=self.id,
        message_type=message_type or REPLY,
        message_id=request_message.id,
        payload=payload,
        preserialized_payload=preserialized_payload,
    )
    await self.socket.send_multipart(response_message.byte_array)
    self.logger.debug(
        "sent response to client",
        receiver_id=response_message.receiver_id,
        msg_id=response_message.id,
        message_type=response_message.type,
    )

poll_requests async

poll_requests() -> list[RequestMessage]

poll for messages with specified timeout (poll_timeout) and return if any messages are available. This method can be stopped from another method in a different thread or asyncio task (not in the same thread though).

Returns:

Name Type Description
messages list[RequestMessage]

list of received messages

Source code in hololinked/hololinked/core/zmq/brokers.py
async def poll_requests(self) -> list[RequestMessage]:
    """
    poll for messages with specified timeout (`poll_timeout`) and return if any messages are available.
    This method can be stopped from another method in a different thread or asyncio task (not in the same thread though).

    Returns
    -------
    messages: list[RequestMessage]
        list of received messages
    """
    self.stop_poll = False
    messages = []
    while not self.stop_poll:
        sockets = await self.poller.poll(self._poll_timeout)  # type hints dont work in this line
        for socket, _ in sockets:
            while True:
                try:
                    raw_message = await socket.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    break
                else:
                    request_message = RequestMessage(raw_message)
                    if not self.handled_default_message_types(request_message) and raw_message:
                        self.logger.debug(
                            "received message from client",
                            sender_id=request_message.sender_id,
                            receiver_id=request_message.receiver_id,
                            msg_id=request_message.id,
                            message_type=request_message.type,
                        )
                        messages.append(request_message)
        if len(messages) > 0:
            break
    return messages

stop_polling

stop_polling() -> None

stop polling and unblock poll_messages() method

Source code in hololinked/hololinked/core/zmq/brokers.py
def stop_polling(self) -> None:
    """stop polling and unblock `poll_messages()` method"""
    self.stop_poll = True

poll_timeout

poll_timeout(value) -> None
Source code in hololinked/hololinked/core/zmq/brokers.py
@poll_timeout.setter
def poll_timeout(self, value) -> None:
    if not isinstance(value, int) or value < 0:
        raise ValueError(
            f"polling period must be an integer greater than 0, not {value}. "
            + "Value is considered in milliseconds."
        )
    self._poll_timeout = value

exit

exit() -> None

unregister socket from poller and terminate socket. context is not terminated as it may be shared.

Source code in hololinked/hololinked/core/zmq/brokers.py
def exit(self) -> None:
    """unregister socket from poller and terminate socket. context is not terminated as it may be shared."""
    try:
        BaseZMQ.exit(self)
        self.poller.unregister(self.socket)
        self.socket.close(0)
        self.logger.info("terminated socket of server")
    except Exception as ex:
        self.logger.warning(f"error while closing socket - {str(ex)}")