Skip to content

BaseZMQ

hololinked.core.zmq.brokers.BaseZMQ

Base class for all ZMQ message brokers. Implements socket creation & logger config, which are common to all server and client implementations.

Source code in hololinked/hololinked/core/zmq/brokers.py
class BaseZMQ:
    """
    Base class for all ZMQ message brokers. Implements socket creation & logger config,
    which are common to all server and client implementations.
    """

    def __init__(self, id: str, **kwargs) -> None:
        """
        id: str
            unique ID of the server/client. This is used as the identity of the ZMQ socket.
        logger: logging.Logger, optional
            logger instance to use. If None, a default logger is created.
        """
        super().__init__()
        self.id = id  # type: str
        self.context = self.context if hasattr(self, "context") and self.context else None  # type: zmq.Context | zmq.asyncio.Context
        self.socket = self.socket if hasattr(self, "socket") and self.socket else None  # type: zmq.Socket | None
        self.socket_address = self.socket_address if hasattr(self, "socket_address") and self.socket_address else None  # type: str | None

    def exit(self) -> None:
        """
        Cleanup method to terminate ZMQ sockets and contexts before quitting. Called by `__del__()`
        automatically. Each subclass server/client should implement their version of exiting if necessary.
        """
        if not hasattr(self, "logger") or not self.logger:
            self.logger = structlog.get_logger().bind(component="broker", impl=self.__class__.__name__, id=self.id)

    def __del__(self) -> None:
        self.exit()

    @classmethod
    def get_socket(
        cls,
        *,
        server_id: str,
        socket_id: str,
        node_type: str,
        context: zmq.asyncio.Context | zmq.Context,
        access_point: str = ZMQ_TRANSPORTS.IPC,
        socket_type: zmq.SocketType = zmq.ROUTER,
        **kwargs,
    ) -> tuple[zmq.Socket, str]:
        """
        Create a socket with certain specifications. Supported ZeroMQ transports are TCP, IPC & INPROC.
        For IPC sockets, a file is created under TEMP_DIR of global configuration.

        Parameters
        ----------
        server_id: str
            Used to create socket address
        socket_id: str
            Each ROUTER socket require unique identity to correctly route the messages, usually same as `server_id`
        node_type: str
            server or client? i.e. whether to bind (server) or connect (client) as per ZMQ definition
        context: zmq.Context or zmq.asyncio.Context
            ZeroMQ Context object that creates the socket
        access_point: Enum
            `TCP`, `IPC` or `INPROC`. Message crafting-passing-routing is transport-invariant as suggested by ZMQ.
            Speed relationship - `INPROC` > `IPC` > `TCP`. For client side `TCP`, specify the TCP address - `tcp://<host>:<port>`
            or a random port will be chosen.
        socket_type: zmq.SocketType, default zmq.ROUTER
            Usually a ROUTER socket is implemented for both client-server and peer-to-peer communication. But other sockets
            like PAIR, DEALER, etc. can also be used as per the use-case.
        kwargs: dict
            Additional arguments:

            - `socket_class`: `zmq.Socket` class to use, default is `zmq.Socket` for sync and `zmq.asyncio.Socket` for async.

        Returns
        -------
        socket: zmq.Socket
            created socket
        socket_address: str
            qualified address of the socket created for any transport type

        Raises
        ------
        NotImplementedError
            if transport other than `TCP`, `IPC` or `INPROC` is used
        RuntimeError
            if transport is `TCP` and a socket connection from client side is requested but a socket address is not supplied
        """
        if node_type.lower() not in ["server", "client"]:
            raise ValueError(f"Invalid node_type: {node_type}")
        bind = node_type.lower() == "server"
        if len(access_point) == 3 or len(access_point) == 6 or isinstance(access_point, Enum):
            transport = access_point
            socket_address = None
        else:
            transport = access_point.split("://")[0].upper()
            socket_address = access_point

        socket = context.socket(socket_type, socket_class=kwargs.get("socket_class", None))
        socket.setsockopt_string(zmq.IDENTITY, socket_id)

        if transport == ZMQ_TRANSPORTS.IPC or transport.lower() == "ipc":
            if socket_address is None or not socket_address.endswith(".ipc"):
                if not socket_address:
                    filename = get_sanitized_filename_from_random_string(server_id, "ipc")
                elif not socket_address.endswith(".ipc"):
                    filename = get_sanitized_filename_from_random_string(socket_address, "ipc")
                # re-compute for IPC because it looks for a file in a directory
                filename = filename.replace(
                    EventPublisher._standard_address_suffix.replace("/", "_"),
                    f".{EventPublisher._standard_address_suffix_filename_replacement}",
                )
                socket_address = "ipc://{}{}{}".format(global_config.TEMP_DIR_SOCKETS, os.sep, filename)
            if bind:
                socket.bind(socket_address)
            else:
                socket.connect(socket_address)
        elif transport == ZMQ_TRANSPORTS.TCP or transport.lower() == "tcp":
            if bind:
                failed = False
                if socket_address:
                    try:
                        socket.bind(socket_address)
                    except zmq.error.ZMQError as ex:
                        if not ex.strerror.startswith("Address in use"):
                            raise ex from None
                        failed = True
                if failed or not socket_address:
                    for i in range(
                        global_config.TCP_SOCKET_SEARCH_START_PORT,
                        global_config.TCP_SOCKET_SEARCH_END_PORT,
                    ):
                        socket_address = "tcp://0.0.0.0:{}".format(i)
                        try:
                            socket.bind(socket_address)
                            break
                        except zmq.error.ZMQError as ex:
                            if not ex.strerror.startswith("Address in use"):
                                raise ex from None
            elif socket_address:
                socket.connect(socket_address)
            else:
                raise RuntimeError(f"Socket address not supplied for TCP connection to identity - {socket_id}")
        elif transport == ZMQ_TRANSPORTS.INPROC or transport.lower() == "inproc":
            # inproc_id = id.replace('/', '_').replace('-', '_')
            if socket_address is None:
                socket_address = f"inproc://{server_id}"
            elif not socket_address.startswith("inproc://"):
                socket_address = f"inproc://{socket_address}"
            if bind:
                socket.bind(socket_address)
            else:
                socket.connect(socket_address)
        else:
            raise NotImplementedError(
                "transports other than IPC, TCP & INPROC are not implemented now for {}.".format(cls.__name__)
                + f" Given transport {transport}."
            )
        return socket, socket_address

