Skip to content

hololinked.core.zmq.brokers.AsyncZMQServer

Bases: BaseZMQServer, BaseAsyncZMQ

Implements both blocking (non-polled) and non-blocking (polled) forms of receiving requests and sending replies. Polling can be stopped from the server side by calling stop_polling(), unlike the blocking receive methods, which only return once a message arrives.

Parameters:

Name Type Description Default

id

str

id of the Thing which the server serves

required

server_type

server type metadata - currently not useful/important

required

context

Union[Context, None]

ZeroMQ Context object to use. All sockets share this context. Automatically created when None is supplied.

None

socket_type

SocketType

socket type of ZMQ socket, default is ROUTER (enables address based routing of messages)

ROUTER

transport

ZMQ_TRANSPORTS

Use TCP for network access, IPC for multi-process applications, and INPROC for multi-threaded applications.

IPC

poll_timeout

time in milliseconds to poll the sockets specified under protocols. Useful when calling stop_polling(), since the maximum delay before polling stops is poll_timeout

25
Source code in hololinked\core\zmq\brokers.py
class AsyncZMQServer(BaseZMQServer, BaseAsyncZMQ):
    """
    Asynchronous ZMQ server that receives requests and sends responses over a single socket.

    Requests can be received in blocking form (``async_recv_request()`` / ``async_recv_requests()``, which
    only return once a message arrives) or in polled form (``poll_requests()``), which can be interrupted
    from the server side by calling ``stop_polling()``.

    Parameters
    ----------
    id: str
        ``id`` of the Thing which the server serves
    server_type: str
        server type metadata - currently not useful/important. Not a named argument of this class;
        presumably consumed from ``**kwargs`` by a base class - TODO confirm
    context: Optional, zmq.asyncio.Context
        ZeroMQ Context object to use. All sockets share this context. Automatically created when None is supplied.
    socket_type: zmq.SocketType, default zmq.ROUTER
        socket type of ZMQ socket, default is ROUTER (enables address based routing of messages)
    transport: Enum, default ZMQ_TRANSPORTS.IPC
        Use TCP for network access, IPC for multi-process applications, and INPROC for multi-threaded applications.
    poll_timeout: int, default 25
        time in milliseconds to wait in each poll of the socket. It is also the maximum delay before a call to
        ``stop_polling()`` takes effect.
    **kwargs:
        forwarded unchanged to both the base class constructor and ``create_socket()``
    """

    def __init__(self, *, id: str, context: typing.Union[zmq.asyncio.Context, None] = None, 
                socket_type: zmq.SocketType = zmq.ROUTER, transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC, 
                poll_timeout: int = 25, **kwargs) -> None:
        super().__init__(id=id, **kwargs)
        self.create_socket(id=id, node_type='server', context=context, transport=transport, 
                        socket_type=socket_type, **kwargs) 
        # identity check: only a context created on behalf of this instance is terminated in exit()
        self._terminate_context = context is None
        self.poller = zmq.asyncio.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        self.poll_timeout = poll_timeout  # validated by the property setter


    @property
    def poll_timeout(self) -> int:
        """
        socket polling timeout in milliseconds, an integer greater than 0. 
        """
        return self._poll_timeout

    @poll_timeout.setter
    def poll_timeout(self, value: int) -> None:
        # 0 is rejected too: the contract is "greater than 0", and a zero timeout makes
        # poll() return immediately instead of waiting
        if not isinstance(value, int) or value <= 0:
            raise ValueError(f"polling period must be an integer greater than 0, not {value}. Value is considered in milliseconds.")
        self._poll_timeout = value 


    async def async_recv_request(self) -> RequestMessage:
        """
        Receive one message in a blocking form. Async for multi-server paradigm, each server should schedule
        this method in the event loop explicitly. This is taken care by the ``Eventloop`` & ``RPCServer``.   

        Messages for which ``handled_default_message_types()`` returns a truthy value, and empty messages,
        are skipped and the wait continues.

        Returns
        -------
        message: RequestMessage
            received message with important content (operation, arguments, thing execution context) deserialized. 
        """
        while True:
            raw_message = await self.socket.recv_multipart()
            request_message = RequestMessage(raw_message)
            if not self.handled_default_message_types(request_message) and raw_message:
                self.logger.debug(f"received message from client '{request_message.sender_id}' with msg-ID '{request_message.id}'")
                return request_message


    async def async_recv_requests(self) -> typing.List[RequestMessage]:
        """
        Receive all currently available messages in blocking form: wait for the first message, then drain
        whatever else is immediately available on the socket. Async for multi-server paradigm, each server 
        should schedule this method in the event loop explicitly. This is taken care by the ``Eventloop`` 
        & ``RPCServer``. 

        Returns
        -------
        messages: typing.List[RequestMessage]
            list of received messages with important content (operation, arguments, execution context) deserialized.
        """
        messages = [await self.async_recv_request()]
        while True:
            try:
                raw_message = await self.socket.recv_multipart(zmq.NOBLOCK)
                request_message = RequestMessage(raw_message)
                if not self.handled_default_message_types(request_message) and raw_message:
                    self.logger.debug(f"received message from client '{request_message.sender_id}' with msg-ID '{request_message.id}'")
                    messages.append(request_message)
            except zmq.Again: 
                # nothing more to read without blocking
                break 
        return messages


    async def async_send_response(self, 
                    request_message: RequestMessage, 
                    payload: SerializableData = SerializableNone, 
                    preserialized_payload: PreserializedData = PreserializedEmptyByte
                ) -> None:
        """
        Send response message for a request message. 

        Parameters
        ----------
        request_message: RequestMessage
            original message so that the response can be properly crafted and routed
        payload: SerializableData
            serializable data to be sent as response
        preserialized_payload: PreserializedData
            pre-encoded data, generally used for large or custom data that is already serialized

        Returns
        -------
        None
        """
        await self.socket.send_multipart(
            ResponseMessage.craft_reply_from_request(
                request_message=request_message,
                payload=payload,
                preserialized_payload=preserialized_payload                
            ).byte_array
        )
        self.logger.debug(f"sent response to client '{request_message.sender_id}' with msg-ID '{request_message.id}'")


    async def async_send_response_with_message_type(self, 
                                        request_message: RequestMessage, 
                                        message_type: str,
                                        payload: SerializableData = SerializableNone,
                                        preserialized_payload: PreserializedData = PreserializedEmptyByte
                                    ) -> None:
        """
        Send response message for a request message with an explicitly chosen message type. 

        Parameters
        ----------
        request_message: RequestMessage
            original message so that the response can be properly crafted and routed
        message_type: str
            message type of the response; ``REPLY`` is used when a falsy value is supplied
        payload: SerializableData
            serializable data to be sent as response
        preserialized_payload: PreserializedData
            pre-encoded data that is already serialized

        Returns
        -------
        None
        """
        await self.socket.send_multipart(
            ResponseMessage.craft_from_arguments(
                receiver_id=request_message.sender_id,
                sender_id=self.id, 
                message_type=message_type or REPLY,
                message_id=request_message.id,
                payload=payload,
                preserialized_payload=preserialized_payload
            ).byte_array
        )
        self.logger.debug(f"sent response to client '{request_message.sender_id}' with msg-ID '{request_message.id}'")


    async def poll_requests(self) -> typing.List[RequestMessage]:
        """
        poll for messages with specified timeout (``poll_timeout``) and return if any messages are available.
        This method keeps polling until a message arrives or ``stop_polling()`` is called, so make sure other 
        methods are scheduled which can stop polling. 

        Returns
        -------
        messages: typing.List[RequestMessage]
            list of received messages with important content (operation, arguments, thing execution context) 
            deserialized. Empty when polling was stopped before any message arrived.
        """
        self.stop_poll = False
        messages = []
        while not self.stop_poll:
            sockets = await self.poller.poll(self._poll_timeout) # type hints dont work in this line
            for socket, _ in sockets:
                # drain everything that is immediately readable on this socket
                while True:
                    try:
                        raw_message = await socket.recv_multipart(zmq.NOBLOCK)
                    except zmq.Again:
                        break
                    else:                        
                        request_message = RequestMessage(raw_message)
                        if not self.handled_default_message_types(request_message) and raw_message:
                            self.logger.debug(f"received message from client '{request_message.sender_id}' with msg-ID '{request_message.id}'")
                            messages.append(request_message)
            if messages:
                break
        return messages

    def stop_polling(self) -> None:
        """
        stop polling and unblock ``poll_requests()`` method. Takes effect the next time the polling loop
        checks the flag, i.e. after the current poll returns.
        """
        self.stop_poll = True 


    async def _handshake(self, request_message: RequestMessage) -> None:
        """
        Inner method that handles handshake. Scheduled by ``handshake()`` method, signature same as ``handshake()``.
        """
        # Note that for ROUTER sockets, once the message goes through the sending socket, the address of the receiver 
        # is replaced by the address of the sender once received 
        await self.socket.send_multipart(
            ResponseMessage.craft_from_arguments(
                receiver_id=request_message.sender_id,
                sender_id=self.id,
                message_type=HANDSHAKE, 
                message_id=request_message.id
            ).byte_array
        )
        self.logger.info(f"sent handshake to client '{request_message.sender_id}'")


    async def _handle_timeout(self, request_message: RequestMessage, timeout_type: str) -> None:
        """
        Inner method that handles timeout. Scheduled by ``handle_timeout()``, signature same as ``handle_timeout``.
        The ``timeout_type`` is sent to the client as the JSON payload of a ``TIMEOUT`` message.
        """
        await self.socket.send_multipart(
            ResponseMessage.craft_from_arguments(
                receiver_id=request_message.sender_id,
                sender_id=self.id,
                message_type=TIMEOUT,
                message_id=request_message.id,
                payload=SerializableData(timeout_type, content_type='application/json')
            ).byte_array   
        )
        self.logger.info(f"sent timeout to client '{request_message.sender_id}'")


    async def _handle_invalid_message(self, request_message: RequestMessage, exception: Exception) -> None:
        """
        Inner method that handles invalid messages. Scheduled by ``handle_invalid_message()``, 
        signature same as ``handle_invalid_message()``. The exception is formatted with 
        ``format_exception_as_json()`` and sent as the JSON payload of an ``INVALID_MESSAGE`` message.
        """
        await self.socket.send_multipart(
            ResponseMessage.craft_from_arguments(
                receiver_id=request_message.sender_id,
                sender_id=self.id,
                message_type=INVALID_MESSAGE,
                message_id=request_message.id,
                payload=SerializableData(dict(exception=format_exception_as_json(exception)), content_type='application/json')
            ).byte_array
        )         
        self.logger.info(f"sent invalid message to client '{request_message.sender_id}'." +
                            f" exception - {str(exception)}") 	


    async def _handle_error_message(self, 
                                request_message: RequestMessage,
                                exception: Exception
                            ) -> None:
        """
        Inner method that sends an error response carrying ``exception`` to the client that sent 
        ``request_message``.
        """
        # NOTE(review): ``logging.ERROR`` is the integer log level 40, whereas the sibling handlers pass
        # message type constants (HANDSHAKE, TIMEOUT, INVALID_MESSAGE) - confirm this is the intended value.
        # NOTE(review): the raw exception is used as JSON payload here, while _handle_invalid_message() 
        # formats it with format_exception_as_json() first - confirm the serializer accepts exceptions.
        response_message = ResponseMessage.craft_with_message_type(
                                                            request_message=request_message,
                                                            message_type=logging.ERROR,
                                                            payload=SerializableData(exception, content_type='application/json')
                                                        )
        await self.socket.send_multipart(response_message.byte_array)    
        self.logger.info(f"sent exception message to client '{response_message.receiver_id}'." +
                            f" exception - {str(exception)}")


    def exit(self) -> None:
        """
        unregister socket from poller and terminate socket and context.

        Every step is attempted independently and failures are only logged as warnings, so that a failure 
        in one step (for example unregistering a socket that is no longer registered) does not prevent the 
        socket from being closed before the context is terminated.
        """
        try:
            BaseZMQ.exit(self)
        except Exception as ex:
            self.logger.warning(f"could not run base exit routine of server '{self.id}' - {str(ex)}")
        try:
            self.poller.unregister(self.socket)
        except Exception as ex:
            self.logger.warning(f"could not unregister socket {self.id} from polling - {str(ex)}")
        try:
            # linger 0: discard unsent messages instead of waiting for them
            self.socket.close(0)
            self.logger.info(f"terminated socket of server '{self.id}' of type {self.__class__}")
        except Exception as ex:
            self.logger.warning(f"could not close socket of server '{self.id}' - {str(ex)}")
        try:
            # only a context created by this instance is terminated; a supplied one may be shared
            if self._terminate_context:
                self.context.term()
                self.logger.info("terminated context of socket '{}' of type '{}'".format(self.id, self.__class__))
        except Exception as ex:
            self.logger.warning("could not properly terminate context or attempted to terminate an already terminated " +
                            f" context '{self.id}'. Exception message: {str(ex)}")

Functions

__init__

__init__(*, id: str, context: typing.Union[zmq.asyncio.Context, None] = None, socket_type: zmq.SocketType = zmq.ROUTER, transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC, poll_timeout=25, **kwargs) -> None
Source code in hololinked\core\zmq\brokers.py
def __init__(self, *, id: str, context: typing.Union[zmq.asyncio.Context, None] = None, 
            socket_type: zmq.SocketType = zmq.ROUTER, transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC, 
            poll_timeout: int = 25, **kwargs) -> None:
    """
    Create the server socket and register it with a poller for incoming messages.

    ``**kwargs`` are forwarded unchanged to both the base class constructor and ``create_socket()``.
    """
    super().__init__(id=id, **kwargs)
    self.create_socket(id=id, node_type='server', context=context, transport=transport, 
                    socket_type=socket_type, **kwargs) 
    # identity check: only a context created on behalf of this instance is terminated on exit
    self._terminate_context = context is None
    self.poller = zmq.asyncio.Poller()
    self.poller.register(self.socket, zmq.POLLIN)
    self.poll_timeout = poll_timeout

async_recv_request async

async_recv_request() -> RequestMessage

Receive one message in a blocking form. Async for multi-server paradigm, each server should schedule this method in the event loop explicitly. This is taken care by the Eventloop & RPCServer.

Returns:

Name Type Description
message RequestMessage

received message with important content (operation, arguments, thing execution context) deserialized.

Source code in hololinked\core\zmq\brokers.py
async def async_recv_request(self) -> RequestMessage:
    """
    Wait until one request arrives on the socket and return it. Being a coroutine, several servers can 
    run side by side when each schedules this method in the event loop; the ``Eventloop`` & 
    ``RPCServer`` take care of that.

    Messages for which ``handled_default_message_types()`` returns a truthy value, and empty messages,
    are skipped and the wait continues.

    Returns
    -------
    message: RequestMessage
        received message with important content (operation, arguments, thing execution context) deserialized. 
    """
    while True:
        frames = await self.socket.recv_multipart()
        request = RequestMessage(frames)
        if self.handled_default_message_types(request) or not frames:
            continue
        self.logger.debug(f"received message from client '{request.sender_id}' with msg-ID '{request.id}'")
        return request

async_recv_requests async

async_recv_requests() -> typing.List[RequestMessage]

Receive all currently available messages in blocking form. Async for multi-server paradigm, each server should schedule this method in the event loop explicitly. This is taken care by the Eventloop & RPCServer.

Returns:

Name Type Description
messages List[RequestMessage]

list of received messages with important content (operation, arguments, execution context) deserialized.

Source code in hololinked\core\zmq\brokers.py
async def async_recv_requests(self) -> typing.List[RequestMessage]:
    """
    Wait for one request, then drain every further message that can be read from the socket without 
    blocking. Being a coroutine, several servers can run side by side when each schedules this method 
    in the event loop; the ``Eventloop`` & ``RPCServer`` take care of that.

    Returns
    -------
    messages: typing.List[RequestMessage]
        list of received messages with important content (operation, arguments, execution context) deserialized.
    """
    collected = [await self.async_recv_request()]
    while True:
        try:
            frames = await self.socket.recv_multipart(zmq.NOBLOCK)
            request = RequestMessage(frames)
            if self.handled_default_message_types(request) or not frames:
                continue
            self.logger.debug(f"received message from client '{request.sender_id}' with msg-ID '{request.id}'")
            collected.append(request)
        except zmq.Again:
            # the socket has nothing more to deliver right now
            return collected

async_send_response async

async_send_response(request_message: RequestMessage, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte) -> None

Send response message for a request message.

Parameters:

Name Type Description Default

request_message

RequestMessage

original message so that the response can be properly crafted and routed

required

payload

SerializableData

serializable data to be sent as response

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data, generally used for large or custom data that is already serialized

PreserializedEmptyByte

Returns:

Type Description
None
Source code in hololinked\core\zmq\brokers.py
async def async_send_response(self, 
                request_message: RequestMessage, 
                payload: SerializableData = SerializableNone, 
                preserialized_payload: PreserializedData = PreserializedEmptyByte
            ) -> None:
    """
    Craft a reply for the given request and send it over the socket. 

    Parameters
    ----------
    request_message: RequestMessage
        original message so that the response can be properly crafted and routed
    payload: SerializableData
        serializable data to be sent as response
    preserialized_payload: PreserializedData
        pre-encoded data, generally used for large or custom data that is already serialized

    Returns
    -------
    None
    """
    reply = ResponseMessage.craft_reply_from_request(
        request_message=request_message,
        payload=payload,
        preserialized_payload=preserialized_payload,
    )
    await self.socket.send_multipart(reply.byte_array)
    self.logger.debug(f"sent response to client '{request_message.sender_id}' with msg-ID '{request_message.id}'")

async_send_response_with_message_type async

async_send_response_with_message_type(request_message: RequestMessage, message_type: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte) -> None

Send response message for a request message.

Parameters:

Name Type Description Default

request_message

RequestMessage

original message so that the response can be properly crafted and routed

required

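message_type

str

message type of the response; REPLY is used when a falsy value is supplied

required

payload

SerializableData

serializable data to be sent as response

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data that is already serialized

PreserializedEmptyByte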

Returns:

Type Description
None
Source code in hololinked\core\zmq\brokers.py
async def async_send_response_with_message_type(self, 
                                    request_message: RequestMessage, 
                                    message_type: str,
                                    payload: SerializableData = SerializableNone,
                                    preserialized_payload: PreserializedData = PreserializedEmptyByte
                                ) -> None:
    """
    Craft a response with an explicitly chosen message type and send it to the sender of the request. 

    Parameters
    ----------
    request_message: RequestMessage
        original message so that the response can be properly crafted and routed
    message_type: str
        message type of the response; ``REPLY`` is used when a falsy value is supplied
    payload: SerializableData
        serializable data to be sent as response
    preserialized_payload: PreserializedData
        pre-encoded data that is already serialized

    Returns
    -------
    None
    """
    response = ResponseMessage.craft_from_arguments(
        receiver_id=request_message.sender_id,
        sender_id=self.id,
        message_type=message_type or REPLY,
        message_id=request_message.id,
        payload=payload,
        preserialized_payload=preserialized_payload,
    )
    await self.socket.send_multipart(response.byte_array)
    self.logger.debug(f"sent response to client '{request_message.sender_id}' with msg-ID '{request_message.id}'")

poll_requests async

poll_requests() -> typing.List[RequestMessage]

poll for messages with specified timeout (poll_timeout) and return if any messages are available. This method blocks, so make sure other methods are scheduled which can stop polling.

Returns:

Name Type Description
messages List[RequestMessage]

list of received messages with important content (operation, arguments, thing execution context) deserialized.

Source code in hololinked\core\zmq\brokers.py
async def poll_requests(self) -> typing.List[RequestMessage]:
    """
    Poll the socket repeatedly, waiting up to ``poll_timeout`` milliseconds each time, and return as soon 
    as at least one request has been collected. The loop only ends when a message arrives or the stop flag
    is set, so make sure something else is scheduled that can call ``stop_polling()``. 

    Returns
    -------
    messages: typing.List[RequestMessage]
        list of received messages with important content (operation, arguments, thing execution context) 
        deserialized. Empty when polling was stopped before any message arrived.
    """
    self.stop_poll = False
    collected = []
    while not self.stop_poll:
        ready = await self.poller.poll(self._poll_timeout)
        for ready_socket, _ in ready:
            # read until the socket reports that nothing more is immediately available
            while True:
                try:
                    frames = await ready_socket.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    break
                request = RequestMessage(frames)
                if self.handled_default_message_types(request) or not frames:
                    continue
                self.logger.debug(f"received message from client '{request.sender_id}' with msg-ID '{request.id}'")
                collected.append(request)
        if collected:
            break
    return collected

_handshake async

_handshake(request_message: RequestMessage) -> None

Inner method that handles handshake. Scheduled by handshake() method, signature same as handshake().

Source code in hololinked\core\zmq\brokers.py
async def _handshake(self, request_message: RequestMessage) -> None:
    """
    Reply to the sender of ``request_message`` with a ``HANDSHAKE`` message carrying the same message ID. 
    Scheduled by ``handshake()`` method, signature same as ``handshake()``.
    """
    # With ROUTER sockets the receiver address placed in an outgoing message is replaced by the 
    # sender address by the time the message is received on the other side.
    handshake_reply = ResponseMessage.craft_from_arguments(
        receiver_id=request_message.sender_id,
        sender_id=self.id,
        message_type=HANDSHAKE,
        message_id=request_message.id,
    )
    await self.socket.send_multipart(handshake_reply.byte_array)
    self.logger.info(f"sent handshake to client '{request_message.sender_id}'")

_handle_timeout async

_handle_timeout(request_message: RequestMessage, timeout_type: str) -> None

Inner method that handles timeout. Scheduled by handle_timeout(), signature same as handle_timeout.

Source code in hololinked\core\zmq\brokers.py
async def _handle_timeout(self, request_message: RequestMessage, timeout_type: str) -> None:
    """
    Notify the sender of ``request_message`` about a timeout; ``timeout_type`` is sent as the JSON payload 
    of a ``TIMEOUT`` message. Scheduled by ``handle_timeout()``, signature same as ``handle_timeout``.
    """
    timeout_payload = SerializableData(timeout_type, content_type='application/json')
    timeout_reply = ResponseMessage.craft_from_arguments(
        receiver_id=request_message.sender_id,
        sender_id=self.id,
        message_type=TIMEOUT,
        message_id=request_message.id,
        payload=timeout_payload,
    )
    await self.socket.send_multipart(timeout_reply.byte_array)
    self.logger.info(f"sent timeout to client '{request_message.sender_id}'")

_handle_invalid_message async

_handle_invalid_message(request_message: RequestMessage, exception: Exception) -> None

Inner method that handles invalid messages. Scheduled by handle_invalid_message(), signature same as handle_invalid_message().

Source code in hololinked\core\zmq\brokers.py
async def _handle_invalid_message(self, request_message: RequestMessage, exception: Exception) -> None:
    """
    Tell the sender of ``request_message`` that its message was invalid. The exception is formatted with
    ``format_exception_as_json()`` and sent as the JSON payload of an ``INVALID_MESSAGE`` message.
    Scheduled by ``handle_invalid_message()``, signature same as ``handle_invalid_message()``.
    """
    error_payload = SerializableData(
        dict(exception=format_exception_as_json(exception)),
        content_type='application/json',
    )
    invalid_reply = ResponseMessage.craft_from_arguments(
        receiver_id=request_message.sender_id,
        sender_id=self.id,
        message_type=INVALID_MESSAGE,
        message_id=request_message.id,
        payload=error_payload,
    )
    await self.socket.send_multipart(invalid_reply.byte_array)
    self.logger.info(
        f"sent invalid message to client '{request_message.sender_id}'. exception - {str(exception)}"
    )

exit

exit() -> None

unregister socket from poller and terminate socket and context.

Source code in hololinked\core\zmq\brokers.py
def exit(self) -> None:
    """
    unregister socket from poller and terminate socket and context.

    Every step is attempted independently and failures are only logged as warnings, so that a failure 
    in one step (for example unregistering a socket that is no longer registered) does not prevent the 
    socket from being closed before the context is terminated.
    """
    try:
        BaseZMQ.exit(self)
    except Exception as ex:
        self.logger.warning(f"could not run base exit routine of server '{self.id}' - {str(ex)}")
    try:
        self.poller.unregister(self.socket)
    except Exception as ex:
        self.logger.warning(f"could not unregister socket {self.id} from polling - {str(ex)}")
    try:
        # linger 0: discard unsent messages instead of waiting for them
        self.socket.close(0)
        self.logger.info(f"terminated socket of server '{self.id}' of type {self.__class__}")
    except Exception as ex:
        self.logger.warning(f"could not close socket of server '{self.id}' - {str(ex)}")
    try:
        # only a context created by this instance is terminated; a supplied one may be shared
        if self._terminate_context:
            self.context.term()
            self.logger.info("terminated context of socket '{}' of type '{}'".format(self.id, self.__class__))
    except Exception as ex:
        self.logger.warning("could not properly terminate context or attempted to terminate an already terminated " +
                        f" context '{self.id}'. Exception message: {str(ex)}")

poll_timeout

poll_timeout(value) -> None
Source code in hololinked\core\zmq\brokers.py
@poll_timeout.setter
def poll_timeout(self, value: int) -> None:
    """
    Set the socket polling timeout in milliseconds.

    Raises
    ------
    ValueError
        if ``value`` is not an integer greater than 0
    """
    # 0 is rejected too: the error message promises "greater than 0", and a zero timeout makes
    # poll() return immediately instead of waiting
    if not isinstance(value, int) or value <= 0:
        raise ValueError(f"polling period must be an integer greater than 0, not {value}. Value is considered in milliseconds.")
    self._poll_timeout = value