Skip to content

hololinked.core.zmq.brokers.AsyncZMQClient

Bases: BaseZMQClient, BaseAsyncZMQ

Asynchronous client to talk to a ZMQ server where the server is identified by the instance name. The identity of the client needs to be different from the server, unlike the ZMQ Server. The client will also perform handshakes if necessary.

Parameters:

Name Type Description Default

server_id

str

The instance name of the server (or Thing)

required

id

str

Unique identity of the client to receive messages from the server. Each client connecting to same server must still have unique ID.

required

client_type

ZMQ or HTTP Server

required

handshake

bool

when true, handshake with the server first before allowing first message and block until that handshake was accomplished.

True

transport

str

transport implemented by the server

'IPC'

**kwargs

socket_address (str): socket address for connecting to a TCP server; zmq_serializer: custom implementation of ZMQ serializer, if necessary; http_serializer: custom implementation of JSON serializer, if necessary

{}
Source code in hololinked\core\zmq\brokers.py
class AsyncZMQClient(BaseZMQClient, BaseAsyncZMQ):
    """ 
    Asynchronous client to talk to a ZMQ server where the server is identified by the instance name. The identity 
    of the client needs to be different from the server, unlike the ZMQ Server. The client will also perform handshakes 
    if necessary.

    Parameters
    ----------
    id: str
        Unique identity of the client to receive messages from the server. Each client connecting to same server must 
        still have unique ID.
    server_id: str
        The instance name of the server (or ``Thing``)  
    handshake: bool, default True
        when true, ``handshake()`` is called at the end of ``__init__``; await ``handshake_complete()`` to wait
        until the handshake was accomplished.
    transport: str | Enum, TCP, IPC or INPROC, default IPC
        transport implemented by the server. For IPC and INPROC the ``server_id`` is used as the socket address.
    context: zmq.asyncio.Context, optional
        context forwarded to ``create_socket()``. When None, ``_terminate_context`` is set to True.
    **kwargs:
        socket_address: str
            socket address for connecting to TCP server
        handshake_timeout: int, default 60000
            timeout in milliseconds for the handshake started at init
        zmq_serializer:
            custom implementation of ZMQ serializer if necessary
        http_serializer:
            custom implementation of JSON serializer if necessary
        client_type: str
            ZMQ or HTTP Server. NOTE(review): not an explicit argument of ``__init__``; presumably consumed 
            by a base class through ``**kwargs`` - confirm.
    """

    def __init__(self, 
                id: str,
                server_id: str, 
                handshake: bool = True, 
                transport: str = "IPC", 
                context: zmq.asyncio.Context | None = None, 
                **kwargs
            ) -> None:
        super().__init__(id=id, server_id=server_id, **kwargs)
        # 'socket_address' is popped while the arguments are evaluated, i.e. before ``**kwargs`` is expanded, 
        # so for TCP it is not passed twice.
        self.create_socket(id=id, 
                        node_type='client', 
                        socket_address=server_id if str(transport) in ["IPC", "INPROC"] else kwargs.pop('socket_address', None),
                        context=context, 
                        transport=transport, 
                        **kwargs)
        self._monitor_socket = self.socket.get_monitor_socket()
        self.poller = zmq.asyncio.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        self.poller.register(self._monitor_socket, zmq.POLLIN) 
        # identity check instead of ``== None``
        self._terminate_context = context is None
        # a freshly created Event is unset, it is set once the handshake succeeded
        self._handshake_event = asyncio.Event()
        if handshake:
            self.handshake(kwargs.pop("handshake_timeout", 60000))

    def handshake(self, timeout: int | None = 60000) -> None:
        """
        automatically called when handshake argument at init is True. When not automatically called, it is necessary
        to call this method before awaiting ``handshake_complete()``.

        Parameters
        ----------
        timeout: int, optional
            time in milliseconds after which the handshake is given up, None to retry without limit
        """
        run_callable_somehow(self._handshake(timeout))

    async def _handshake(self, timeout: float | int | None = 60000) -> None:
        """
        handshake with server before sending first message. A handshake request is sent repeatedly (waiting up to
        500 ms for a reply each time) until the server answers with a handshake message.

        Parameters
        ----------
        timeout: float | int, optional
            time in milliseconds after which the handshake is given up, None to retry without limit

        Raises
        ------
        ConnectionError
            if no handshake reply arrived within ``timeout``
        ConnectionAbortedError
            if a message of another type arrives before the handshake reply
        """
        # the monitor socket is taken out of the poller so that only the main socket is polled below
        if self._monitor_socket is not None and self._monitor_socket in self.poller:
            self.poller.unregister(self._monitor_socket)
        self._handshake_event.clear()
        start_time = time.time_ns()    
        while True:
            # elapsed time is converted from nanoseconds to milliseconds before comparing
            if timeout is not None and (time.time_ns() - start_time)/1e6 > timeout:
                raise ConnectionError(f"Unable to contact server '{self.server_id}' from client '{self.id}'")
            await self.socket.send_multipart(RequestMessage.craft_with_message_type(self.id, self.server_id, HANDSHAKE).byte_array)
            self.logger.info(f"sent Handshake to server '{self.server_id}'")
            if await self.poller.poll(500):
                try:
                    raw_message = await self.socket.recv_multipart(zmq.NOBLOCK)
                    response_message = ResponseMessage(raw_message)
                except zmq.Again:
                    pass 
                else:
                    if response_message.type == HANDSHAKE: # type: ignore
                        self.logger.info(f"client '{self.id}' handshook with server '{self.server_id}'")               
                        break
                    else:
                        raise ConnectionAbortedError(f"Handshake cannot be done with server '{self.server_id}'." + 
                                                    " Another message arrived before handshake complete.")
            else:
                self.logger.info('got no response for handshake')
        self._handshake_event.set()

    async def handshake_complete(self):
        """
        wait for handshake to complete
        """
        await self._handshake_event.wait()

    async def async_send_request(self, 
                                thing_id: str, 
                                objekt: str, 
                                operation: str, 
                                payload: SerializableData = SerializableNone,
                                preserialized_payload: PreserializedData = PreserializedEmptyByte,
                                server_execution_context: ServerExecutionContext = default_server_execution_context,
                                thing_execution_context: typing.Dict[str, typing.Any] = default_thing_execution_context
                            ) -> str:
        """
        send message to server. 

        client's message to server:
        ::
            [address, bytes(), client type, message type, message id, 
            [   0   ,   1    ,     2      ,      3      ,      4     , 

            server execution context, operation, arguments, thing execution context] 
                5                   ,      6   ,     7    ,       8                ]

        Server Execution Context Definitions (typing.Dict[str, typing.Any] or JSON):
            - "invokation_timeout" - time in seconds to wait for server to start executing the operation
            - "execution_timeout" - time in seconds to wait for server to complete the operation
            - "oneway" - if True, server will not send a response back

        Thing Execution Context Definitions (typing.Dict[str, typing.Any] or JSON):
            - "fetch_execution_logs" - fetches logs that were accumulated while execution

        Parameters
        ----------
        thing_id: str
            forwarded as ``thing_id`` to ``RequestMessage.craft_from_arguments()``
        objekt: str
            forwarded as ``objekt`` to ``RequestMessage.craft_from_arguments()``
        operation: str
            unique str identifying a server side or ``Thing`` resource. These values corresponding 
            to automatically extracted name from the object name or the URL_path prepended with the instance name. 
        payload: SerializableData
            if the operation invokes a method, arguments of that method. 
        preserialized_payload: PreserializedData
            forwarded as ``preserialized_payload`` to ``RequestMessage.craft_from_arguments()``
        server_execution_context: Dict[str, Any]
            see execution context definitions
        thing_execution_context: Dict[str, Any]
            see execution context definitions

        Returns
        -------
        message id: str
            id of the request message that was sent, to be passed to ``async_recv_response()``
        """
        request_message = RequestMessage.craft_from_arguments(
                                                        receiver_id=self.server_id,
                                                        sender_id=self.id,
                                                        thing_id=thing_id, 
                                                        objekt=objekt, 
                                                        operation=operation, 
                                                        payload=payload,
                                                        preserialized_payload=preserialized_payload,
                                                        server_execution_context=server_execution_context, 
                                                        thing_execution_context=thing_execution_context
                                                    ) 
        await self.socket.send_multipart(request_message.byte_array)
        # the receiver is the server, so the server id (not the client's own id) is logged
        self.logger.debug(f"sent operation '{operation}' to server '{self.server_id}' with msg-id '{request_message.id}'")
        return request_message.id

    async def async_recv_response(self, message_id: str) -> ResponseMessage:
        """
        Receives response from server. Messages are identified by message id, so call this method immediately after 
        calling ``async_send_request()`` to avoid receiving messages out of order. Or, use other methods like
        ``async_execute()``.

        Responses that belong to other message ids are stored in ``_response_cache`` and returned by the call 
        that asks for them. The method keeps polling (``_poll_timeout`` per poll) until the requested response 
        is available.

        Parameters
        ----------
        message_id: str
            message id of the message sent to server

        Returns
        -------
        ResponseMessage
            the response whose id equals ``message_id``
        """
        while True:
            if message_id in self._response_cache:
                return self._response_cache.pop(message_id)
            sockets = await self.poller.poll(self._poll_timeout)
            matching_response = None
            # every readable socket is handled on its own, so that no message is dropped when more than 
            # one socket has data in the same poll cycle
            for socket, _ in sockets:
                try:    
                    raw_message = await socket.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    continue
                response_message = ResponseMessage(raw_message)
                if message_id != response_message.id:
                    self._response_cache[response_message.id] = response_message
                    self.logger.debug("cached response with msg-id {}".format(response_message.id))
                else:
                    self.logger.debug(f"received response with msg-id {response_message.id}")
                    matching_response = response_message
            if matching_response is not None:
                return matching_response

    async def async_execute(self, 
                        thing_id: str, 
                        objekt: str, 
                        operation: str, 
                        payload: SerializableData = SerializableNone,
                        preserialized_payload: PreserializedData = PreserializedEmptyByte,
                        server_execution_context: ServerExecutionContext = default_server_execution_context, 
                        thing_execution_context: ThingExecutionContext = default_thing_execution_context
                    ) -> ResponseMessage: 
        """
        send an operation and receive the response for it. 

        Parameters
        ----------
        thing_id: str
            forwarded to ``async_send_request()``
        objekt: str
            forwarded to ``async_send_request()``
        operation: str
            unique str identifying a server side or ``Thing`` resource. These values corresponding 
            to automatically extracted name from the object name or the URL_path prepended with the instance name. 
        payload: SerializableData
            if the operation invokes a method, arguments of that method. 
        preserialized_payload: PreserializedData
            forwarded to ``async_send_request()``
        server_execution_context: Dict[str, Any]
            see execution context definitions
        thing_execution_context: Dict[str, Any]
            see execution context definitions

        Returns
        -------
        ResponseMessage
            the response received for the request that was sent
        """

        message_id = await self.async_send_request(
                                        thing_id=thing_id, 
                                        objekt=objekt, 
                                        operation=operation, 
                                        payload=payload,
                                        preserialized_payload=preserialized_payload,
                                        server_execution_context=server_execution_context, 
                                        thing_execution_context=thing_execution_context
                                    )
        return await self.async_recv_response(message_id)