Functions

__init__

__init__(id: str, **kwargs) -> None

id: str unique ID of the server/client. This is used as the identity of the ZMQ socket. logger: logging.Logger, optional logger instance to use. If None, a default logger is created.

Source code in hololinked/hololinked/core/zmq/brokers.py
def __init__(self, id: str, **kwargs) -> None:
    """
    id: str
        unique ID of the server/client. This is used as the identity of the ZMQ socket.
    logger: logging.Logger, optional
        logger instance to use. If None, a default logger is created.
    """
    super().__init__()
    self.id = id  # type: str
    self.context = self.context if hasattr(self, "context") and self.context else None  # type: zmq.Context | zmq.asyncio.Context
    self.socket = self.socket if hasattr(self, "socket") and self.socket else None  # type: zmq.Socket | None
    self.socket_address = self.socket_address if hasattr(self, "socket_address") and self.socket_address else None  # type: str | None

exit

exit() -> None

Cleanup method to terminate ZMQ sockets and contexts before quitting. Called by __del__() automatically. Each subclass server/client should implement their version of exiting if necessary.

Source code in hololinked/hololinked/core/zmq/brokers.py
def exit(self) -> None:
    """
    Cleanup method to terminate ZMQ sockets and contexts before quitting. Called by `__del__()`
    automatically. Each subclass server/client should implement their version of exiting if necessary.
    """
    if not hasattr(self, "logger") or not self.logger:
        self.logger = structlog.get_logger().bind(component="broker", impl=self.__class__.__name__, id=self.id)

get_socket classmethod

get_socket(*, server_id: str, socket_id: str, node_type: str, context: Context | Context, access_point: str = ZMQ_TRANSPORTS.IPC, socket_type: SocketType = zmq.ROUTER, **kwargs) -> tuple[zmq.Socket, str]

Create a socket with certain specifications. Supported ZeroMQ transports are TCP, IPC & INPROC. For IPC sockets, a file is created under TEMP_DIR of global configuration.

Parameters:

Name Type Description Default
server_id
str

Used to create socket address

required
socket_id
str

Each ROUTER socket require unique identity to correctly route the messages, usually same as server_id

required
node_type
str

server or client? i.e. whether to bind (server) or connect (client) as per ZMQ definition

required
context
Context | Context

ZeroMQ Context object that creates the socket

required
access_point
str

TCP, IPC or INPROC. Message crafting-passing-routing is transport-invariant as suggested by ZMQ. Speed relationship - INPROC > IPC > TCP. For client side TCP, specify the TCP address - tcp://<host>:<port> or a random port will be chosen.

IPC
socket_type
SocketType

Usually a ROUTER socket is implemented for both client-server and peer-to-peer communication. But other sockets like PAIR, DEALER, etc. can also be used as per the use-case.

