Skip to content

hololinked.core.zmq.brokers.AsyncZMQClient

Bases: BaseZMQClient, BaseAsyncZMQ

Async ZMQ client (asyncio compatible) that connects to a ZMQ server. Implements the REQ-REP pattern.

Source code in hololinked/hololinked/core/zmq/brokers.py
class AsyncZMQClient(BaseZMQClient, BaseAsyncZMQ):
    """Async ZMQ client (asyncio compatible) that connects to a ZMQ server. Implements REQ-REP pattern."""

    def __init__(
        self,
        id: str,
        server_id: str,
        context: zmq.asyncio.Context | None = None,
        access_point: str = ZMQ_TRANSPORTS.IPC,
        handshake: bool = True,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        id: str
            Unique id of the client to receive messages from the server. Each client connecting to same server must
            still have unique ID.
        server_id: str
            The server id to connect to
        context: Optional, zmq.Context
            ZeroMQ Context object to use. If None, a global context is used.
        access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
            Use `TCP` or `tcp://<host>:<port>` for network access, `IPC` for multi-process applications,
            and `INPROC` for multi-threaded applications.
        handshake: bool
            When true, start a handshake with the server (see `handshake()`) before any other message is
            exchanged, highly recommended.
        kwargs: dict[str, Any]
            Additional arguments:

            - `poll_timeout`: `int`. The timeout for polling the socket (in milliseconds)
            - `handshake_timeout`: `int`. The timeout for the handshake process (in milliseconds) to complete.
                If handshake does not complete within this time, an exception is raised. Only relevant if `handshake` is True.
            - `logger`: `logging.Logger`. logger instance to use. If None, a default logger is created.
        """
        super().__init__(id=id, server_id=server_id, **kwargs)
        self.create_socket(
            server_id=server_id,
            socket_id=id,
            node_type="client",
            context=context,
            access_point=access_point,
            **kwargs,
        )
        self._monitor_socket = self.socket.get_monitor_socket()
        self.poller = zmq.asyncio.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        self.poller.register(self._monitor_socket, zmq.POLLIN)
        # serialises polling between concurrent `async_recv_response()` calls
        self._poller_lock = asyncio.Lock()
        # set at the end of `_handshake()`, awaited in `handshake_complete()`
        self._handshake_event = asyncio.Event()
        self._handshake_event.clear()
        if handshake:
            # NOTE: `kwargs` (including `handshake_timeout`, if given) was already forwarded to
            # `super().__init__()` and `create_socket()` above; the pop only extracts the value here.
            self.handshake(kwargs.pop("handshake_timeout", 60000))

    def handshake(self, timeout: int | None = 60000) -> None:
        """
        schedules a handshake coroutine in the running event loop
        or completes handshake synchronously if no event loop is running.
        Use `handshake_complete()` async method to check if handshake is complete.

        Parameters
        ----------
        timeout: float | int
            timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time,
            a `ConnectionError` is raised. If None, wait indefinitely until handshake completes.
        """
        run_callable_somehow(self._handshake(timeout))

    async def _handshake(self, timeout: float | int | None = 60000) -> None:
        """
        handshake with server before sending first message.

        A handshake request is sent and the poller is awaited for up to 500 ms; this repeats until a handshake
        reply arrives, polling is stopped (`self._stop`) or `timeout` elapses. Replies that are neither a
        handshake nor a default message type are stored in the response cache.

        Parameters
        ----------
        timeout: float | int | None
            timeout in milliseconds, None to retry indefinitely

        Raises
        ------
        ConnectionError
            if no handshake reply was received within `timeout`
        """
        self._stop = False
        # the monitor socket is taken out of the poller while handshaking, only replies read from
        # `self.socket` are of interest here
        if self._monitor_socket is not None and self._monitor_socket in self.poller:
            self.poller.unregister(self._monitor_socket)
        self._handshake_event.clear()
        start_time = time.time_ns()
        try:
            while not self._stop:
                if timeout is not None and (time.time_ns() - start_time) / 1e6 > timeout:
                    raise ConnectionError(f"Unable to contact server '{self.server_id}' from client '{self.id}'")
                handshake_message = RequestMessage.craft_with_message_type(self.id, self.server_id, HANDSHAKE)
                await self.socket.send_multipart(handshake_message.byte_array)
                self.logger.info("sent Handshake to server")
                if await self.poller.poll(500):
                    try:
                        raw_message = await self.socket.recv_multipart(zmq.NOBLOCK)
                        response_message = ResponseMessage(raw_message)
                    except zmq.Again:
                        pass
                    else:
                        if response_message.type == HANDSHAKE:  # type: ignore
                            self.logger.info("client handshook with server")
                            break
                        elif self.handled_default_message_types(response_message):
                            continue
                        else:
                            # a reply to some other request, keep it for `async_recv_response()`
                            self._response_cache[response_message.id] = response_message
                else:
                    self.logger.info("got no response for handshake")
        finally:
            # restore monitor socket polling also when the handshake timed out or failed,
            # otherwise the poller would silently stay without the monitor socket
            if self._monitor_socket is not None:
                self.poller.register(self._monitor_socket, zmq.POLLIN)
        self._handshake_event.set()

    async def handshake_complete(self, timeout: float | int = 60000) -> None:
        """
        wait for handshake to complete

        Parameters
        ----------
        timeout: float | int
            timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time,
            a `TimeoutError` is raised. A falsy value (0 or None) waits indefinitely.

        Raises
        ------
        TimeoutError
            (`asyncio.TimeoutError`) if the handshake event was not set within `timeout`
        """
        try:
            # milliseconds -> seconds; no integer truncation, so that sub-second timeouts are honoured
            await asyncio.wait_for(self._handshake_event.wait(), timeout / 1000 if timeout else None)
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError(f"Handshake with server timed out after {timeout} ms") from None

    async def async_send_request(
        self,
        thing_id: str,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: dict[str, Any] = default_thing_execution_context,
    ) -> str:
        """
        send request message to server.

        Parameters
        ----------
        thing_id: str
            `id` of the `Thing` on which an operation is to be performed
        objekt: str
            name of property, action or event (usually only property or action)
        operation: str
            operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
        payload: SerializableData
            serializable data to be sent as payload
        preserialized_payload: PreserializedData
            pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
        server_execution_context: dict[str, Any]
            Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
        thing_execution_context: dict[str, Any]
            Specify thing level execution context like `fetchExecutionLogs` etc.

        Returns
        -------
        str
            id of the request message that was sent, to be passed to `async_recv_response()`
        """
        request_message = RequestMessage.craft_from_arguments(
            receiver_id=self.server_id,
            sender_id=self.id,
            thing_id=thing_id,
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context,
        )
        await self.socket.send_multipart(request_message.byte_array)
        self.logger.debug(
            "sent message to server",
            msg_id=request_message.id,
            message_type=request_message.type,
            thing_id=thing_id,
            operation=operation,
            objekt=objekt,
        )
        return request_message.id

    async def async_recv_response(self, message_id: str) -> ResponseMessage | None:
        """
        Receives response from server. Messages are identified by message id, and out of order messages are sent to
        a cache which may be popped later. This method blocks until the expected message is received or `stop_polling()`
        is called from another thread.

        Parameters
        ----------
        message_id: str
            the message id of the expected response message

        Returns
        -------
        ResponseMessage | None
            the response carrying `message_id`, or None when polling was stopped before it arrived
        """
        self._stop = False
        while not self._stop:
            # another coroutine may have received and cached our response while it was polling
            if message_id in self._response_cache:
                return self._response_cache.pop(message_id)
            try:
                await asyncio.wait_for(
                    self._poller_lock.acquire(),
                    timeout=self.poll_timeout / 1000 if self.poll_timeout else None,
                )
            except asyncio.TimeoutError:
                # The lock is still held by another coroutine. It must NOT be released here:
                # `asyncio.Lock.release()` has no ownership check and would unlock it for the holder.
                continue
            try:
                sockets = await self.poller.poll(self._poll_timeout)
                response_message = None
                for socket, _ in sockets:
                    try:
                        raw_message = await socket.recv_multipart(zmq.NOBLOCK)
                        response_message = ResponseMessage(raw_message)
                    except zmq.Again:
                        continue
                    if response_message:
                        if self.handled_default_message_types(response_message):
                            continue
                        if message_id != response_message.id:
                            self._response_cache[response_message.id] = response_message
                            self.logger.debug(
                                "cached response as it does not corresponding to expected ID",
                                msg_id=response_message.id,
                                expected_msg_id=message_id,
                            )
                        else:
                            self.logger.debug(
                                "received response",
                                msg_id=response_message.id,
                                message_type=response_message.type,
                            )
                            return response_message
            finally:
                # reached only when this coroutine acquired the lock above
                try:
                    self._poller_lock.release()
                except RuntimeError as ex:
                    self.logger.warning(f"could not release poller lock for async_recv_response - {str(ex)}")
        return None

    async def async_execute(
        self,
        thing_id: str,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: ThingExecutionContext = default_thing_execution_context,
    ) -> ResponseMessage:
        """
        send an operation and receive the response for it.

        Parameters
        ----------
        thing_id: str
            `id` of the `Thing` on which an operation is to be performed
        objekt: str
            name of property, action or event (usually only property or action)
        operation: str
            operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
        payload: SerializableData
            serializable data to be sent as payload
        preserialized_payload: PreserializedData
            pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
        server_execution_context: dict[str, Any]
            Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
        thing_execution_context: dict[str, Any]
            Specify thing level execution context like `fetchExecutionLogs` etc.

        Returns
        -------
        ResponseMessage
            response message from server after completing the operation
        """
        message_id = await self.async_send_request(
            thing_id=thing_id,
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context,
        )
        return await self.async_recv_response(message_id)

Functions

__init__

__init__(id: str, server_id: str, context: Context | None = None, access_point: str = ZMQ_TRANSPORTS.IPC, handshake: bool = True, **kwargs) -> None

Parameters:

Name Type Description Default

id

str

Unique id of the client to receive messages from the server. Each client connecting to same server must still have unique ID.

required

server_id

str

The server id to connect to

required

context

Context | None

ZeroMQ Context object to use. If None, a global context is used.

None

access_point

str

Use TCP or tcp://<host>:<port> for network access, IPC for multi-process applications, and INPROC for multi-threaded applications.

IPC

handshake

bool

When true, handshake with the server first, before receiving any other message, and block until that handshake is complete; highly recommended.

True

kwargs

Additional arguments:

  • poll_timeout: int. The timeout for polling the socket (in milliseconds)
  • handshake_timeout: int. The timeout for the handshake process (in milliseconds) to complete. If handshake does not complete within this time, an exception is raised. Only relevant if handshake is True.
  • logger: logging.Logger. logger instance to use. If None, a default logger is created.
{}
Source code in hololinked/hololinked/core/zmq/brokers.py
def __init__(
    self,
    id: str,
    server_id: str,
    context: zmq.asyncio.Context | None = None,
    access_point: str = ZMQ_TRANSPORTS.IPC,
    handshake: bool = True,
    **kwargs,
) -> None:
    """
    Create the client socket, set up polling on it and optionally start the handshake.

    Parameters
    ----------
    id: str
        Unique id of this client, under which it receives messages from the server. Every client connecting
        to the same server needs its own id.
    server_id: str
        id of the server to connect to
    context: Optional, zmq.Context
        ZeroMQ Context object to use. If None, a global context is used.
    access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
        `TCP` or `tcp://<host>:<port>` for network access, `IPC` for multi-process applications,
        `INPROC` for multi-threaded applications.
    handshake: bool
        When true, a handshake with the server is started (see `handshake()`) before any other message is
        exchanged, highly recommended.
    kwargs: dict[str, Any]
        Additional arguments:

        - `poll_timeout`: `int`. The timeout for polling the socket (in milliseconds)
        - `handshake_timeout`: `int`. The timeout (in milliseconds) for the handshake to complete, an exception
            is raised when it is exceeded. Only relevant if `handshake` is True.
        - `logger`: `logging.Logger`. logger instance to use. If None, a default logger is created.
    """
    super().__init__(id=id, server_id=server_id, **kwargs)
    self.create_socket(
        server_id=server_id,
        socket_id=id,
        node_type="client",
        context=context,
        access_point=access_point,
        **kwargs,
    )
    self._monitor_socket = self.socket.get_monitor_socket()
    # both the data socket and its monitor socket are polled for incoming messages
    self.poller = zmq.asyncio.Poller()
    for polled_socket in (self.socket, self._monitor_socket):
        self.poller.register(polled_socket, zmq.POLLIN)
    self._poller_lock = asyncio.Lock()
    self._handshake_event = asyncio.Event()
    self._handshake_event.clear()
    if not handshake:
        return
    # `kwargs` was already forwarded above, the pop only extracts the timeout for the handshake
    handshake_timeout = kwargs.pop("handshake_timeout", 60000)
    self.handshake(handshake_timeout)

async_execute async

async_execute(thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> ResponseMessage

send an operation and receive the response for it.

Parameters:

Name Type Description Default

thing_id

str

id of the Thing on which an operation is to be performed

required

objekt

str

name of property, action or event (usually only property or action)

required

operation

str

operation to be performed, like readproperty, writeproperty, invokeaction etc.

required

payload

SerializableData

serializable data to be sent as payload

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized

PreserializedEmptyByte

server_execution_context

ServerExecutionContext

Specify server level execution context like invokationTimeout, executionTimeout, oneway operation etc.

default_server_execution_context

thing_execution_context

ThingExecutionContext

Specify thing level execution context like fetchExecutionLogs etc.

default_thing_execution_context

Returns:

Type Description
ResponseMessage

response message from server after completing the operation

Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_execute(
    self,
    thing_id: str,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: ServerExecutionContext = default_server_execution_context,
    thing_execution_context: ThingExecutionContext = default_thing_execution_context,
) -> ResponseMessage:
    """
    Send one operation to the server and wait for its response.

    Combines `async_send_request()` and `async_recv_response()`: the id returned for the sent request
    is used to pick the matching response.

    Parameters
    ----------
    thing_id: str
        `id` of the `Thing` on which the operation is performed
    objekt: str
        name of the property, action or event (usually only property or action)
    operation: str
        operation to perform, like `readproperty`, `writeproperty`, `invokeaction` etc.
    payload: SerializableData
        serializable data to send as payload
    preserialized_payload: PreserializedData
        pre-encoded data to send as payload, generally used for large or custom data that is already serialized
    server_execution_context: dict[str, Any]
        server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
    thing_execution_context: dict[str, Any]
        thing level execution context like `fetchExecutionLogs` etc.

    Returns
    -------
    ResponseMessage
        response message from the server for this operation
    """
    request_id = await self.async_send_request(
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    response = await self.async_recv_response(request_id)
    return response

async_recv_response async

async_recv_response(message_id: str) -> ResponseMessage | None

Receives response from server. Messages are identified by message id, and out of order messages are sent to a cache which may be popped later. This method blocks until the expected message is received or stop_polling() is called from another thread.

Parameters:

Name Type Description Default

message_id

str

the message id of the expected response message

required
Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_recv_response(self, message_id: str) -> ResponseMessage | None:
    """
    Receives response from server. Messages are identified by message id, and out of order messages are sent to
    a cache which may be popped later. This method blocks until the expected message is received or `stop_polling()`
    is called from another thread.

    Parameters
    ----------
    message_id: str
        the message id of the expected response message

    Returns
    -------
    ResponseMessage | None
        the response carrying `message_id`, or None when polling was stopped before it arrived
    """
    self._stop = False
    while not self._stop:
        # another coroutine may have received and cached our response while it was polling
        if message_id in self._response_cache:
            return self._response_cache.pop(message_id)
        try:
            await asyncio.wait_for(
                self._poller_lock.acquire(),
                timeout=self.poll_timeout / 1000 if self.poll_timeout else None,
            )
        except asyncio.TimeoutError:
            # The lock is still held by another coroutine. It must NOT be released here:
            # `asyncio.Lock.release()` has no ownership check and would unlock it for the holder.
            continue
        try:
            sockets = await self.poller.poll(self._poll_timeout)
            response_message = None
            for socket, _ in sockets:
                try:
                    raw_message = await socket.recv_multipart(zmq.NOBLOCK)
                    response_message = ResponseMessage(raw_message)
                except zmq.Again:
                    continue
                if response_message:
                    if self.handled_default_message_types(response_message):
                        continue
                    if message_id != response_message.id:
                        self._response_cache[response_message.id] = response_message
                        self.logger.debug(
                            "cached response as it does not corresponding to expected ID",
                            msg_id=response_message.id,
                            expected_msg_id=message_id,
                        )
                    else:
                        self.logger.debug(
                            "received response",
                            msg_id=response_message.id,
                            message_type=response_message.type,
                        )
                        return response_message
        finally:
            # reached only when this coroutine acquired the lock above
            try:
                self._poller_lock.release()
            except RuntimeError as ex:
                self.logger.warning(f"could not release poller lock for async_recv_response - {str(ex)}")
    return None

async_send_request async

async_send_request(thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: dict[str, Any] = default_thing_execution_context) -> str

send request message to server.

Parameters:

Name Type Description Default

thing_id

str

id of the Thing on which an operation is to be performed

required

objekt

str

name of property, action or event (usually only property or action)

required

operation

str

operation to be performed, like readproperty, writeproperty, invokeaction etc.

required

payload

SerializableData

serializable data to be sent as payload

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized

PreserializedEmptyByte

server_execution_context

ServerExecutionContext

Specify server level execution context like invokationTimeout, executionTimeout, oneway operation etc.

default_server_execution_context

thing_execution_context

dict[str, Any]

Specify thing level execution context like fetchExecutionLogs etc.

default_thing_execution_context

Returns:

Type Description
str

the id of the request message that was sent

Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_send_request(
    self,
    thing_id: str,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: ServerExecutionContext = default_server_execution_context,
    thing_execution_context: dict[str, Any] = default_thing_execution_context,
) -> str:
    """
    Craft a request message from the arguments and send it to the server, without waiting for a response.

    Parameters
    ----------
    thing_id: str
        `id` of the `Thing` on which the operation is performed
    objekt: str
        name of the property, action or event (usually only property or action)
    operation: str
        operation to perform, like `readproperty`, `writeproperty`, `invokeaction` etc.
    payload: SerializableData
        serializable data to send as payload
    preserialized_payload: PreserializedData
        pre-encoded data to send as payload, generally used for large or custom data that is already serialized
    server_execution_context: dict[str, Any]
        server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
    thing_execution_context: dict[str, Any]
        thing level execution context like `fetchExecutionLogs` etc.

    Returns
    -------
    str
        id of the request message that was sent
    """
    outgoing = RequestMessage.craft_from_arguments(
        receiver_id=self.server_id,
        sender_id=self.id,
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    await self.socket.send_multipart(outgoing.byte_array)
    # identifying fields are passed as keyword arguments to the logger call
    self.logger.debug(
        "sent message to server",
        msg_id=outgoing.id,
        message_type=outgoing.type,
        thing_id=thing_id,
        operation=operation,
        objekt=objekt,
    )
    return outgoing.id

handshake

handshake(timeout: int | None = 60000) -> None

schedules a handshake coroutine in the running event loop or completes handshake synchronously if no event loop is running. Use handshake_complete() async method to check if handshake is complete.

Parameters:

Name Type Description Default

timeout

int | None

timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time, a ConnectionError is raised. If None, wait indefinitely until handshake completes.

60000
Source code in hololinked/hololinked/core/zmq/brokers.py
def handshake(self, timeout: int | None = 60000) -> None:
    """
    Start the handshake with the server.

    The handshake coroutine is scheduled in the running event loop, or completed synchronously if no
    event loop is running. Await `handshake_complete()` to find out when the handshake is done.

    Parameters
    ----------
    timeout: float | int
        time in milliseconds allowed for the handshake to complete, a `ConnectionError` is raised when it is
        exceeded. If None, the handshake is retried indefinitely.
    """
    handshake_coroutine = self._handshake(timeout)
    run_callable_somehow(handshake_coroutine)

handshake_complete async

handshake_complete(timeout: float | int = 60000) -> None

wait for handshake to complete

Parameters:

Name Type Description Default

timeout

float | int

timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time, a TimeoutError is raised.

60000
Source code in hololinked/hololinked/core/zmq/brokers.py
async def handshake_complete(self, timeout: float | int = 60000) -> None:
    """
    wait for handshake to complete

    Parameters
    ----------
    timeout: float | int
        timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time,
        a `TimeoutError` is raised.
    """
    await asyncio.wait_for(self._handshake_event.wait(), int(timeout / 1000) if timeout else None)
    if not self._handshake_event.is_set():
        raise TimeoutError(f"Handshake with server timed out after {timeout} ms")