Functions

async_execute async

async_execute(thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> ResponseMessage

send an operation and receive the response for it.

Parameters:

Name Type Description Default

operation

str

unique str identifying a server side or Thing resource. These values corresponding to automatically extracted name from the object name or the URL_path prepended with the instance name.

required

arguments

if the operation invokes a method, arguments of that method.

required

server_execution_context

ServerExecutionContext

see execution context definitions

default_server_execution_context

thing_execution_context

ThingExecutionContext

see execution context definitions

default_thing_execution_context

raise_client_side_exception

if True, any exceptions raised during execution inside Thing instance will be raised on the client.

required

deserialize_response

deserializes the data field of the message

required

Returns:

Type Description
message id: bytes

a byte representation of message id

Source code in hololinked\core\zmq\brokers.py
async def async_execute(self, 
                    thing_id: str, 
                    objekt: str, 
                    operation: str, 
                    payload: SerializableData = SerializableNone,
                    preserialized_payload: PreserializedData = PreserializedEmptyByte,
                    server_execution_context: ServerExecutionContext = default_server_execution_context, 
                    thing_execution_context: ThingExecutionContext = default_thing_execution_context
                ) -> ResponseMessage: 
    """
    Send one operation to the server and wait for its response.

    All arguments are handed over unchanged to ``async_send_request()``; the message id it returns
    is then awaited with ``async_recv_response()``.

    Parameters
    ----------
    thing_id: str
        forwarded to ``async_send_request()``
    objekt: str
        forwarded to ``async_send_request()``
    operation: str
        unique str identifying a server side or ``Thing`` resource
    payload: SerializableData
        if the operation invokes a method, arguments of that method
    preserialized_payload: PreserializedData
        forwarded to ``async_send_request()``
    server_execution_context: Dict[str, Any]
        see execution context definitions
    thing_execution_context: Dict[str, Any]
        see execution context definitions

    Returns
    -------
    ResponseMessage
        whatever ``async_recv_response()`` returns for the sent message id
    """
    # step 1: dispatch the request and remember its id
    sent_message_id = await self.async_send_request(
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    # step 2: wait for the response carrying that id
    response = await self.async_recv_response(sent_message_id)
    return response

async_recv_response async

async_recv_response(message_id: str) -> typing.List[ResponseMessage]

Receives response from server. Messages are identified by message id, so call this method immediately after calling send_request() to avoid receiving messages out of order. Or, use other methods like execute().

Parameters:

Name Type Description Default

message_id

str

message id of the message sent to server

required

timeout

time in milliseconds to wait for response

required

raise_client_side_exception

if True, any exceptions raised during execution inside Thing instance will be raised on the client. See docs of raise_local_exception() for info on exception

required

deserialize_response

deserializes the data field of the message

required
Source code in hololinked\core\zmq\brokers.py
async def async_recv_response(self, message_id: str) -> ResponseMessage:
    """
    Receives response from server. Messages are identified by message id, so call this method immediately after 
    calling ``async_send_request()`` to avoid receiving messages out of order. Or, use other methods like
    ``async_execute()``.

    Responses that belong to other message ids are stored in ``_response_cache`` and returned by the call 
    that asks for them. The method keeps polling (``_poll_timeout`` per poll) until the requested response 
    is available.

    Parameters
    ----------
    message_id: str
        message id of the message sent to server

    Returns
    -------
    ResponseMessage
        the response whose id equals ``message_id``
    """
    while True:
        if message_id in self._response_cache:
            return self._response_cache.pop(message_id)
        sockets = await self.poller.poll(self._poll_timeout)
        matching_response = None
        # every readable socket is handled on its own, so that no message is dropped when more than 
        # one socket has data in the same poll cycle
        for socket, _ in sockets:
            try:    
                raw_message = await socket.recv_multipart(zmq.NOBLOCK)
            except zmq.Again:
                continue
            response_message = ResponseMessage(raw_message)
            if message_id != response_message.id:
                self._response_cache[response_message.id] = response_message
                self.logger.debug("cached response with msg-id {}".format(response_message.id))
            else:
                self.logger.debug(f"received response with msg-id {response_message.id}")
                matching_response = response_message
        if matching_response is not None:
            return matching_response

async_send_request async

async_send_request(thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: typing.Dict[str, typing.Any] = default_thing_execution_context) -> str

send message to server.

client's message to server (the frame index is shown beneath each field):

[address, bytes(), client type, message type, message id, 
[   0   ,   1    ,     2      ,      3      ,      4     , 

server execution context, operation, arguments, thing execution context] 
    5                   ,      6   ,     7    ,       8                ]

Server Execution Context Definitions (typing.Dict[str, typing.Any] or JSON): - "invokation_timeout" - time in seconds to wait for server to start executing the operation - "execution_timeout" - time in seconds to wait for server to complete the operation - "oneway" - if True, server will not send a response back

Thing Execution Context Definitions (typing.Dict[str, typing.Any] or JSON): - "fetch_execution_logs" - fetches logs that were accumulated while execution

Parameters:

Name Type Description Default

operation

str

unique str identifying a server side or Thing resource. These values corresponding to automatically extracted name from the object name or the URL_path prepended with the instance name.

required

arguments

if the operation invokes a method, arguments of that method.

required

server_execution_context

ServerExecutionContext

see execution context definitions

default_server_execution_context

thing_execution_context

Dict[str, Any]

see execution context definitions

default_thing_execution_context

Returns:

Type Description
message id: bytes

a byte representation of message id

Source code in hololinked\core\zmq\brokers.py
async def async_send_request(self, 
                            thing_id: str, 
                            objekt: str, 
                            operation: str, 
                            payload: SerializableData = SerializableNone,
                            preserialized_payload: PreserializedData = PreserializedEmptyByte,
                            server_execution_context: ServerExecutionContext = default_server_execution_context,
                            thing_execution_context: typing.Dict[str, typing.Any] = default_thing_execution_context
                        ) -> str:
    """
    send message to server. 

    client's message to server:
    ::
        [address, bytes(), client type, message type, message id, 
        [   0   ,   1    ,     2      ,      3      ,      4     , 

        server execution context, operation, arguments, thing execution context] 
            5                   ,      6   ,     7    ,       8                ]

    Server Execution Context Definitions (typing.Dict[str, typing.Any] or JSON):
        - "invokation_timeout" - time in seconds to wait for server to start executing the operation
        - "execution_timeout" - time in seconds to wait for server to complete the operation
        - "oneway" - if True, server will not send a response back

    Thing Execution Context Definitions (typing.Dict[str, typing.Any] or JSON):
        - "fetch_execution_logs" - fetches logs that were accumulated while execution

    Parameters
    ----------
    thing_id: str
        forwarded as ``thing_id`` to ``RequestMessage.craft_from_arguments()``
    objekt: str
        forwarded as ``objekt`` to ``RequestMessage.craft_from_arguments()``
    operation: str
        unique str identifying a server side or ``Thing`` resource. These values corresponding 
        to automatically extracted name from the object name or the URL_path prepended with the instance name. 
    payload: SerializableData
        if the operation invokes a method, arguments of that method. 
    preserialized_payload: PreserializedData
        forwarded as ``preserialized_payload`` to ``RequestMessage.craft_from_arguments()``
    server_execution_context: Dict[str, Any]
        see execution context definitions
    thing_execution_context: Dict[str, Any]
        see execution context definitions

    Returns
    -------
    message id: str
        id of the request message that was sent, to be passed to ``async_recv_response()``
    """
    request_message = RequestMessage.craft_from_arguments(
                                                    receiver_id=self.server_id,
                                                    sender_id=self.id,
                                                    thing_id=thing_id, 
                                                    objekt=objekt, 
                                                    operation=operation, 
                                                    payload=payload,
                                                    preserialized_payload=preserialized_payload,
                                                    server_execution_context=server_execution_context, 
                                                    thing_execution_context=thing_execution_context
                                                ) 
    await self.socket.send_multipart(request_message.byte_array)
    # the receiver is the server, so the server id (not the client's own id) is logged
    self.logger.debug(f"sent operation '{operation}' to server '{self.server_id}' with msg-id '{request_message.id}'")
    return request_message.id

handshake

handshake(timeout: int | None = 60000) -> None

automatically called when handshake argument at init is True. When not automatically called, it is necessary to call this method before awaiting handshake_complete().

Source code in hololinked\core\zmq\brokers.py
def handshake(self, timeout: int | None = 60000) -> None:
    """
    Start the handshake with the server.

    Called automatically at init when the ``handshake`` argument is True. Otherwise call this method 
    yourself before awaiting ``handshake_complete()``.

    Parameters
    ----------
    timeout: int, optional
        handed over unchanged to ``_handshake()``
    """
    # build the coroutine first, then hand it to the runner helper
    handshake_coroutine = self._handshake(timeout)
    run_callable_somehow(handshake_coroutine)

handshake_complete async

handshake_complete()

wait for handshake to complete

Source code in hololinked\core\zmq\brokers.py
async def handshake_complete(self):
    """
    Suspend the caller until the handshake event has been set.
    """
    handshake_event = self._handshake_event
    await handshake_event.wait()