ROUTER
kwargs

Additional arguments:

  • socket_class: zmq.Socket class to use, default is zmq.Socket for sync and zmq.asyncio.Socket for async.
{}

Returns:

Name Type Description
socket Socket

created socket

socket_address str

qualified address of the socket created for any transport type

Raises:

Type Description
NotImplementedError

if transport other than TCP, IPC or INPROC is used

RuntimeError

if transport is TCP and a socket connection from client side is requested but a socket address is not supplied

Source code in hololinked/hololinked/core/zmq/brokers.py
@classmethod
def get_socket(
    cls,
    *,
    server_id: str,
    socket_id: str,
    node_type: str,
    context: zmq.asyncio.Context | zmq.Context,
    access_point: str = ZMQ_TRANSPORTS.IPC,
    socket_type: zmq.SocketType = zmq.ROUTER,
    **kwargs,
) -> tuple[zmq.Socket, str]:
    """
    Create a socket with certain specifications. Supported ZeroMQ transports are TCP, IPC & INPROC.
    For IPC sockets, a file is created under TEMP_DIR of global configuration.

    Parameters
    ----------
    server_id: str
        Used to create socket address
    socket_id: str
        Each ROUTER socket require unique identity to correctly route the messages, usually same as `server_id`
    node_type: str
        server or client? i.e. whether to bind (server) or connect (client) as per ZMQ definition
    context: zmq.Context or zmq.asyncio.Context
        ZeroMQ Context object that creates the socket
    access_point: Enum
        `TCP`, `IPC` or `INPROC`. Message crafting-passing-routing is transport-invariant as suggested by ZMQ.
        Speed relationship - `INPROC` > `IPC` > `TCP`. For client side `TCP`, specify the TCP address - `tcp://<host>:<port>`
        or a random port will be chosen.
    socket_type: zmq.SocketType, default zmq.ROUTER
        Usually a ROUTER socket is implemented for both client-server and peer-to-peer communication. But other sockets
        like PAIR, DEALER, etc. can also be used as per the use-case.
    kwargs: dict
        Additional arguments:

        - `socket_class`: `zmq.Socket` class to use, default is `zmq.Socket` for sync and `zmq.asyncio.Socket` for async.

    Returns
    -------
    socket: zmq.Socket
        created socket
    socket_address: str
        qualified address of the socket created for any transport type

    Raises
    ------
    NotImplementedError
        if transport other than `TCP`, `IPC` or `INPROC` is used
    RuntimeError
        if transport is `TCP` and a socket connection from client side is requested but a socket address is not supplied
    """
    if node_type.lower() not in ["server", "client"]:
        raise ValueError(f"Invalid node_type: {node_type}")
    bind = node_type.lower() == "server"
    if len(access_point) == 3 or len(access_point) == 6 or isinstance(access_point, Enum):
        transport = access_point
        socket_address = None
    else:
        transport = access_point.split("://")[0].upper()
        socket_address = access_point

    socket = context.socket(socket_type, socket_class=kwargs.get("socket_class", None))
    socket.setsockopt_string(zmq.IDENTITY, socket_id)

    if transport == ZMQ_TRANSPORTS.IPC or transport.lower() == "ipc":
        if socket_address is None or not socket_address.endswith(".ipc"):
            if not socket_address:
                filename = get_sanitized_filename_from_random_string(server_id, "ipc")
            elif not socket_address.endswith(".ipc"):
                filename = get_sanitized_filename_from_random_string(socket_address, "ipc")
            # re-compute for IPC because it looks for a file in a directory
            filename = filename.replace(
                EventPublisher._standard_address_suffix.replace("/", "_"),
                f".{EventPublisher._standard_address_suffix_filename_replacement}",
            )
            socket_address = "ipc://{}{}{}".format(global_config.TEMP_DIR_SOCKETS, os.sep, filename)
        if bind:
            socket.bind(socket_address)
        else:
            socket.connect(socket_address)
    elif transport == ZMQ_TRANSPORTS.TCP or transport.lower() == "tcp":
        if bind:
            failed = False
            if socket_address:
                try:
                    socket.bind(socket_address)
                except zmq.error.ZMQError as ex:
                    if not ex.strerror.startswith("Address in use"):
                        raise ex from None
                    failed = True
            if failed or not socket_address:
                for i in range(
                    global_config.TCP_SOCKET_SEARCH_START_PORT,
                    global_config.TCP_SOCKET_SEARCH_END_PORT,
                ):
                    socket_address = "tcp://0.0.0.0:{}".format(i)
                    try:
                        socket.bind(socket_address)
                        break
                    except zmq.error.ZMQError as ex:
                        if not ex.strerror.startswith("Address in use"):
                            raise ex from None
        elif socket_address:
            socket.connect(socket_address)
        else:
            raise RuntimeError(f"Socket address not supplied for TCP connection to identity - {socket_id}")
    elif transport == ZMQ_TRANSPORTS.INPROC or transport.lower() == "inproc":
        # inproc_id = id.replace('/', '_').replace('-', '_')
        if socket_address is None:
            socket_address = f"inproc://{server_id}"
        elif not socket_address.startswith("inproc://"):
            socket_address = f"inproc://{socket_address}"
        if bind:
            socket.bind(socket_address)
        else:
            socket.connect(socket_address)
    else:
        raise NotImplementedError(
            "transports other than IPC, TCP & INPROC are not implemented now for {}.".format(cls.__name__)
            + f" Given transport {transport}."
        )
    return socket, socket_address

