Skip to content

hololinked.core.zmq.brokers.SyncZMQClient

Bases: BaseZMQClient, BaseSyncZMQ

Sync ZMQ client (as in, not asyncio compatible) that connects to a ZMQ server. Implements the REQ-REP pattern.

Source code in hololinked/hololinked/core/zmq/brokers.py
class SyncZMQClient(BaseZMQClient, BaseSyncZMQ):
    """
    Sync ZMQ client (as in, not asyncio compatible) that connects to a ZMQ server. Implements REQ-REP pattern.

    Requests are sent with `send_request()` and the matching response is collected with `recv_response()`;
    `execute()` combines both. Responses that arrive while a different message id is awaited are kept in
    the response cache until they are asked for.
    """

    def __init__(
        self,
        id: str,
        server_id: str,
        context: zmq.Context | None = None,
        access_point: str = ZMQ_TRANSPORTS.IPC,
        handshake: bool = True,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        id: str
            Unique id of the client to receive messages from the server. Each client connecting to same server must
            still have a unique ID.
        server_id: str
            The server id to connect to
        context: Optional, zmq.Context
            ZeroMQ Context object to use. If None, a global context is used.
        access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
            Use `TCP` or `tcp://<host>:<port>` for network access, `IPC` for multi-process applications,
            and `INPROC` for multi-threaded applications.
        handshake: bool
            When true, handshake with the server first before receiving any other message and block until that
            handshake is accomplished, highly recommended.
        kwargs: dict[str, Any]
            Additional arguments:

            - `poll_timeout`: `int`. The timeout for polling the socket (in milliseconds)
            - `handshake_timeout`: `int`. The timeout for the handshake process (in milliseconds) to complete.
                If handshake does not complete within this time, an exception is raised. Only relevant if `handshake` is True.
            - `logger`: `logging.Logger`. logger instance to use. If None, a default logger is created.
        """
        super().__init__(id=id, server_id=server_id, **kwargs)
        self.create_socket(
            server_id=server_id,
            socket_id=id,
            node_type="client",
            context=context,
            access_point=access_point,
            **kwargs,
        )
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        # serialises polling of the socket between threads that call recv_response() concurrently
        self._poller_lock = threading.Lock()
        if handshake:
            self.handshake(kwargs.pop("handshake_timeout", 60000))

    def send_request(
        self,
        thing_id: str,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: ThingExecutionContext = default_thing_execution_context,
    ) -> bytes:
        """
        send request message to server.

        Parameters
        ----------
        thing_id: str
            `id` of the `Thing` on which an operation is to be performed
        objekt: str
            name of property, action or event (usually only property or action)
        operation: str
            operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
        payload: SerializableData
            serializable data to be sent as payload
        preserialized_payload: PreserializedData
            pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
        server_execution_context: dict[str, Any]
            Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
        thing_execution_context: dict[str, Any]
            Specify thing level execution context like `fetchExecutionLogs` etc.

        Returns
        -------
        bytes
            a message id in bytes
        """
        request_message = RequestMessage.craft_from_arguments(
            receiver_id=self.server_id,
            sender_id=self.id,
            thing_id=thing_id,
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context,
        )
        self.socket.send_multipart(request_message.byte_array)
        self.logger.debug(
            "sent message to server",
            msg_id=request_message.id,
            message_type=request_message.type,
            thing_id=thing_id,
            operation=operation,
            objekt=objekt,
        )
        return request_message.id

    def recv_response(self, message_id: bytes) -> ResponseMessage:
        """
        Receives response from server. Messages are identified by message id, and out of order messages are sent to
        a cache which may be popped later. This method blocks until the expected message is received or the
        `_stop` flag is set (`stop_polling()` called from another thread).

        Parameters
        ----------
        message_id: bytes
            the message id of the expected response message

        Returns
        -------
        ResponseMessage
            the response carrying `message_id`. If the loop ends because the `_stop` flag was set, the method
            falls through and returns None.
        """
        self._stop = False
        while not self._stop:
            # another thread that was polling may already have cached the response meant for this call
            if message_id in self._response_cache:
                return self._response_cache.pop(message_id)
            # Only one thread polls at a time. Give up after one poll interval so that the cache is
            # re-checked; a timed out acquire must NOT be followed by a release, because
            # threading.Lock.release() would unlock the lock held by the thread that is polling.
            if not self._poller_lock.acquire(timeout=self.poll_timeout / 1000 if self.poll_timeout else -1):
                continue
            try:
                for socket, _ in self.poller.poll(self.poll_timeout):
                    try:
                        raw_message = socket.recv_multipart(zmq.NOBLOCK)
                    except zmq.Again:
                        # nothing to read on this socket after all, try the next one
                        continue
                    response_message = ResponseMessage(raw_message)
                    if self.handled_default_message_types(response_message):
                        continue
                    if message_id != response_message.id:
                        self._response_cache[response_message.id] = response_message
                        self.logger.debug(
                            "cached response as it does not corresponding to expected ID",
                            msg_id=response_message.id,
                            expected_msg_id=message_id,
                        )
                    else:
                        self.logger.debug(
                            "received response",
                            msg_id=response_message.id,
                            message_type=response_message.type,
                        )
                        return response_message
            finally:
                # released on every iteration (and on return/exception) so that other threads get a
                # chance to poll when this iteration only cached somebody else's message
                self._poller_lock.release()

    def execute(
        self,
        thing_id: str,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: ThingExecutionContext = default_thing_execution_context,
    ) -> ResponseMessage:
        """
        send an operation and receive the response for it.

        Parameters
        ----------
        thing_id: str
            `id` of the `Thing` on which an operation is to be performed
        objekt: str
            name of property, action or event (usually only property or action)
        operation: str
            operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
        payload: SerializableData
            serializable data to be sent as payload
        preserialized_payload: PreserializedData
            pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
        server_execution_context: dict[str, Any]
            Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
        thing_execution_context: dict[str, Any]
            Specify thing level execution context like `fetchExecutionLogs` etc.

        Returns
        -------
        ResponseMessage
            response message from server after completing the operation
        """
        message_id = self.send_request(
            thing_id=thing_id,
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context,
        )
        return self.recv_response(message_id=message_id)

    def handshake(self, timeout: float | int | None = 60000) -> None:
        """
        handshake with server before sending first message

        A handshake request is sent on every iteration and a reply is awaited for up to 500 ms. Non-handshake
        messages received meanwhile are either consumed by `handled_default_message_types()` or stored in the
        response cache. Once the loop ends, the monitor socket of the client socket is registered with the poller.

        Parameters
        ----------
        timeout: float | int | None
            timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time,
            a `ConnectionError` is raised. If None, wait indefinitely until handshake completes.

        Raises
        ------
        ConnectionError
            if no handshake reply was received within `timeout` milliseconds
        """
        self._stop = False
        # monotonic clock: elapsed time must not be affected by adjustments of the system clock
        start_time = time.monotonic_ns()
        while not self._stop:
            if timeout is not None and (time.monotonic_ns() - start_time) / 1e6 > timeout:
                raise ConnectionError(f"Unable to contact server '{self.server_id}' from client '{self.id}'")
            handshake_message = RequestMessage.craft_with_message_type(self.id, self.server_id, HANDSHAKE)
            self.socket.send_multipart(handshake_message.byte_array)
            self.logger.info("sent Handshake to server")
            if not self.poller.poll(500):
                self.logger.info("got no response for handshake")
                continue
            try:
                raw_message = self.socket.recv_multipart(zmq.NOBLOCK)
            except zmq.Again:
                # poller reported activity but nothing was readable on the client socket, retry
                continue
            response_message = ResponseMessage(raw_message)
            if response_message.type == HANDSHAKE:
                self.logger.info("client handshook with server")
                break
            if not self.handled_default_message_types(response_message):
                # an ordinary response that arrived early, keep it for recv_response()
                self._response_cache[response_message.id] = response_message
        self._monitor_socket = self.socket.get_monitor_socket()
        self.poller.register(self._monitor_socket, zmq.POLLIN)

Functions

__init__

__init__(id: str, server_id: str, context: Context | None = None, access_point: str = ZMQ_TRANSPORTS.IPC, handshake: bool = True, **kwargs) -> None

Parameters:

Name Type Description Default

id

str

Unique id of the client to receive messages from the server. Each client connecting to the same server must still have a unique ID.

required

server_id

str

The server id to connect to

required

context

Context | None

ZeroMQ Context object to use. If None, a global context is used.

None

access_point

str

Use TCP or tcp://<host>:<port> for network access, IPC for multi-process applications, and INPROC for multi-threaded applications.

IPC

handshake

bool

When true, handshake with the server first before receiving any other message and block until that handshake is accomplished, highly recommended.

True

kwargs

Additional arguments:

  • poll_timeout: int. The timeout for polling the socket (in milliseconds)
  • handshake_timeout: int. The timeout for the handshake process (in milliseconds) to complete. If handshake does not complete within this time, an exception is raised. Only relevant if handshake is True.
  • logger: logging.Logger. logger instance to use. If None, a default logger is created.
{}
Source code in hololinked/hololinked/core/zmq/brokers.py
def __init__(
    self,
    id: str,
    server_id: str,
    context: zmq.Context | None = None,
    access_point: str = ZMQ_TRANSPORTS.IPC,
    handshake: bool = True,
    **kwargs,
) -> None:
    """
    Create the client socket, register it with a poller and optionally handshake with the server.

    Parameters
    ----------
    id: str
        Unique id of this client, used by the server to address replies. Every client connecting to the
        same server needs its own id.
    server_id: str
        Id of the server to connect to
    context: Optional, zmq.Context
        ZeroMQ Context object to use. If None, a global context is used.
    access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
        `TCP` or `tcp://<host>:<port>` for network access, `IPC` for multi-process applications,
        `INPROC` for multi-threaded applications.
    handshake: bool
        When true, handshake with the server before any other message is exchanged and block until the
        handshake is accomplished, highly recommended.
    kwargs: dict[str, Any]
        Additional arguments, forwarded to the parent constructor and to `create_socket()`:

        - `poll_timeout`: `int`. The timeout for polling the socket (in milliseconds)
        - `handshake_timeout`: `int`. The timeout for the handshake process (in milliseconds) to complete,
            60000 when not given. If handshake does not complete within this time, an exception is raised.
            Only relevant if `handshake` is True.
        - `logger`: `logging.Logger`. logger instance to use. If None, a default logger is created.
    """
    super().__init__(id=id, server_id=server_id, **kwargs)
    self.create_socket(
        server_id=server_id,
        socket_id=id,
        node_type="client",
        context=context,
        access_point=access_point,
        **kwargs,
    )

    # the poller initially watches only the client socket for incoming messages
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)
    # lock guarding the poller when several threads receive at once
    self._poller_lock = threading.Lock()

    if not handshake:
        return
    # popped only now, i.e. after kwargs were forwarded above
    handshake_timeout = kwargs.pop("handshake_timeout", 60000)
    self.handshake(handshake_timeout)

send_request

send_request(thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> bytes

send request message to server.

Parameters:

Name Type Description Default

thing_id

str

id of the Thing on which an operation is to be performed

required

objekt

str

name of property, action or event (usually only property or action)

required

operation

str

operation to be performed, like readproperty, writeproperty, invokeaction etc.

required

payload

SerializableData

serializable data to be sent as payload

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized

PreserializedEmptyByte

server_execution_context

ServerExecutionContext

Specify server level execution context like invokationTimeout, executionTimeout, oneway operation etc.

default_server_execution_context

thing_execution_context

ThingExecutionContext

Specify thing level execution context like fetchExecutionLogs etc.

default_thing_execution_context

Returns:

Type Description
bytes

a message id in bytes

Source code in hololinked/hololinked/core/zmq/brokers.py
def send_request(
    self,
    thing_id: str,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: ServerExecutionContext = default_server_execution_context,
    thing_execution_context: ThingExecutionContext = default_thing_execution_context,
) -> bytes:
    """
    Build a request message addressed to the server and send it on the client socket.

    The method does not wait for a reply; use the returned message id to collect the response later.

    Parameters
    ----------
    thing_id: str
        `id` of the `Thing` the operation targets
    objekt: str
        name of the property, action or event (usually only property or action)
    operation: str
        operation to perform, like `readproperty`, `writeproperty`, `invokeaction` etc.
    payload: SerializableData
        serializable data to send as payload
    preserialized_payload: PreserializedData
        already encoded data to send as payload, generally used for large or custom data
    server_execution_context: dict[str, Any]
        server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
    thing_execution_context: dict[str, Any]
        thing level execution context like `fetchExecutionLogs` etc.

    Returns
    -------
    bytes
        id of the message that was sent
    """
    outgoing = RequestMessage.craft_from_arguments(
        receiver_id=self.server_id,
        sender_id=self.id,
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    frames = outgoing.byte_array
    self.socket.send_multipart(frames)

    # structured fields attached to the debug record
    log_fields = dict(
        msg_id=outgoing.id,
        message_type=outgoing.type,
        thing_id=thing_id,
        operation=operation,
        objekt=objekt,
    )
    self.logger.debug("sent message to server", **log_fields)
    return outgoing.id

recv_response

recv_response(message_id: bytes) -> ResponseMessage

Receives response from server. Messages are identified by message id, and out of order messages are sent to a cache which may be popped later. This method blocks until the expected message is received or stop_polling() is called from another thread.

Parameters:

Name Type Description Default

message_id

bytes

the message id of the expected response message

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def recv_response(self, message_id: bytes) -> ResponseMessage:
    """
    Receives response from server. Messages are identified by message id, and out of order messages are sent to
    a cache which may be popped later. This method blocks until the expected message is received or the
    `_stop` flag is set (`stop_polling()` called from another thread).

    Parameters
    ----------
    message_id: bytes
        the message id of the expected response message

    Returns
    -------
    ResponseMessage
        the response carrying `message_id`. If the loop ends because the `_stop` flag was set, the method
        falls through and returns None.
    """
    self._stop = False
    while not self._stop:
        # another thread that was polling may already have cached the response meant for this call
        if message_id in self._response_cache:
            return self._response_cache.pop(message_id)
        # Only one thread polls at a time. Give up after one poll interval so that the cache is
        # re-checked; a timed out acquire must NOT be followed by a release, because
        # threading.Lock.release() would unlock the lock held by the thread that is polling.
        if not self._poller_lock.acquire(timeout=self.poll_timeout / 1000 if self.poll_timeout else -1):
            continue
        try:
            for socket, _ in self.poller.poll(self.poll_timeout):
                try:
                    raw_message = socket.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    # nothing to read on this socket after all, try the next one
                    continue
                response_message = ResponseMessage(raw_message)
                if self.handled_default_message_types(response_message):
                    continue
                if message_id != response_message.id:
                    self._response_cache[response_message.id] = response_message
                    self.logger.debug(
                        "cached response as it does not corresponding to expected ID",
                        msg_id=response_message.id,
                        expected_msg_id=message_id,
                    )
                else:
                    self.logger.debug(
                        "received response",
                        msg_id=response_message.id,
                        message_type=response_message.type,
                    )
                    return response_message
        finally:
            # released on every iteration (and on return/exception) so that other threads get a
            # chance to poll when this iteration only cached somebody else's message
            self._poller_lock.release()

execute

execute(thing_id: bytes, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> ResponseMessage

send an operation and receive the response for it.

Parameters:

Name Type Description Default

thing_id

bytes

id of the Thing on which an operation is to be performed

required

objekt

str

name of property, action or event (usually only property or action)

required

operation

str

operation to be performed, like readproperty, writeproperty, invokeaction etc.

required

payload

SerializableData

serializable data to be sent as payload

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized

PreserializedEmptyByte

server_execution_context

ServerExecutionContext

Specify server level execution context like invokationTimeout, executionTimeout, oneway operation etc.

default_server_execution_context

thing_execution_context

ThingExecutionContext

Specify thing level execution context like fetchExecutionLogs etc.

default_thing_execution_context

Returns:

Type Description
ResponseMessage

response message from server after completing the operation

Source code in hololinked/hololinked/core/zmq/brokers.py
def execute(
    self,
    thing_id: str,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: ServerExecutionContext = default_server_execution_context,
    thing_execution_context: ThingExecutionContext = default_thing_execution_context,
) -> ResponseMessage:
    """
    send an operation and receive the response for it.

    All arguments are passed unchanged to `send_request()`; the message id it returns is then handed to
    `recv_response()`, whose result is returned.

    Parameters
    ----------
    thing_id: str
        `id` of the `Thing` on which an operation is to be performed
    objekt: str
        name of property, action or event (usually only property or action)
    operation: str
        operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
    payload: SerializableData
        serializable data to be sent as payload
    preserialized_payload: PreserializedData
        pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
    server_execution_context: dict[str, Any]
        Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
    thing_execution_context: dict[str, Any]
        Specify thing level execution context like `fetchExecutionLogs` etc.

    Returns
    -------
    ResponseMessage
        response message from server after completing the operation
    """
    message_id = self.send_request(
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    return self.recv_response(message_id=message_id)

handshake

handshake(timeout: float | int = 60000) -> None

handshake with server before sending first message

Parameters:

Name Type Description Default

timeout

float | int

timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time, a ConnectionError is raised. If None, wait indefinitely until handshake completes.

60000
Source code in hololinked/hololinked/core/zmq/brokers.py
def handshake(self, timeout: float | int = 60000) -> None:
    """
    handshake with server before sending first message

    Parameters
    ----------
    timeout: float | int
        timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time,
        a `ConnectionError` is raised. If None, wait indefinitely until handshake completes.
    """
    self._stop = False
    start_time = time.time_ns()
    while not self._stop:
        if timeout is not None and (time.time_ns() - start_time) / 1e6 > timeout:
            raise ConnectionError(f"Unable to contact server '{self.server_id}' from client '{self.id}'")
        handshake_message = RequestMessage.craft_with_message_type(self.id, self.server_id, HANDSHAKE)
        self.socket.send_multipart(handshake_message.byte_array)
        self.logger.info("sent Handshake to server")
        if self.poller.poll(500):
            try:
                raw_message = self.socket.recv_multipart(zmq.NOBLOCK)
                response_message = ResponseMessage(raw_message)
            except zmq.Again:
                pass
            else:
                if response_message.type == HANDSHAKE:
                    self.logger.info("client handshook with server")
                    break
                elif self.handled_default_message_types(response_message):
                    continue
                else:
                    self._response_cache[response_message.id] = response_message
        else:
            self.logger.info("got no response for handshake")
    self._monitor_socket = self.socket.get_monitor_socket()
    self.poller.register(self._monitor_socket, zmq.POLLIN)