Skip to content

hololinked.core.zmq.brokers.SyncZMQClient

Bases: BaseZMQClient, BaseSyncZMQ

Synchronous ZMQ client that connects with a sync or async server based on ZMQ transport. Works like a REQ-REP socket. Each request blocks until the response is received. Suitable for most purposes.

Parameters:

Name Type Description Default

server_id

str

The instance name of the server (or Thing)

required

id

str

Unique id of the client to receive messages from the server. Each client connecting to same server must still have unique ID.

required

client_type

ZMQ or HTTP Server

required

handshake

bool

when true, handshake with the server first before allowing first message and block until that handshake was accomplished.

True

transport

str

transport implemented by the server

IPC

**kwargs

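tcp_socket_address: str socket address for connecting to a TCP server (used when the transport is neither IPC nor INPROC); handshake_timeout: int timeout for the initial handshake in milliseconds, default 60000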

{}
Source code in hololinked\core\zmq\brokers.py
class SyncZMQClient(BaseZMQClient, BaseSyncZMQ):
    """
    Synchronous ZMQ client that connects with a sync or async server based on ZMQ transport. 
    Works like a REQ-REP socket: each request blocks until its response is received. 
    Suitable for most purposes. 

    Parameters
    ----------
    id: str 
        Unique id of the client to receive messages from the server. Each client connecting to same server must 
        still have unique ID. 
    server_id: str
        The instance name of the server (or ``Thing``)
    handshake: bool
        when true, handshake with the server first before allowing first message and block until that handshake was
        accomplished.
    transport: str | Enum, TCP, IPC or INPROC, default IPC 
        transport implemented by the server 
    context: zmq.Context, optional
        ZMQ context forwarded to ``create_socket()``. When omitted, ``_terminate_context`` is set to True.
    **kwargs:
        tcp_socket_address: str
            socket address for connecting to TCP server, used when the transport is neither IPC nor INPROC
        handshake_timeout: int
            timeout of the initial handshake in milliseconds, default 60000
    """

    def __init__(self, 
                id: str,
                server_id: str, 
                handshake: bool = True, 
                transport: str = ZMQ_TRANSPORTS.IPC, 
                context: zmq.Context | None = None,
                **kwargs
            ) -> None:
        # NOTE(review): kwargs (including 'handshake_timeout' and, at this point, 'tcp_socket_address') are 
        # forwarded to the parent constructor as-is - confirm the parents tolerate these extra keys.
        super().__init__(id=id, server_id=server_id, **kwargs)
        # for IPC/INPROC the server id doubles as the socket address, otherwise the TCP address is taken from 
        # kwargs. The pop is evaluated before ``**kwargs`` is unpacked, so the key is not passed twice.
        self.create_socket(id=id, 
                        node_type='client', 
                        socket_address=server_id if str(transport) in ["IPC", "INPROC"] else kwargs.pop('tcp_socket_address', None),
                        context=context, 
                        transport=transport, 
                        **kwargs
                    )
        # identity check: True only when the caller did not supply a context
        # (presumably so that an internally created context is terminated on exit - verify against parent class)
        self._terminate_context = context is None
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        if handshake:
            self.handshake(kwargs.pop("handshake_timeout", 60000))

    def send_request(self, 
                    thing_id: bytes, 
                    objekt: str, 
                    operation: str, 
                    payload: SerializableData = SerializableNone,
                    preserialized_payload: PreserializedData = PreserializedEmptyByte,
                    server_execution_context: ServerExecutionContext = default_server_execution_context,
                    thing_execution_context: ThingExecutionContext = default_thing_execution_context
                ) -> bytes:
        """
        Craft a request message addressed to the server and send it over the socket, without waiting 
        for the response. 

        Parameters
        ----------
        thing_id: bytes
            id of the ``Thing`` the request is addressed to
        objekt: str
            object on the ``Thing`` that the operation targets (forwarded to the request message as-is)
        operation: str
            unique str identifying a server side or ``Thing`` resource. These values correspond 
            to automatically extracted name from the object name or the URL_path prepended with the instance name. 
        payload: SerializableData
            payload of the request, for example arguments of a method if the operation invokes one
        preserialized_payload: PreserializedData
            payload that is already serialized
        server_execution_context: ServerExecutionContext
            see execution context definitions
        thing_execution_context: ThingExecutionContext
            see execution context definitions

        Returns
        -------
        message id: bytes
            a byte representation of message id, pass it to ``recv_response()`` to fetch the matching response
        """
        request_message = RequestMessage.craft_from_arguments(
                                            receiver_id=self.server_id,
                                            sender_id=self.id,
                                            thing_id=thing_id,
                                            objekt=objekt,
                                            operation=operation,
                                            payload=payload,
                                            preserialized_payload=preserialized_payload,
                                            server_execution_context=server_execution_context,
                                            thing_execution_context=thing_execution_context
                                        )
        self.socket.send_multipart(request_message.byte_array)
        self.logger.debug(f"sent operation '{operation}' on thing '{thing_id}' to server '{self.server_id}' with msg-id '{request_message.id}'")
        return request_message.id

    def recv_response(self, message_id: bytes) -> ResponseMessage:
        """
        Receives response from server. Messages are identified by message id, so call this method immediately after 
        calling ``send_request()`` to avoid receiving messages out of order. Or, use other methods like
        ``execute()``, ``read_attribute()`` or ``write_attribute()``.

        Blocks (polling in steps of ``poll_timeout``) until the response with the given message id arrives. 
        Responses belonging to other message ids that arrive in the meantime are stored in the response cache
        under their own id, so that a later call asking for them returns immediately.

        Parameters
        ----------
        message_id: bytes
            id of the request whose response is awaited, as returned by ``send_request()``

        Returns
        -------
        ResponseMessage
            the response whose id equals ``message_id``
        """
        while True:
            # a previous call may already have received (and cached) this response
            if message_id in self._response_cache:
                return self._response_cache.pop(message_id)
            # NOTE(review): after handshake() the monitor socket is also registered on this poller, 
            # so non-response frames may show up here - confirm how ResponseMessage treats them.
            for socket, _ in self.poller.poll(self.poll_timeout):
                try:    
                    raw_message = socket.recv_multipart(zmq.NOBLOCK)
                except zmq.Again:
                    continue
                # handle every message inside the loop so that none is dropped when 
                # more than one socket is readable in the same poll
                response_message = ResponseMessage(raw_message)
                if message_id != response_message.id:
                    # cache under the id of the response itself, not under the id being awaited, 
                    # otherwise the next iteration would hand out the wrong response
                    self._response_cache[response_message.id] = response_message
                    self.logger.debug("cached response with msg-id {}".format(response_message.id))
                else:
                    self.logger.debug("received response with msg-id {}".format(response_message.id))
                    return response_message


    def execute(self, 
            thing_id: bytes, 
            objekt: str,
            operation: str, 
            payload: SerializableData = SerializableNone,
            preserialized_payload: PreserializedData = PreserializedEmptyByte,
            server_execution_context: ServerExecutionContext = default_server_execution_context,
            thing_execution_context: ThingExecutionContext = default_thing_execution_context,
        ) -> ResponseMessage:
        """
        send an operation and receive the response for it (``send_request()`` followed by ``recv_response()``). 

        Parameters
        ----------
        thing_id: bytes
            id of the ``Thing`` the request is addressed to
        objekt: str
            object on the ``Thing`` that the operation targets (forwarded to the request message as-is)
        operation: str
            unique str identifying a server side or ``Thing`` resource. These values correspond 
            to automatically extracted name from the object name or the URL_path prepended with the instance name. 
        payload: SerializableData
            payload of the request, for example arguments of a method if the operation invokes one
        preserialized_payload: PreserializedData
            payload that is already serialized
        server_execution_context: ServerExecutionContext
            see execution context definitions
        thing_execution_context: ThingExecutionContext
            see execution context definitions

        Returns
        -------
        ResponseMessage
            the response of the server for the sent operation
        """
        message_id = self.send_request(
                                thing_id=thing_id, 
                                objekt=objekt, 
                                operation=operation, 
                                payload=payload,
                                preserialized_payload=preserialized_payload,
                                server_execution_context=server_execution_context, 
                                thing_execution_context=thing_execution_context
                            )
        return self.recv_response(message_id=message_id)


    def handshake(self, timeout: typing.Union[float, int] = 60000) -> None: 
        """
        handshake with server before sending first message. A handshake message is re-sent every time no 
        reply is seen within 500 ms, until the server answers or the timeout elapses. On success, the 
        monitor socket of the client socket is created and registered on the poller.

        Parameters
        ----------
        timeout: float | int, default 60000
            maximum time to wait for the handshake in milliseconds, None to retry forever

        Raises
        ------
        ConnectionError
            if the server did not answer within the timeout
        ConnectionAbortedError
            if a message of another type arrived before the handshake was completed
        """
        start_time = time.time_ns()
        while True:
            # elapsed time is converted from nanoseconds to milliseconds before comparing 
            if timeout is not None and (time.time_ns() - start_time)/1e6 > timeout:
                raise ConnectionError(f"Unable to contact server '{self.server_id}' from client '{self.id}'")
            self.socket.send_multipart(RequestMessage.craft_with_message_type(self.id, self.server_id, HANDSHAKE).byte_array)
            self.logger.info(f"sent Handshake to server '{self.server_id}'")
            if self.poller.poll(500):
                try:
                    raw_message = self.socket.recv_multipart(zmq.NOBLOCK)
                    response_message = ResponseMessage(raw_message)
                except zmq.Again:
                    # nothing to read after all, send the handshake again
                    pass 
                else:
                    if response_message.type == HANDSHAKE:  
                        self.logger.info(f"client '{self.id}' handshook with server '{self.server_id}'")
                        break
                    else:
                        raise ConnectionAbortedError(f"Handshake cannot be done with '{self.server_id}'." + 
                                                    "Another message arrived before handshake complete.")
            else:
                self.logger.info('got no response for handshake')
        self._monitor_socket = self.socket.get_monitor_socket()
        self.poller.register(self._monitor_socket, zmq.POLLIN) 

Functions

__init__

__init__(id: str, server_id: str, handshake: bool = True, transport: str = ZMQ_TRANSPORTS.IPC, context: zmq.Context | None = None, **kwargs) -> None
Source code in hololinked\core\zmq\brokers.py
def __init__(self, 
            id: str,
            server_id: str, 
            handshake: bool = True, 
            transport: str = ZMQ_TRANSPORTS.IPC, 
            context: zmq.Context | None = None,
            **kwargs
        ) -> None:
    """
    Create the client socket, register it on a poller and optionally handshake with the server.

    Parameters
    ----------
    id: str
        unique id of the client
    server_id: str
        id of the server to connect to, also used as socket address for IPC and INPROC transports
    handshake: bool, default True
        when True, ``handshake()`` is called before the constructor returns
    transport: str, default IPC
        transport implemented by the server
    context: zmq.Context, optional
        ZMQ context forwarded to ``create_socket()``. When omitted, ``_terminate_context`` is set to True.
    **kwargs:
        tcp_socket_address: str
            socket address used when the transport is neither IPC nor INPROC
        handshake_timeout: int
            timeout forwarded to ``handshake()``, default 60000
    """
    # NOTE(review): kwargs (including 'handshake_timeout' and, at this point, 'tcp_socket_address') are 
    # forwarded to the parent constructor as-is - confirm the parents tolerate these extra keys.
    super().__init__(id=id, server_id=server_id, **kwargs)
    # the pop below is evaluated before ``**kwargs`` is unpacked, so the key is not passed twice
    self.create_socket(id=id, 
                    node_type='client', 
                    socket_address=server_id if str(transport) in ["IPC", "INPROC"] else kwargs.pop('tcp_socket_address', None),
                    context=context, 
                    transport=transport, 
                    **kwargs
                )
    # identity check: True only when the caller did not supply a context
    self._terminate_context = context is None
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)
    if handshake:
        self.handshake(kwargs.pop("handshake_timeout", 60000))

send_request

send_request(thing_id: bytes, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> bytes

send message to server.

Parameters:

Name Type Description Default

operation

str

unique str identifying a server side or Thing resource. These values correspond to the automatically extracted name from the object name or the URL_path prepended with the instance name.

required

arguments

if the operation invokes a method, arguments of that method.

required

server_execution_context

ServerExecutionContext

see execution context definitions

default_server_execution_context

thing_execution_context

ThingExecutionContext

see execution context definitions

default_thing_execution_context

Returns:

Type Description
message id: bytes

a byte representation of message id

Source code in hololinked\core\zmq\brokers.py
def send_request(self, 
                thing_id: bytes, 
                objekt: str, 
                operation: str, 
                payload: SerializableData = SerializableNone,
                preserialized_payload: PreserializedData = PreserializedEmptyByte,
                server_execution_context: ServerExecutionContext = default_server_execution_context,
                thing_execution_context: ThingExecutionContext = default_thing_execution_context
            ) -> bytes:
    """
    Build a request message for the server and push it onto the socket. Does not wait for the response.

    Parameters
    ----------
    thing_id: bytes
        id of the ``Thing`` the request is addressed to
    objekt: str
        object on the ``Thing`` that the operation targets (forwarded to the request message as-is)
    operation: str
        unique str identifying a server side or ``Thing`` resource
    payload: SerializableData
        payload of the request
    preserialized_payload: PreserializedData
        payload that is already serialized
    server_execution_context: ServerExecutionContext
        see execution context definitions
    thing_execution_context: ThingExecutionContext
        see execution context definitions

    Returns
    -------
    message id: bytes
        id of the message that was sent
    """
    outgoing = RequestMessage.craft_from_arguments(
        receiver_id=self.server_id,
        sender_id=self.id,
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    frames = outgoing.byte_array
    self.socket.send_multipart(frames)
    self.logger.debug(
        f"sent operation '{operation}' on thing '{thing_id}' to server '{self.server_id}' "
        f"with msg-id '{outgoing.id}'"
    )
    return outgoing.id

recv_response

recv_response(message_id: bytes) -> ResponseMessage

Receives response from server. Messages are identified by message id, so call this method immediately after calling send_request() to avoid receiving messages out of order. Or, use other methods like execute(), read_attribute() or write_attribute().

Parameters:

Name Type Description Default

raise_client_side_exception

if True, any exceptions raised during execution inside Thing instance will be raised on the client. See docs of raise_local_exception() for info on exception

required
Source code in hololinked\core\zmq\brokers.py
def recv_response(self, message_id: bytes) -> ResponseMessage:
    """
    Receives response from server. Messages are identified by message id, so call this method immediately after 
    calling ``send_request()`` to avoid receiving messages out of order. Or, use other methods like
    ``execute()``, ``read_attribute()`` or ``write_attribute()``.

    Blocks (polling in steps of ``poll_timeout``) until the response with the given message id arrives. 
    Responses belonging to other message ids that arrive in the meantime are stored in the response cache
    under their own id, so that a later call asking for them returns immediately.

    Parameters
    ----------
    message_id: bytes
        id of the request whose response is awaited, as returned by ``send_request()``

    Returns
    -------
    ResponseMessage
        the response whose id equals ``message_id``
    """
    while True:
        # a previous call may already have received (and cached) this response
        if message_id in self._response_cache:
            return self._response_cache.pop(message_id)
        for socket, _ in self.poller.poll(self.poll_timeout):
            try:    
                raw_message = socket.recv_multipart(zmq.NOBLOCK)
            except zmq.Again:
                continue
            # handle every message inside the loop so that none is dropped when 
            # more than one socket is readable in the same poll
            response_message = ResponseMessage(raw_message)
            if message_id != response_message.id:
                # cache under the id of the response itself, not under the id being awaited, 
                # otherwise the next iteration would hand out the wrong response
                self._response_cache[response_message.id] = response_message
                self.logger.debug("cached response with msg-id {}".format(response_message.id))
            else:
                self.logger.debug("received response with msg-id {}".format(response_message.id))
                return response_message

execute

execute(thing_id: bytes, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> ResponseMessage

send an operation and receive the response for it.

Parameters:

Name Type Description Default

operation

str

unique str identifying a server side or Thing resource. These values correspond to the automatically extracted name from the object name or the URL_path prepended with the instance name.

required

arguments

if the operation invokes a method, arguments of that method.

required

server_execution_context

ServerExecutionContext

see execution context definitions

default_server_execution_context

thing_execution_context

ThingExecutionContext

see execution context definitions

default_thing_execution_context

raise_client_side_exception

if True, any exceptions raised during execution inside Thing instance will be raised on the client. See docs of raise_local_exception() for info on exception

required

deserialize_response

if True, deserializes the response from server

required

Returns:

Type Description
ResponseMessage

the response received from the server for the sent operation

Source code in hololinked\core\zmq\brokers.py
def execute(self, 
        thing_id: bytes, 
        objekt: str,
        operation: str, 
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: ThingExecutionContext = default_thing_execution_context,
    ) -> ResponseMessage:
    """
    Send an operation to the server and block until its response is received. 

    Parameters
    ----------
    thing_id: bytes
        id of the ``Thing`` the request is addressed to
    objekt: str
        object on the ``Thing`` that the operation targets (forwarded to ``send_request()`` as-is)
    operation: str
        unique str identifying a server side or ``Thing`` resource
    payload: SerializableData
        payload of the request
    preserialized_payload: PreserializedData
        payload that is already serialized
    server_execution_context: ServerExecutionContext
        see execution context definitions
    thing_execution_context: ThingExecutionContext
        see execution context definitions

    Returns
    -------
    ResponseMessage
        whatever ``recv_response()`` returns for the id of the sent message
    """
    request_fields = dict(
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    sent_id = self.send_request(**request_fields)
    return self.recv_response(message_id=sent_id)

handshake

handshake(timeout: typing.Union[float, int] = 60000) -> None

handshake with the server before sending the first message

Source code in hololinked\core\zmq\brokers.py
def handshake(self, timeout: typing.Union[float, int] = 60000) -> None: 
    """
    Handshake with the server before the first message is sent. The handshake message is re-sent 
    every time no reply shows up within 500 ms, until the server answers or the timeout elapses. 
    Afterwards the monitor socket of the client socket is created and registered on the poller.

    Parameters
    ----------
    timeout: float | int, default 60000
        maximum time to wait in milliseconds, None to retry forever

    Raises
    ------
    ConnectionError
        if the timeout elapsed without a handshake
    ConnectionAbortedError
        if a message of another type arrived before the handshake was completed
    """
    began_ns = time.time_ns()
    while True:
        # nanoseconds are converted to milliseconds before comparing against the timeout
        if timeout is not None and (time.time_ns() - began_ns)/1e6 > timeout:
            raise ConnectionError(f"Unable to contact server '{self.server_id}' from client '{self.id}'")
        greeting = RequestMessage.craft_with_message_type(self.id, self.server_id, HANDSHAKE)
        self.socket.send_multipart(greeting.byte_array)
        self.logger.info(f"sent Handshake to server '{self.server_id}'")
        if not self.poller.poll(500):
            self.logger.info('got no response for handshake')
            continue
        try:
            frames = self.socket.recv_multipart(zmq.NOBLOCK)
            reply = ResponseMessage(frames)
        except zmq.Again:
            # nothing to read after all, try again
            continue
        if reply.type != HANDSHAKE:
            raise ConnectionAbortedError(f"Handshake cannot be done with '{self.server_id}'." + 
                                        "Another message arrived before handshake complete.")
        self.logger.info(f"client '{self.id}' handshook with server '{self.server_id}'")
        break
    monitor = self.socket.get_monitor_socket()
    self._monitor_socket = monitor
    self.poller.register(monitor, zmq.POLLIN)