hololinked.core.zmq.brokers.BaseAsyncZMQ

Bases: BaseZMQ

Base class for all async ZMQ servers and clients.

Source code in hololinked/hololinked/core/zmq/brokers.py
class BaseAsyncZMQ(BaseZMQ):
    """
    Base class for all async ZMQ servers and clients.
    """

    # init of this class must always take empty arguments due to inheritance structure

    def create_socket(
        self,
        *,
        server_id: str,
        socket_id: str,
        node_type: str = "server",
        context: zmq.asyncio.Context | None = None,
        access_point: str = ZMQ_TRANSPORTS.IPC,
        socket_type: zmq.SocketType = zmq.ROUTER,
        **kwargs,
    ) -> None:
        """
        Overloads `create_socket()` to create, bind/connect an async socket. A global context is used if
        none is supplied.
        """
        if context and not isinstance(context, zmq.asyncio.Context):
            raise TypeError(
                "async ZMQ message broker accepts only async ZMQ context. supplied type {}".format(type(context))
            )
        self.context = context or global_config.zmq_context()
        self.socket, self.socket_address = BaseZMQ.get_socket(
            server_id=server_id,
            socket_id=socket_id,
            node_type=node_type,
            context=self.context,
            access_point=access_point,
            socket_type=socket_type,
            **kwargs,
        )
        self.logger.info(
            "created socket type: {} with address: {} & identity: {} and {}".format(
                get_socket_type_name(socket_type),
                self.socket_address,
                socket_id,
                "bound" if node_type == "server" else "connected",
            )
        )

Functions

create_socket

create_socket(*, server_id: str, socket_id: str, node_type: str = 'server', context: Context | None = None, access_point: str = ZMQ_TRANSPORTS.IPC, socket_type: SocketType = zmq.ROUTER, **kwargs) -> None

Overloads create_socket() to create, bind/connect an async socket. A global context is used if none is supplied.

Source code in hololinked/hololinked/core/zmq/brokers.py
def create_socket(
    self,
    *,
    server_id: str,
    socket_id: str,
    node_type: str = "server",
    context: zmq.asyncio.Context | None = None,
    access_point: str = ZMQ_TRANSPORTS.IPC,
    socket_type: zmq.SocketType = zmq.ROUTER,
    **kwargs,
) -> None:
    """
    Overloads `create_socket()` to create, bind/connect an async socket. A global context is used if
    none is supplied.
    """
    if context and not isinstance(context, zmq.asyncio.Context):
        raise TypeError(
            "async ZMQ message broker accepts only async ZMQ context. supplied type {}".format(type(context))
        )
    self.context = context or global_config.zmq_context()
    self.socket, self.socket_address = BaseZMQ.get_socket(
        server_id=server_id,
        socket_id=socket_id,
        node_type=node_type,
        context=self.context,
        access_point=access_point,
        socket_type=socket_type,
        **kwargs,
    )
    self.logger.info(
        "created socket type: {} with address: {} & identity: {} and {}".format(
            get_socket_type_name(socket_type),
            self.socket_address,
            socket_id,
            "bound" if node_type == "server" else "connected",
        )
    )

hololinked.core.zmq.brokers.BaseSyncZMQ

Bases: BaseZMQ

Base class for all sync ZMQ servers and clients

