Skip to content

hololinked.core.zmq.brokers.BaseZMQ

Base class for all ZMQ message brokers. Implements socket creation & logger which is common to all server and client implementations.

Source code in hololinked\core\zmq\brokers.py
class BaseZMQ: 
    """
    Base class for all ZMQ message brokers. Implements socket creation & logger
    which is common to all server and client implementations. 

    Parameters
    ----------
    id: str
        identity of the broker, also used to name the default logger
    logger: logging.Logger, optional
        logger to use. When not supplied, a default logger named ``<class name>|<id>`` is created.
    **kwargs: dict
        - log_level: int, default ``logging.INFO``
            level of the default logger, used only when ``logger`` is not supplied
    """

    def __init__(self, id: str, logger: logging.Logger | None, **kwargs) -> None:
        super().__init__()
        self.id = id # type: str
        # set the attribute first so that exit() (called from __del__) always finds it
        self.logger = None
        if not logger:
            logger = get_default_logger('{}|{}'.format(self.__class__.__name__, self.id), 
                                                kwargs.get('log_level', logging.INFO))
        self.logger = logger


    def exit(self) -> None:
        """
        Cleanup method to terminate ZMQ sockets and contexts before quitting. Called by `__del__()`
        automatically. Each subclass server/client should implement their version of exiting if necessary.

        This base implementation only makes sure that a logger is available, so that subclasses can 
        log while cleaning up.
        """
        if hasattr(self, 'logger') and not self.logger:
            self.logger = get_default_logger('{}|{}'.format(
                                self.__class__.__name__, self.id), logging.INFO)

    def __del__(self) -> None:
        self.exit()


    @classmethod
    def get_socket(cls, *, id: str, node_type: str, context: zmq.asyncio.Context | zmq.Context, 
                    transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC, socket_type: zmq.SocketType = zmq.ROUTER, 
                    **kwargs) -> typing.Tuple[zmq.Socket, str]:
        """
        Create a socket with certain specifications. Supported ZeroMQ transports are TCP, IPC & INPROC. 
        For IPC sockets, a file is created under TEMP_DIR of global configuration.

        Parameters
        ----------
        id: str
            Each ROUTER socket requires a unique identity to correctly route the messages. 
        node_type: str
            'server' to bind the socket, any other value to connect it (client), as per ZMQ definition
        context: zmq.Context or zmq.asyncio.Context
            ZeroMQ Context object that creates the socket
        transport: Enum or str
            TCP, IPC or INPROC (strings are matched case-insensitively). Message crafting/passing/routing 
            is transport invariant as suggested by ZMQ. Speed relationship - INPROC > IPC > TCP.
        socket_type: zmq.SocketType, default zmq.ROUTER
            Usually a ROUTER socket is implemented for both client-server and peer-to-peer communication
        **kwargs: dict
            - socket_address: str,
                address to bind or connect to. Mandatory only for a TCP client. For IPC & INPROC the 
                address is derived from ``id`` when not supplied, and a TCP server searches for a free 
                port in the range given by the global configuration.

        Returns
        -------
        socket: zmq.Socket
            created socket
        socket_address: str
            qualified address of the socket created for any transport type

        Raises
        ------
        NotImplementedError
            if transport other than TCP, IPC or INPROC is used
        RuntimeError
            if transport is TCP and a socket connect from client side is requested but a socket address 
            is not supplied, or if a TCP server could not find any free port in the configured range
        """

        socket = context.socket(socket_type)
        try:
            socket.setsockopt_string(zmq.IDENTITY, id)
            socket_address = kwargs.get('socket_address', None)
            bind = node_type == 'server'
            # normalise enum members and plain strings to a lower case name, so that a value without 
            # a lower() method ends up in NotImplementedError instead of AttributeError
            transport_name = getattr(transport, 'value', transport)
            transport_name = transport_name.lower() if isinstance(transport_name, str) else None
            if transport == ZMQ_TRANSPORTS.IPC or transport_name == "ipc":
                if socket_address is None or not socket_address.endswith('.ipc'):
                    # an address without the .ipc suffix (or the id, when no address is given) is 
                    # treated as a '/' separated relative path under TEMP_DIR
                    split_id = (socket_address or id).split('/')
                    socket_dir = os.sep  + os.sep.join(split_id[:-1]) if len(split_id) > 1 else ''
                    directory = global_config.TEMP_DIR + socket_dir
                    # exist_ok avoids a race between checking and creating the directory
                    os.makedirs(directory, exist_ok=True)
                    # re-compute for IPC because it looks for a file in a directory
                    socket_address = "ipc://{}{}{}.ipc".format(directory, os.sep, split_id[-1])
                if bind:
                    socket.bind(socket_address)
                else:
                    socket.connect(socket_address)
            elif transport == ZMQ_TRANSPORTS.TCP or transport_name == "tcp":
                if bind:
                    if not socket_address:
                        start_port = global_config.TCP_SOCKET_SEARCH_START_PORT
                        end_port = global_config.TCP_SOCKET_SEARCH_END_PORT
                        for port in range(start_port, end_port):
                            socket_address = "tcp://0.0.0.0:{}".format(port)
                            try:
                                socket.bind(socket_address)
                                break 
                            except zmq.error.ZMQError as ex:
                                # compare the error number, the error text differs between platforms
                                if ex.errno != zmq.EADDRINUSE:
                                    raise ex from None
                        else:
                            # never hand out a socket that is not bound to anything
                            raise RuntimeError("no free TCP port found in range [{}, {}) ".format(start_port, end_port) + 
                                            f"to bind socket with identity - {id}")
                    else:                   
                        socket.bind(socket_address)
                elif socket_address: 
                    socket.connect(socket_address)
                else:
                    raise RuntimeError(f"Socket address not supplied for TCP connection to identity - {id}")
            elif transport == ZMQ_TRANSPORTS.INPROC or transport_name == "inproc":
                # inproc_id = id.replace('/', '_').replace('-', '_')
                if socket_address is None:
                    socket_address = f'inproc://{id}'
                elif not socket_address.startswith('inproc://'):
                    socket_address = f'inproc://{socket_address}'
                if bind:
                    socket.bind(socket_address)
                else:
                    socket.connect(socket_address)
            else:
                raise NotImplementedError("transports other than IPC, TCP & INPROC are not implemented now for {}.".format(cls.__name__) + 
                                                f" Given transport {transport}.")
        except Exception:
            # do not leak the socket when it could not be bound or connected
            socket.close(0)
            raise

        return socket, socket_address

Functions

exit

exit() -> None

Cleanup method to terminate ZMQ sockets and contexts before quitting. Called by __del__() automatically. Each subclass server/client should implement their version of exiting if necessary.

Source code in hololinked\core\zmq\brokers.py
def exit(self) -> None:
    """
    Cleanup hook invoked before quitting, meant to be extended by server/client subclasses that 
    own ZMQ sockets and contexts. Called by `__del__()` automatically.

    The base version only guarantees a usable logger: when the ``logger`` attribute exists but is 
    unset, a default logger named ``<class name>|<id>`` at INFO level is installed.
    """
    # nothing to do when the attribute is missing altogether or a logger is already present
    if not hasattr(self, 'logger') or self.logger:
        return
    logger_name = '{}|{}'.format(self.__class__.__name__, self.id)
    self.logger = get_default_logger(logger_name, logging.INFO)

get_socket classmethod

get_socket(*, id: str, node_type: str, context: zmq.asyncio.Context | zmq.Context, transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC, socket_type: zmq.SocketType = zmq.ROUTER, **kwargs) -> typing.Tuple[zmq.Socket, str]

Create a socket with certain specifications. Supported ZeroMQ transports are TCP, IPC & INPROC. For IPC sockets, a file is created under TEMP_DIR of global configuration.

Parameters:

Name Type Description Default

id

str

Each ROUTER socket requires a unique identity to route messages correctly.

required

node_type

str

server or client? i.e. whether to bind (server) or connect (client) as per ZMQ definition

required

context

Context | Context

ZeroMQ Context object that creates the socket

required

transport

ZMQ_TRANSPORTS

TCP, IPC or INPROC. Message crafting/passing/routing is transport invariant as suggested by ZMQ. Speed relationship - INPROC > IPC > TCP.

IPC

socket_type

SocketType

Usually a ROUTER socket is implemented for both client-server and peer-to-peer communication

ROUTER

**kwargs

  • socket_address: str, address to bind or connect to. Mandatory only for a TCP client; for IPC and INPROC it is derived from id when not supplied, and a TCP server searches for a free port.
{}

Returns:

Name Type Description
socket Socket

created socket

socket_address str

qualified address of the socket created for any transport type

Raises:

Type Description
NotImplementedError

if transport other than TCP, IPC or INPROC is used

RuntimeError

if transport is TCP and a socket connect from client side is requested but a socket address is not supplied

Source code in hololinked\core\zmq\brokers.py
@classmethod
def get_socket(cls, *, id: str, node_type: str, context: zmq.asyncio.Context | zmq.Context, 
                transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC, socket_type: zmq.SocketType = zmq.ROUTER, 
                **kwargs) -> typing.Tuple[zmq.Socket, str]:
    """
    Create a socket with certain specifications. Supported ZeroMQ transports are TCP, IPC & INPROC. 
    For IPC sockets, a file is created under TEMP_DIR of global configuration.

    Parameters
    ----------
    id: str
        Each ROUTER socket requires a unique identity to correctly route the messages. 
    node_type: str
        'server' to bind the socket, any other value to connect it (client), as per ZMQ definition
    context: zmq.Context or zmq.asyncio.Context
        ZeroMQ Context object that creates the socket
    transport: Enum or str
        TCP, IPC or INPROC (strings are matched case-insensitively). Message crafting/passing/routing 
        is transport invariant as suggested by ZMQ. Speed relationship - INPROC > IPC > TCP.
    socket_type: zmq.SocketType, default zmq.ROUTER
        Usually a ROUTER socket is implemented for both client-server and peer-to-peer communication
    **kwargs: dict
        - socket_address: str,
            address to bind or connect to. Mandatory only for a TCP client. For IPC & INPROC the 
            address is derived from ``id`` when not supplied, and a TCP server searches for a free 
            port in the range given by the global configuration.

    Returns
    -------
    socket: zmq.Socket
        created socket
    socket_address: str
        qualified address of the socket created for any transport type

    Raises
    ------
    NotImplementedError
        if transport other than TCP, IPC or INPROC is used
    RuntimeError
        if transport is TCP and a socket connect from client side is requested but a socket address 
        is not supplied, or if a TCP server could not find any free port in the configured range
    """

    socket = context.socket(socket_type)
    try:
        socket.setsockopt_string(zmq.IDENTITY, id)
        socket_address = kwargs.get('socket_address', None)
        bind = node_type == 'server'
        # normalise enum members and plain strings to a lower case name, so that a value without 
        # a lower() method ends up in NotImplementedError instead of AttributeError
        transport_name = getattr(transport, 'value', transport)
        transport_name = transport_name.lower() if isinstance(transport_name, str) else None
        if transport == ZMQ_TRANSPORTS.IPC or transport_name == "ipc":
            if socket_address is None or not socket_address.endswith('.ipc'):
                # an address without the .ipc suffix (or the id, when no address is given) is 
                # treated as a '/' separated relative path under TEMP_DIR
                split_id = (socket_address or id).split('/')
                socket_dir = os.sep  + os.sep.join(split_id[:-1]) if len(split_id) > 1 else ''
                directory = global_config.TEMP_DIR + socket_dir
                # exist_ok avoids a race between checking and creating the directory
                os.makedirs(directory, exist_ok=True)
                # re-compute for IPC because it looks for a file in a directory
                socket_address = "ipc://{}{}{}.ipc".format(directory, os.sep, split_id[-1])
            if bind:
                socket.bind(socket_address)
            else:
                socket.connect(socket_address)
        elif transport == ZMQ_TRANSPORTS.TCP or transport_name == "tcp":
            if bind:
                if not socket_address:
                    start_port = global_config.TCP_SOCKET_SEARCH_START_PORT
                    end_port = global_config.TCP_SOCKET_SEARCH_END_PORT
                    for port in range(start_port, end_port):
                        socket_address = "tcp://0.0.0.0:{}".format(port)
                        try:
                            socket.bind(socket_address)
                            break 
                        except zmq.error.ZMQError as ex:
                            # compare the error number, the error text differs between platforms
                            if ex.errno != zmq.EADDRINUSE:
                                raise ex from None
                    else:
                        # never hand out a socket that is not bound to anything
                        raise RuntimeError("no free TCP port found in range [{}, {}) ".format(start_port, end_port) + 
                                        f"to bind socket with identity - {id}")
                else:                   
                    socket.bind(socket_address)
            elif socket_address: 
                socket.connect(socket_address)
            else:
                raise RuntimeError(f"Socket address not supplied for TCP connection to identity - {id}")
        elif transport == ZMQ_TRANSPORTS.INPROC or transport_name == "inproc":
            # inproc_id = id.replace('/', '_').replace('-', '_')
            if socket_address is None:
                socket_address = f'inproc://{id}'
            elif not socket_address.startswith('inproc://'):
                socket_address = f'inproc://{socket_address}'
            if bind:
                socket.bind(socket_address)
            else:
                socket.connect(socket_address)
        else:
            raise NotImplementedError("transports other than IPC, TCP & INPROC are not implemented now for {}.".format(cls.__name__) + 
                                            f" Given transport {transport}.")
    except Exception:
        # do not leak the socket when it could not be bound or connected
        socket.close(0)
        raise

    return socket, socket_address

hololinked.core.zmq.brokers.BaseAsyncZMQ

Bases: BaseZMQ

Base class for all async ZMQ servers and clients.

Source code in hololinked\core\zmq\brokers.py
class BaseAsyncZMQ(BaseZMQ):
    """
    Common base for every asynchronous ZMQ server and client.
    """
    # init of this class must always take empty arguments due to inheritance structure

    def create_socket(self, *, id: str, node_type: str = 'server', context: zmq.asyncio.Context | None = None, 
                        transport: str = "IPC", socket_type: zmq.SocketType = zmq.ROUTER, **kwargs) -> None:
        """
        Create an async socket through ``BaseZMQ.get_socket()`` and bind or connect it. A new 
        ``zmq.asyncio.Context`` is created when no context is supplied. The context, the socket and its 
        address are stored as ``self.context``, ``self.socket`` and ``self.socket_address``.

        Raises
        ------
        TypeError
            if a context is supplied which is not a ``zmq.asyncio.Context``
        """
        if context:
            if not isinstance(context, zmq.asyncio.Context):
                raise TypeError("async ZMQ message broker accepts only async ZMQ context. supplied type {}".format(type(context)))
            self.context = context
        else:
            self.context = zmq.asyncio.Context()
        created = BaseZMQ.get_socket(
            id=id, 
            node_type=node_type, 
            context=self.context, 
            transport=transport, 
            socket_type=socket_type, 
            **kwargs
        )
        self.socket, self.socket_address = created
        connection_state = "bound" if node_type == 'server' else "connected"
        self.logger.info("created socket {} with address {} & identity {} and {}".format(
                            get_socket_type_name(socket_type), self.socket_address, id, connection_state))

Functions

create_socket

create_socket(*, id: str, node_type: str = 'server', context: zmq.asyncio.Context | None = None, transport: str = 'IPC', socket_type: zmq.SocketType = zmq.ROUTER, **kwargs) -> None

Overloads create_socket() to create, bind/connect an async socket. An async context is created if none is supplied.

Source code in hololinked\core\zmq\brokers.py
def create_socket(self, *, id: str, node_type: str = 'server', context: zmq.asyncio.Context | None = None, 
                    transport: str = "IPC", socket_type: zmq.SocketType = zmq.ROUTER, **kwargs) -> None:
    """
    Create an async socket through ``BaseZMQ.get_socket()`` and bind or connect it. A new 
    ``zmq.asyncio.Context`` is created when no context is supplied. The context, the socket and its 
    address are stored as ``self.context``, ``self.socket`` and ``self.socket_address``.

    Raises
    ------
    TypeError
        if a context is supplied which is not a ``zmq.asyncio.Context``
    """
    if context:
        if not isinstance(context, zmq.asyncio.Context):
            raise TypeError("async ZMQ message broker accepts only async ZMQ context. supplied type {}".format(type(context)))
        self.context = context
    else:
        self.context = zmq.asyncio.Context()
    created = BaseZMQ.get_socket(
        id=id, 
        node_type=node_type, 
        context=self.context, 
        transport=transport, 
        socket_type=socket_type, 
        **kwargs
    )
    self.socket, self.socket_address = created
    connection_state = "bound" if node_type == 'server' else "connected"
    self.logger.info("created socket {} with address {} & identity {} and {}".format(
                        get_socket_type_name(socket_type), self.socket_address, id, connection_state))

hololinked.core.zmq.brokers.BaseSyncZMQ

Bases: BaseZMQ

Base class for all sync ZMQ servers and clients.

Source code in hololinked\core\zmq\brokers.py
class BaseSyncZMQ(BaseZMQ):
    """
    Common base for every synchronous ZMQ server and client.
    """
    # init of this class must always take empty arguments due to inheritance structure

    def create_socket(self, *, id: str, node_type: str = 'server', context: zmq.Context | None = None, 
                    transport: str = "IPC", socket_type: zmq.SocketType = zmq.ROUTER, **kwargs) -> None:
        """
        Create a synchronous socket through ``BaseZMQ.get_socket()`` and bind or connect it. A new 
        ``zmq.Context`` is created when no context is supplied. The context, the socket and its 
        address are stored as ``self.context``, ``self.socket`` and ``self.socket_address``.

        Raises
        ------
        TypeError
            if a context is supplied which is not a ``zmq.Context``, or which is a ``zmq.asyncio.Context``
        """
        if context: 
            # an async context is rejected explicitly in addition to the plain type check
            is_sync_context = isinstance(context, zmq.Context) and not isinstance(context, zmq.asyncio.Context)
            if not is_sync_context:
                raise TypeError("sync ZMQ message broker accepts only sync ZMQ context. supplied type {}".format(type(context)))
            self.context = context
        else:
            self.context = zmq.Context()
        created = BaseZMQ.get_socket(
            id=id, 
            node_type=node_type, 
            context=self.context, 
            transport=transport, 
            socket_type=socket_type, 
            **kwargs
        )
        self.socket, self.socket_address = created
        connection_state = "bound" if node_type == 'server' else "connected"
        self.logger.info("created socket {} with address {} & identity {} and {}".format(
                            get_socket_type_name(socket_type), self.socket_address, id, connection_state))

Functions

create_socket

create_socket(*, id: str, node_type: str = 'server', context: zmq.Context | None = None, transport: str = 'IPC', socket_type: zmq.SocketType = zmq.ROUTER, **kwargs) -> None

Overloads create_socket() to create, bind/connect a synchronous socket. A synchronous context is created if none is supplied.

Source code in hololinked\core\zmq\brokers.py
def create_socket(self, *, id: str, node_type: str = 'server', context: zmq.Context | None = None, 
                transport: str = "IPC", socket_type: zmq.SocketType = zmq.ROUTER, **kwargs) -> None:
    """
    Create a synchronous socket through ``BaseZMQ.get_socket()`` and bind or connect it. A new 
    ``zmq.Context`` is created when no context is supplied. The context, the socket and its 
    address are stored as ``self.context``, ``self.socket`` and ``self.socket_address``.

    Raises
    ------
    TypeError
        if a context is supplied which is not a ``zmq.Context``, or which is a ``zmq.asyncio.Context``
    """
    if context: 
        # an async context is rejected explicitly in addition to the plain type check
        is_sync_context = isinstance(context, zmq.Context) and not isinstance(context, zmq.asyncio.Context)
        if not is_sync_context:
            raise TypeError("sync ZMQ message broker accepts only sync ZMQ context. supplied type {}".format(type(context)))
        self.context = context
    else:
        self.context = zmq.Context()
    created = BaseZMQ.get_socket(
        id=id, 
        node_type=node_type, 
        context=self.context, 
        transport=transport, 
        socket_type=socket_type, 
        **kwargs
    )
    self.socket, self.socket_address = created
    connection_state = "bound" if node_type == 'server' else "connected"
    self.logger.info("created socket {} with address {} & identity {} and {}".format(
                        get_socket_type_name(socket_type), self.socket_address, id, connection_state))

hololinked.core.zmq.brokers.BaseZMQServer

Bases: BaseZMQ

Source code in hololinked\core\zmq\brokers.py
class BaseZMQServer(BaseZMQ):
    """
    Base class for ZMQ servers. Each public ``handle_*``/``handshake`` method delegates to an 
    underscore-prefixed counterpart which raises ``NotImplementedError`` here and must be implemented 
    by the concrete server.
    """

    def handshake(self, request_message: RequestMessage) -> None:
        """
        Pass a handshake message to client. Absolutely mandatory to ensure initial messages do not get lost 
        because of ZMQ's very tiny but significant initial delay after creating socket.

        Parameters
        ----------
        request_message: RequestMessage
            the client message for which the handshake is being sent

        Returns
        -------
        None 
        """
        run_callable_somehow(self._handshake(request_message))

    def _handshake(self, request_message: RequestMessage) -> None:
        raise NotImplementedError(f"handshake cannot be handled - implement _handshake in {self.__class__} to handshake.")


    def handle_invalid_message(self, request_message: RequestMessage, exception: Exception) -> None:
        """
        Pass an invalid message to the client when an exception occurred while parsing the message from the client 
        (``parse_client_message()``)

        Parameters
        ----------
        request_message: RequestMessage
            the client message parsing which the exception occurred
        exception: Exception
            exception object raised

        Returns
        -------
        None
        """
        run_callable_somehow(self._handle_invalid_message(request_message, exception))

    def _handle_invalid_message(self, message: RequestMessage, exception: Exception) -> None:
        raise NotImplementedError("invalid message cannot be handled" +
                f" - implement _handle_invalid_message in {self.__class__} to handle invalid messages.")


    def handle_timeout(self, request_message: RequestMessage) -> None:
        """
        Pass timeout message to the client when the operation could not be executed within specified timeouts

        Parameters
        ----------
        request_message: RequestMessage
            the client message which could not be executed within the specified timeout. timeout value is 
            generally specified within the execution context values.

        Returns
        -------
        None
        """
        run_callable_somehow(self._handle_timeout(request_message))

    def _handle_timeout(self, request_message: RequestMessage) -> None:
        # the two parts are concatenated to form a single message, like in _handle_invalid_message
        raise NotImplementedError("timeouts cannot be handled " +
                f"- implement _handle_timeout in {self.__class__} to handle timeout.")


    def handle_error_message(self, request_message: RequestMessage, exception: Exception) -> None:
        """
        Pass an exception message to the client when an exception occurred while executing the operation

        Parameters
        ----------
        request_message: RequestMessage
            the client message for which the exception occurred
        exception: Exception
            exception object raised

        Returns
        -------
        None
        """
        run_callable_somehow(self._handle_error_message(request_message, exception))

    def _handle_error_message(self, request_message: RequestMessage, exception: Exception) -> None:
        # the two parts are concatenated to form a single message, like in _handle_invalid_message
        raise NotImplementedError("exceptions cannot be handled " +
                f"- implement _handle_error_message in {self.__class__} to handle exceptions.")


    def handled_default_message_types(self, request_message: RequestMessage) -> bool:
        """
        Handle the message types that every server treats the same way, namely HANDSHAKE and EXIT. 

        Parameters
        ----------
        request_message: RequestMessage
            the client message whose type is inspected

        Returns
        -------
        bool
            True if the message was a HANDSHAKE and has been answered, False if the message is of 
            any other type (except EXIT) and still needs to be processed by the caller

        Raises
        ------
        BreakLoop
            if the message is of type EXIT
        """
        if request_message.type == HANDSHAKE:
            self.handshake(request_message)
            return True
        elif request_message.type == EXIT:
            # self.send response with message type EXIT
            raise BreakLoop(f"exit message received from {request_message.sender_id} with msg-ID {request_message.id}")
        return False 

Functions

handle_error_message

handle_error_message(request_message: RequestMessage, exception: Exception) -> None

Pass an exception message to the client when an exception occurred while executing the operation

Parameters:

Name Type Description Default

request_message

RequestMessage

the client message for which the exception occurred

required

exception

Exception

exception object raised

required

Returns:

Type Description
None
Source code in hololinked\core\zmq\brokers.py
def handle_error_message(self, request_message: RequestMessage, exception: Exception) -> None:
    """
    Report to the client that an exception occurred while executing the requested operation. 
    Delegates to ``_handle_error_message()``, whose result is handed to ``run_callable_somehow``.

    Parameters
    ----------
    request_message: RequestMessage
        the client message for which the exception occurred
    exception: Exception
        exception object raised

    Returns
    -------
    None
    """
    pending_reply = self._handle_error_message(request_message, exception)
    run_callable_somehow(pending_reply)

handle_invalid_message

handle_invalid_message(request_message: RequestMessage, exception: Exception) -> None

Pass an invalid message to the client when an exception occurred while parsing the message from the client (parse_client_message())

Parameters:

Name Type Description Default

request_message

RequestMessage

the client message parsing which the exception occurred

required

exception

Exception

exception object raised

required

Returns:

Type Description
None
Source code in hololinked\core\zmq\brokers.py
def handle_invalid_message(self, request_message: RequestMessage, exception: Exception) -> None:
    """
    Report to the client that its message could not be parsed (``parse_client_message()``). 
    Delegates to ``_handle_invalid_message()``, whose result is handed to ``run_callable_somehow``.

    Parameters
    ----------
    request_message: RequestMessage
        the client message parsing which the exception occurred
    exception: Exception
        exception object raised

    Returns
    -------
    None
    """
    pending_reply = self._handle_invalid_message(request_message, exception)
    run_callable_somehow(pending_reply)

handle_timeout

handle_timeout(request_message: RequestMessage) -> None

Pass timeout message to the client when the operation could not be executed within specified timeouts

Parameters:

Name Type Description Default

request_message

RequestMessage

the client message which could not be executed within the specified timeout. The timeout value is generally specified within the execution context values.

required

Returns:

Type Description
None
Source code in hololinked\core\zmq\brokers.py
def handle_timeout(self, request_message: RequestMessage) -> None:
    """
    Report to the client that the operation could not be executed within the specified timeouts. 
    Delegates to ``_handle_timeout()``, whose result is handed to ``run_callable_somehow``.

    Parameters
    ----------
    request_message: RequestMessage
        the client message which could not be executed within the specified timeout. timeout value is 
        generally specified within the execution context values.

    Returns
    -------
    None
    """
    pending_reply = self._handle_timeout(request_message)
    run_callable_somehow(pending_reply)

handled_default_message_types

handled_default_message_types(request_message: RequestMessage) -> bool

Handle the message types that every server treats the same way: a HANDSHAKE message is answered with a handshake, and an EXIT message raises BreakLoop. Any other message type is left to the caller.

Parameters:

Name Type Description Default

request_message

RequestMessage

the client message whose type is inspected.

required

receiver_socket

the socket to which the response must be sent.

required

Returns:

Type Description
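bool True if the message was a HANDSHAKE and has been answered, False if the caller still has to process the message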
Source code in hololinked\core\zmq\brokers.py
def handled_default_message_types(self, request_message: RequestMessage) -> bool:
    """
    Handle the message types that every server treats the same way, namely HANDSHAKE and EXIT. 

    Parameters
    ----------
    request_message: RequestMessage
        the client message whose type is inspected

    Returns
    -------
    bool
        True if the message was a HANDSHAKE and has been answered, False if the message is of 
        any other type (except EXIT) and still needs to be processed by the caller

    Raises
    ------
    BreakLoop
        if the message is of type EXIT
    """
    message_type = request_message.type
    if message_type == HANDSHAKE:
        self.handshake(request_message)
        return True
    if message_type == EXIT:
        # self.send response with message type EXIT
        raise BreakLoop(f"exit message received from {request_message.sender_id} with msg-ID {request_message.id}")
    # neither of the default types, the caller has to deal with the message
    return False 

handshake

handshake(request_message: RequestMessage) -> None

Pass a handshake message to client. Absolutely mandatory to ensure initial messages do not get lost because of ZMQ's very tiny but significant initial delay after creating socket.

Parameters:

Name Type Description Default

request_message

RequestMessage

the client message for which the handshake is being sent

required

Returns:

Type Description
None
Source code in hololinked\core\zmq\brokers.py
def handshake(self, request_message: RequestMessage) -> None:
    """
    Answer a client with a handshake message. Absolutely mandatory to ensure initial messages do not 
    get lost because of ZMQ's very tiny but significant initial delay after creating socket. 
    Delegates to ``_handshake()``, whose result is handed to ``run_callable_somehow``.

    Parameters
    ----------
    request_message: RequestMessage
        the client message for which the handshake is being sent

    Returns
    -------
    None 
    """
    pending_reply = self._handshake(request_message)
    run_callable_somehow(pending_reply)

hololinked.core.zmq.brokers.BaseZMQClient

Bases: BaseZMQ

Base class for all ZMQ clients irrespective of sync and async.

server's response to client ::

[address, bytes(), server_type, message_type, message id, data, pre encoded data]|br|
[   0   ,   1    ,    2       ,      3      ,      4    ,  5  ,       6         ]|br|

Parameters:

Name Type Description Default

server_id

str

The instance name of the server (or Thing)

required

client_type

ZMQ or HTTP Server

required

server_type

server type metadata

required

zmq_serializer

custom implementation of ZMQ serializer if necessary

required

http_serializer

custom implementation of JSON serializer if necessary

required
Source code in hololinked\core\zmq\brokers.py
class BaseZMQClient(BaseZMQ):
    """    
    Base class for all ZMQ clients irrespective of sync and async.

    server's response to client
    ::

        [address, bytes(), server_type, message_type, message id, data, pre encoded data]|br|
        [   0   ,   1    ,    2       ,      3      ,      4    ,  5  ,       6         ]|br|

    Parameters
    ----------
    server_id: str
        The instance name of the server (or ``Thing``)
    client_type: str
        ZMQ or HTTP Server
    server_type: str    
        server type metadata
    zmq_serializer: BaseSerializer
        custom implementation of ZMQ serializer if necessary
    http_serializer: JSONSerializer
        custom implementation of JSON serializer if necessary
    """

    def __init__(self, *,
                id: str,
                server_id: str, 
                logger: typing.Optional[logging.Logger] = None,
                **kwargs
            ) -> None:
        """
        Store the identity of the server to talk to and initialise the client's bookkeeping attributes.

        Parameters
        ----------
        id: str
            identity of this client, forwarded to ``BaseZMQ``
        server_id: str
            identity of the server, stored as ``self.server_id``
        logger: logging.Logger, optional
            forwarded to ``BaseZMQ``
        **kwargs: dict
            forwarded to ``BaseZMQ`` as well; additionally the following is read here:

            - poll_timeout: int, default 1000
                socket polling timeout in milliseconds
        """
        super().__init__(id=id, logger=logger, **kwargs)
        # only declared here - NOTE(review): presumably assigned when the socket is created, confirm
        self.socket: zmq.Socket | zmq.asyncio.Socket
        self.poller: zmq.Poller | zmq.asyncio.Poller 
        self.server_id = server_id 
        self._poll_timeout = kwargs.get('poll_timeout', 1000)  # default to 1000 ms
        self._terminate_context = False
        self._response_cache = {}
        self._monitor_socket = None

    @property
    def poll_timeout(self) -> int:
        """
        Timeout used while polling the socket, in milliseconds. Only integers greater than 0 
        can be assigned.
        """
        return self._poll_timeout

    @poll_timeout.setter
    def poll_timeout(self, value: int) -> None:
        is_positive_int = isinstance(value, int) and value > 0
        if not is_positive_int:
            raise ValueError(f"polling period must be an integer greater than 0, not {value}. Value is considered in milliseconds.")
        self._poll_timeout = value

    def exit(self) -> None:
        """
        Unregister the sockets from the poller, close them and, if ``_terminate_context`` is set, 
        terminate the context. 

        Every step is best-effort: a failure is logged as a warning and the remaining steps are still 
        carried out, so that one failing socket does not prevent the other from being released. 
        """
        BaseZMQ.exit(self)
        monitor_socket = getattr(self, '_monitor_socket', None)

        # unregister the sockets independently of each other
        try:
            self.poller.unregister(self.socket)
        except Exception as ex:
            self.logger.warning(f"unable to deregister from poller - {str(ex)}")
        if monitor_socket is not None:
            try:
                self.poller.unregister(monitor_socket)
            except Exception as ex:
                self.logger.warning(f"unable to deregister from poller - {str(ex)}")

        # close the sockets independently of each other, a monitor socket that cannot be closed 
        # must not leave the main socket open
        if monitor_socket is not None:
            try:
                monitor_socket.close(0)
            except Exception as ex:
                self.logger.warning("could not properly terminate socket or attempted to terminate an already terminated " +
                                f"socket '{self.id}' of type '{self.__class__}'. Exception message: {str(ex)}")
        try:
            self.socket.close(0)
            self.logger.info("terminated socket of server '{}' of type '{}'".format(self.id, self.__class__))
        except Exception as ex:
            self.logger.warning("could not properly terminate socket or attempted to terminate an already terminated " +
                             f"socket '{self.id}' of type '{self.__class__}'. Exception message: {str(ex)}")

        try:
            if self._terminate_context:
                self.context.term()
                self.logger.info("terminated context of socket '{}' of type '{}'".format(self.id, self.__class__))
        except Exception as ex:
            self.logger.warning("could not properly terminate context or attempted to terminate an already terminated context " +
                            "'{}'. Exception message: {}".format(self.id, str(ex)))