Source code in hololinked/hololinked/core/zmq/brokers.py
class BaseSyncZMQ(BaseZMQ):
    """Base class for all sync ZMQ servers and clients"""

    # init of this class must always take empty arguments due to inheritance structure

    def create_socket(
        self,
        *,
        server_id: str,
        socket_id: str,
        node_type: str = "server",
        context: zmq.Context | None = None,
        access_point: str = ZMQ_TRANSPORTS.IPC,
        socket_type: zmq.SocketType = zmq.ROUTER,
        **kwargs,
    ) -> None:
        """
        Overloads `create_socket()` to create, bind/connect a synchronous socket. A global context is used if
        none is supplied.
        """
        self.context = context or global_config.zmq_context()
        self.socket, self.socket_address = BaseZMQ.get_socket(
            server_id=server_id,
            socket_id=socket_id,
            node_type=node_type,
            context=self.context,
            access_point=access_point,
            socket_type=socket_type,
            socket_class=zmq.Socket,
            **kwargs,
        )
        self.logger.info(
            "created socket type: {} with address: {} & identity: {} and {}".format(
                get_socket_type_name(socket_type),
                self.socket_address,
                socket_id,
                "bound" if node_type == "server" else "connected",
            )
        )

Functions

create_socket

create_socket(*, server_id: str, socket_id: str, node_type: str = 'server', context: Context | None = None, access_point: str = ZMQ_TRANSPORTS.IPC, socket_type: SocketType = zmq.ROUTER, **kwargs) -> None

Overloads create_socket() to create, bind/connect a synchronous socket. A global context is used if none is supplied.

Source code in hololinked/hololinked/core/zmq/brokers.py
def create_socket(
    self,
    *,
    server_id: str,
    socket_id: str,
    node_type: str = "server",
    context: zmq.Context | None = None,
    access_point: str = ZMQ_TRANSPORTS.IPC,
    socket_type: zmq.SocketType = zmq.ROUTER,
    **kwargs,
) -> None:
    """
    Overloads `create_socket()` to create, bind/connect a synchronous socket. A global context is used if
    none is supplied.
    """
    self.context = context or global_config.zmq_context()
    self.socket, self.socket_address = BaseZMQ.get_socket(
        server_id=server_id,
        socket_id=socket_id,
        node_type=node_type,
        context=self.context,
        access_point=access_point,
        socket_type=socket_type,
        socket_class=zmq.Socket,
        **kwargs,
    )
    self.logger.info(
        "created socket type: {} with address: {} & identity: {} and {}".format(
            get_socket_type_name(socket_type),
            self.socket_address,
            socket_id,
            "bound" if node_type == "server" else "connected",
        )
    )

hololinked.core.zmq.brokers.BaseZMQServer

Bases: BaseZMQ

Base class for all ZMQ servers irrespective of sync and async

Source code in hololinked/hololinked/core/zmq/brokers.py
class BaseZMQServer(BaseZMQ):
    """Base class for all ZMQ servers irrespective of sync and async"""

    def __init__(self, id: str, logger: structlog.stdlib.BoundLogger | None = None, **kwargs) -> None:
        super().__init__(id=id, **kwargs)
        if not logger:
            logger = structlog.get_logger()
        self.logger = logger.bind(component="broker", impl=self.__class__.__name__, id=self.id)

    def handshake(self, request_message: RequestMessage) -> None:
        """
        Pass a handshake message to client. Absolutely mandatory to handshake with all clients to ensure initial messages
        do not get lost because of ZMQ's tiny but significant initial delay after creating socket.

        Parameters
        ----------
        request_message: RequestMessage
            the client message for which the handshake is being sent
        """
        run_callable_somehow(self._handshake(request_message))

    def _handshake(self, request_message: RequestMessage) -> None:
        raise NotImplementedError(
            f"handshake cannot be handled - implement _handshake in {self.__class__} to handshake."
        )

    def handle_invalid_message(self, request_message: RequestMessage, exception: Exception) -> None:
        """
        Pass an invalid message to the client when an exception occurred while parsing the message from the client
        (in `handled_default_message_types()`)

        Parameters
        ----------
        request_message: RequestMessage
            the client message for which the parsing exception occurred
        exception: Exception
            exception object raised
        """
        run_callable_somehow(self._handle_invalid_message(request_message, exception))

    def _handle_invalid_message(self, message: RequestMessage, exception: Exception) -> None:
        raise NotImplementedError(
            "invalid message cannot be handled"
            + f" - implement _handle_invalid_message in {self.__class__} to handle invalid messages."
        )

    def handle_timeout(self, request_message: RequestMessage, timeout_type: str) -> None:
        """
        Pass timeout message to the client when the operation could not be executed within specified timeouts

        Parameters
        ----------
        request_message: RequestMessage
            the client message which could not executed within the specified timeout. timeout value is
            generally specified within the execution context values.
        """
        run_callable_somehow(self._handle_timeout(request_message, timeout_type=timeout_type))

    def _handle_timeout(self, request_message: RequestMessage, timeout_type: str) -> None:
        raise NotImplementedError(
            "timeouts cannot be handled ",
            f"- implement _handle_timeout in {self.__class__} to handle timeout.",
        )

    def handle_error_message(self, request_message: RequestMessage, exception: Exception) -> None:
        """
        Pass an exception message to the client when an exception occurred while executing the operation

        Parameters
        ----------
        request_message: RequestMessage
            the client message for which the exception occurred
        exception: Exception
            exception object raised
        """
        run_callable_somehow(self._handle_error_message(request_message, exception))

    def _handle_error_message(self, request_message: RequestMessage, exception: Exception) -> None:
        raise NotImplementedError(
            "exceptions cannot be handled ",
            f"- implement _handle_error_message in {self.__class__} to handle exceptions.",
        )

    def handled_default_message_types(self, request_message: RequestMessage) -> bool:
        """
        Handle default cases for the server without further processing of the request (for example, `HANDSHAKE`).
        This method is called once/supposed to be called when the message is received or popped out of the socket.

        Parameters
        ----------
        request_message: RequestMessage
            the client message to handle
        """
        if request_message.type == HANDSHAKE:
            self.handshake(request_message)
            return True
        elif request_message.type == EXIT:
            # self.send response with message type EXIT
            raise BreakLoop(f"exit message received from {request_message.sender_id} with msg-ID {request_message.id}")
        elif request_message.length != len(request_message.byte_array):
            self.handle_invalid_message(
                request_message,
                exception=ValueError(
                    f"message length {request_message.length} does not match "
                    + f"the number of message parts {len(request_message.byte_array)}"
                ),
            )
            return True
        return False

Functions

handle_error_message

handle_error_message(request_message: RequestMessage, exception: Exception) -> None

Pass an exception message to the client when an exception occurred while executing the operation

Parameters:

Name Type Description Default
request_message
RequestMessage

the client message for which the exception occurred

required
exception
Exception

exception object raised

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def handle_error_message(self, request_message: RequestMessage, exception: Exception) -> None:
    """
    Pass an exception message to the client when an exception occurred while executing the operation

    Parameters
    ----------
    request_message: RequestMessage
        the client message for which the exception occurred
    exception: Exception
        exception object raised
    """
    run_callable_somehow(self._handle_error_message(request_message, exception))

handle_invalid_message

handle_invalid_message(request_message: RequestMessage, exception: Exception) -> None

Pass an invalid message to the client when an exception occurred while parsing the message from the client (in handled_default_message_types())

Parameters:

Name Type Description Default
request_message
RequestMessage

the client message for which the parsing exception occurred

required
exception
Exception

exception object raised

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def handle_invalid_message(self, request_message: RequestMessage, exception: Exception) -> None:
    """
    Pass an invalid message to the client when an exception occurred while parsing the message from the client
    (in `handled_default_message_types()`)

    Parameters
    ----------
    request_message: RequestMessage
        the client message for which the parsing exception occurred
    exception: Exception
        exception object raised
    """
    run_callable_somehow(self._handle_invalid_message(request_message, exception))

handle_timeout

handle_timeout(request_message: RequestMessage, timeout_type: str) -> None

Pass timeout message to the client when the operation could not be executed within specified timeouts

Parameters:

Name Type Description Default
request_message
RequestMessage

the client message which could not executed within the specified timeout. timeout value is generally specified within the execution context values.

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def handle_timeout(self, request_message: RequestMessage, timeout_type: str) -> None:
    """
    Pass timeout message to the client when the operation could not be executed within specified timeouts

    Parameters
    ----------
    request_message: RequestMessage
        the client message which could not executed within the specified timeout. timeout value is
        generally specified within the execution context values.
    """
    run_callable_somehow(self._handle_timeout(request_message, timeout_type=timeout_type))

handled_default_message_types

handled_default_message_types(request_message: RequestMessage) -> bool

Handle default cases for the server without further processing of the request (for example, HANDSHAKE). This method is called once/supposed to be called when the message is received or popped out of the socket.

Parameters:

Name Type Description Default
request_message
RequestMessage

the client message to handle

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def handled_default_message_types(self, request_message: RequestMessage) -> bool:
    """
    Handle default cases for the server without further processing of the request (for example, `HANDSHAKE`).
    This method is called once/supposed to be called when the message is received or popped out of the socket.

    Parameters
    ----------
    request_message: RequestMessage
        the client message to handle
    """
    if request_message.type == HANDSHAKE:
        self.handshake(request_message)
        return True
    elif request_message.type == EXIT:
        # self.send response with message type EXIT
        raise BreakLoop(f"exit message received from {request_message.sender_id} with msg-ID {request_message.id}")
    elif request_message.length != len(request_message.byte_array):
        self.handle_invalid_message(
            request_message,
            exception=ValueError(
                f"message length {request_message.length} does not match "
                + f"the number of message parts {len(request_message.byte_array)}"
            ),
        )
        return True
    return False

handshake

handshake(request_message: RequestMessage) -> None

Pass a handshake message to client. Absolutely mandatory to handshake with all clients to ensure initial messages do not get lost because of ZMQ's tiny but significant initial delay after creating socket.

Parameters:

Name Type Description Default
request_message
RequestMessage

the client message for which the handshake is being sent

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def handshake(self, request_message: RequestMessage) -> None:
    """
    Pass a handshake message to client. Absolutely mandatory to handshake with all clients to ensure initial messages
    do not get lost because of ZMQ's tiny but significant initial delay after creating socket.

    Parameters
    ----------
    request_message: RequestMessage
        the client message for which the handshake is being sent
    """
    run_callable_somehow(self._handshake(request_message))

hololinked.core.zmq.brokers.BaseZMQClient

Bases: BaseZMQ

Base class for all ZMQ clients irrespective of sync and async

Source code in hololinked/hololinked/core/zmq/brokers.py
class BaseZMQClient(BaseZMQ):
    """Base class for all ZMQ clients irrespective of sync and async"""

    def __init__(
        self,
        *,
        id: str,
        server_id: str,
        logger: structlog.BoundLogger | None = None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        id: str
            Unique id of the client to receive messages from the server. Each client connecting to same server must
            still have unique ID.
        server_id: str
            The server id to connect to
        logger: Optional[logging.Logger]
            logger instance to use. If None, a default logger is created.
        kwargs: dict
            Additional arguments:

            - `poll_timeout`: `int`, time in milliseconds to poll the socket for messages, default is 1000 ms.
        """
        super().__init__(id=id, **kwargs)
        if not logger:
            logger = structlog.get_logger()
        self.logger = logger.bind(component="broker", impl=self.__class__.__name__, id=id, server_id=server_id)
        self.server_id = server_id
        self.socket: zmq.Socket | zmq.asyncio.Socket
        self.poller: zmq.Poller | zmq.asyncio.Poller
        self._monitor_socket = None  # type: zmq.Socket | zmq.asyncio.Socket | None
        self._response_cache = dict()
        self._poll_timeout = kwargs.get("poll_timeout", 1000)  # default to 1000 ms
        self._stop = False  # in general, stop any loop with this variable

    @property
    def poll_timeout(self) -> int:
        """socket polling timeout in milliseconds greater than 0"""
        return self._poll_timeout

    @poll_timeout.setter
    def poll_timeout(self, value: int) -> None:
        if not isinstance(value, int) or value <= 0:
            raise ValueError(
                f"polling period must be an integer greater than 0, not {value}. Value is considered in milliseconds."
            )
        self._poll_timeout = value

    def exit(self) -> None:
        try:
            BaseZMQ.exit(self)
            self.poller.unregister(self.socket)
            # TODO - there is some issue here while quitting
            if self._monitor_socket is not None:
                self.poller.unregister(self._monitor_socket)
        except Exception as ex:  # noqa
            # TODO log message and undo noqa
            # raises a weird key error for some reason
            # unable to deregister from poller - <zmq.asyncio.Socket(zmq.PAIR) at 0x1c9e5028830> - KeyError
            # unable to deregister from poller - <zmq.asyncio.Socket(zmq.PAIR) at 0x1c9e502a350> - KeyError
            # unable to deregister from poller - <zmq.asyncio.Socket(zmq.PAIR) at 0x1c9e5080750> - KeyError
            # unable to deregister from poller - <zmq.asyncio.Socket(zmq.PAIR) at 0x1c9e5082430> - KeyError
            self.logger.warning(f"unable to deregister socket from poller - {str(ex)} - {type(ex).__name__}")
        try:
            if self._monitor_socket is not None:
                self._monitor_socket.close(0)
            self.socket.close(0)
            self.logger.info("terminated socket of client")
        except Exception as ex:
            self.logger.warning(
                "could not properly terminate socket or attempted to terminate an already terminated,"
                + f" exception message: {str(ex)}"
            )

    def handled_default_message_types(self, response_message: ResponseMessage) -> bool:
        """
        Handle default cases for the client. This method is called once/supposed to be called when the message
        is received or popped out of the socket.

        Parameters
        ----------
        response_message: List[ResponseMessage]
            the server message to handle
        """
        if len(response_message.byte_array) == 2:  # socket monitor message, not our message
            try:
                if ZMQ_EVENT_MAP[parse_monitor_message(response_message.byte_array)["event"]] == SERVER_DISCONNECTED:
                    raise ConnectionAbortedError("server disconnected")
                return True  # True should simply continue polling
            except RuntimeError:
                self.logger.warning(
                    "message received from monitor socket cannot be deserialized, "
                    + f"assuming its irrelevant and skipping, {response_message.byte_array}"
                )
                return True
        elif len(response_message.byte_array) != ResponseMessage.length:  # either an error or not our message
            self.logger.warning(
                f"received unknown message from server, message length: {len(response_message.byte_array)}, "
                + f"message: {response_message.byte_array}"
            )
            return True
        if response_message.type == HANDSHAKE:
            return True
        return False

    def stop(self) -> None:
        """Stop polling for messages from server. Automatically reset when polling starts again."""
        self._stop = True

Functions

__init__

__init__(*, id: str, server_id: str, logger: BoundLogger | None = None, **kwargs) -> None

Parameters:

Name Type Description Default
id
str

Unique id of the client to receive messages from the server. Each client connecting to same server must still have unique ID.

required
server_id
str

The server id to connect to

required
logger
BoundLogger | None

logger instance to use. If None, a default logger is created.

None
kwargs

Additional arguments:

  • poll_timeout: int, time in milliseconds to poll the socket for messages, default is 1000 ms.
{}
Source code in hololinked/hololinked/core/zmq/brokers.py
def __init__(
    self,
    *,
    id: str,
    server_id: str,
    logger: structlog.BoundLogger | None = None,
    **kwargs,
) -> None:
    """
    Parameters
    ----------
    id: str
        Unique id of the client to receive messages from the server. Each client connecting to same server must
        still have unique ID.
    server_id: str
        The server id to connect to
    logger: Optional[logging.Logger]
        logger instance to use. If None, a default logger is created.
    kwargs: dict
        Additional arguments:

        - `poll_timeout`: `int`, time in milliseconds to poll the socket for messages, default is 1000 ms.
    """
    super().__init__(id=id, **kwargs)
    if not logger:
        logger = structlog.get_logger()
    self.logger = logger.bind(component="broker", impl=self.__class__.__name__, id=id, server_id=server_id)
    self.server_id = server_id
    self.socket: zmq.Socket | zmq.asyncio.Socket
    self.poller: zmq.Poller | zmq.asyncio.Poller
    self._monitor_socket = None  # type: zmq.Socket | zmq.asyncio.Socket | None
    self._response_cache = dict()
    self._poll_timeout = kwargs.get("poll_timeout", 1000)  # default to 1000 ms
    self._stop = False  # in general, stop any loop with this variable

handled_default_message_types

handled_default_message_types(response_message: ResponseMessage) -> bool

Handle default cases for the client. This method is called once/supposed to be called when the message is received or popped out of the socket.

Parameters:

Name Type Description Default
response_message
ResponseMessage

the server message to handle

required
Source code in hololinked/hololinked/core/zmq/brokers.py
def handled_default_message_types(self, response_message: ResponseMessage) -> bool:
    """
    Handle default cases for the client. This method is called once/supposed to be called when the message
    is received or popped out of the socket.

    Parameters
    ----------
    response_message: List[ResponseMessage]
        the server message to handle
    """
    if len(response_message.byte_array) == 2:  # socket monitor message, not our message
        try:
            if ZMQ_EVENT_MAP[parse_monitor_message(response_message.byte_array)["event"]] == SERVER_DISCONNECTED:
                raise ConnectionAbortedError("server disconnected")
            return True  # True should simply continue polling
        except RuntimeError:
            self.logger.warning(
                "message received from monitor socket cannot be deserialized, "
                + f"assuming its irrelevant and skipping, {response_message.byte_array}"
            )
            return True
    elif len(response_message.byte_array) != ResponseMessage.length:  # either an error or not our message
        self.logger.warning(
            f"received unknown message from server, message length: {len(response_message.byte_array)}, "
            + f"message: {response_message.byte_array}"
        )
        return True
    if response_message.type == HANDSHAKE:
        return True
    return False

stop

stop() -> None

Stop polling for messages from server. Automatically reset when polling starts again.

Source code in hololinked/hololinked/core/zmq/brokers.py
def stop(self) -> None:
    """Stop polling for messages from server. Automatically reset when polling starts again."""
    self._stop = True