Skip to content

hololinked.core.zmq.brokers.MessageMappedZMQClientPool

Bases: BaseZMQClient

Pool of async ZMQ clients, to be primarily used within protocol bindings where multiple things may be served. Use message ID to track responses.

Source code in hololinked/hololinked/core/zmq/brokers.py
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
class MessageMappedZMQClientPool(BaseZMQClient):
    """
    Pool of async ZMQ clients, to be primarily used within protocol bindings where multiple things may be served.
    Use message ID to track responses.
    """

    def __init__(
        self,
        id: str,
        client_ids: list[str],
        server_ids: list[str],
        handshake: bool = True,
        context: zmq.asyncio.Context = None,
        access_point: str = ZMQ_TRANSPORTS.IPC,
        poll_timeout: int = 25,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        id: str
            ID of the pool, must be unique.
        client_ids: List[str]
            list of id's of clients in the pool, must be unique. Clients are created with these ID's.
            For pre-existing clients, use `register` method instead and leave this list empty.
        server_ids: List[str]
            list of id's of servers to connect to, must be same length as client_ids. For pre-existing clients that
            are connected to servers, use `register` method instead and leave this list empty.
        handshake: bool
            when true, handshake with the server first before allowing first message and block until that handshake was
            accomplished.
        context: zmq.asyncio.Context
            ZMQ context to use, if None, a global context is used.
        access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
            Default access point for all clients in the pool, usually not helpful except `INPROC` transport.
            Use `TCP` or `tcp://<host>:<port>` for network access, `IPC` for multi-process applications,
            and `INPROC` for multi-threaded applications.
        poll_timeout: int
            socket polling timeout in milliseconds greater than 0.
        **kwargs:
            Additional arguments:

            - `logger`: `logging.Logger`, logger instance to use. If None, a default logger is created.
        """
        super().__init__(id=id, server_id=None, **kwargs)
        if len(client_ids) != len(server_ids):
            raise ValueError("client_ids and server_ids must have same length")
        # this class does not call create_socket method
        self.context = context or global_config.zmq_context()
        self.pool = dict()  # type: dict[str, AsyncZMQClient]
        self.poller = zmq.asyncio.Poller()
        for client_id, server_id in zip(client_ids, server_ids):
            client = AsyncZMQClient(
                id=client_id,
                server_id=server_id,
                handshake=handshake,
                context=self.context,
                access_point=access_point,
                logger=self.logger,
            )
            self.register(client)
        # Both the client pool as well as the individual client get their serializers and client_types
        # This is required to implement pool level sending and receiving messages like polling of pool of sockets
        self.event_pool = AsyncioEventPool(len(server_ids))
        self.events_map = dict()  # type: dict[bytes, asyncio.Event]
        self.message_map = dict()
        self.cancelled_messages = []
        self.poll_timeout = poll_timeout
        self.stop_poll = False
        self._thing_to_client_map = dict()  # type: dict[str, AsyncZMQClient]
        self._client_to_thing_map = dict()  # type: dict[str, str]

    def create_new(self, id: str, server_id: str, access_point: str = ZMQ_TRANSPORTS.IPC) -> None:
        """
        Create new server with specified transport & add it to the pool.
        Other arguments are taken from pool default specifications.

        Parameters
        ----------
        id: str
            id of the new client to be created
        server_id: str
            id of the server to connect to
        access_point: str
            transport method used by the server - `IPC`, `INPROC` or `tcp://<host>:<port>`
        """
        if server_id not in self.pool.keys():
            client = AsyncZMQClient(
                id=id,
                server_id=server_id,
                handshake=True,
                context=self.context,
                access_point=access_point,
                logger=self.logger,
            )
            client._monitor_socket = client.socket.get_monitor_socket()
            self.poller.register(client._monitor_socket, zmq.POLLIN)
            self.pool[id] = client
        else:
            raise ValueError(f"client for instance name '{server_id}' already present in pool")

    def register(self, client: AsyncZMQClient, thing_id: str | None = None) -> None:
        """
        Register a pre-existing client with the pool.

        Parameters
        ----------
        client: AsyncZMQClient
            client to be registered
        thing_id: Optional, str
            thing_id to which this client is mapped, especially when the client is connected to a server that serves
            only one `Thing`.
        """
        if not isinstance(client, AsyncZMQClient):
            raise TypeError(
                "registration possible for clients only subclass of AsyncZMQClient." + f" Given type {type(client)}"
            )
        if client.id not in self.pool:
            self.pool[client.id] = client
            self.poller.register(client.socket, zmq.POLLIN)
            self.poller.register(client._monitor_socket, zmq.POLLIN)
        elif self.pool[client.id] != client:
            warnings.warn(
                f"client with id '{client.id}' already present in pool. Replacing with {client}",
                category=UserWarning,
            )
        if thing_id:
            self._thing_to_client_map[thing_id] = client.id
            self._client_to_thing_map[client.id] = thing_id

    def get_client_id_from_thing_id(self, thing_id: str) -> str:
        """
        Retrieve client mapped to a `thing_id`. The value must have been previously set using `register` method.

        Parameters
        ----------
        thing_id: str
            the `thing_id` for which the client is to be retrieved
        """
        if thing_id not in self._thing_to_client_map:
            raise ValueError(f"client for thing_id '{thing_id}' not present in pool")
        return self._thing_to_client_map.get(thing_id, None)

    def get_thing_id_from_client_id(self, client_id: str) -> str:
        """
        Retrieve `thing_id` mapped to a client. The value must have been previously set using `register` method.

        Parameters
        ----------
        client_id: str
            the client id for which the `thing_id` is to be retrieved
        """
        if client_id not in self._client_to_thing_map:
            raise ValueError(f"thing_id for client_id '{client_id}' not present in pool")
        return self._client_to_thing_map.get(client_id, None)

    def get_client_protocol(self, thing_id: str) -> str:
        """
        Retrieve protocol used by a client in the pool.

        Parameters
        ----------
        client_id: str
            the client id for which the protocol is to be retrieved
        """
        if thing_id not in self._thing_to_client_map:
            raise ValueError(f"client_id '{thing_id}' not present in pool")
        client_id = self._thing_to_client_map[thing_id]
        return self.pool[client_id].socket_address.split("://")[0].upper()

    @property
    def poll_timeout(self) -> int:
        """socket polling timeout in milliseconds greater than 0"""
        return self._poll_timeout

    @poll_timeout.setter
    def poll_timeout(self, value) -> None:
        if not isinstance(value, int) or value < 0:
            raise ValueError(
                "polling period must be an integer greater than 0, not {}. Value is considered in milliseconds".format(
                    value
                )
            )
        self._poll_timeout = value

    async def handshake_complete(self) -> None:
        """wait for handshake to complete for all clients in the pool"""
        for client in self.pool.values():
            await client.handshake_complete()  # sufficient to wait serially

    def handshake(self, timeout: int | None = 60000) -> None:
        """
        schedules handshake coroutines for each client in the running event loop
        or completes handshake synchronously if no event loop is running.
        Use `handshake_complete()` async method to check if handshake is complete.

        Parameters
        ----------
        timeout: float | int
            timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time,
            a `ConnectionError` is raised. If None, wait indefinitely until handshake completes.
        """
        for client in self.pool.values():
            client.handshake(timeout)

    async def poll_responses(self) -> None:
        """
        Poll for replies from server.This method should be independently started in the event loop by calling `start_polling()`.
        Sending message requests and retrieving a response is still carried out by other methods.
        Do not duplicate this method call as there are no checks how many pollers exist and messages will become malformed
        if multiple pollers are active.
        """
        self.logger.info("client polling started for sockets for {}".format(list(self.pool.keys())))
        self.stop_poll = False
        event_loop = asyncio.get_event_loop()
        while not self.stop_poll:
            sockets = await self.poller.poll(self.poll_timeout)  # type hints dont work in this line
            for socket, _ in sockets:
                while True:
                    try:
                        raw_response = await socket.recv_multipart(zmq.NOBLOCK)
                        response_message = ResponseMessage(raw_response)
                    except zmq.Again:
                        # errors in handle_message should reach the client.
                        break
                    except ConnectionAbortedError:
                        for client in self.pool.values():
                            if client.socket.get_monitor_socket() == socket:
                                self.poller.unregister(client.socket)  # leave the monitor in the pool
                                client.handshake(timeout=None)
                                self.logger.error(
                                    f"client {client.id} disconnected from server."
                                    + " Unregistering from poller temporarily until server comes back."
                                )
                                break
                    else:
                        if self.handled_default_message_types(response_message):
                            continue
                        message_id = response_message.id
                        self.logger.debug(
                            "received response from server",
                            sender_id=response_message.sender_id,
                            receiver_id=response_message.receiver_id,
                            msg_id=response_message.id,
                            message_type=response_message.type,
                        )
                        if message_id in self.cancelled_messages:
                            self.cancelled_messages.remove(message_id)
                            self.logger.debug("dropping a cancelled message", msg_id=message_id)
                            continue
                        self.message_map[message_id] = response_message
                        event = self.events_map.get(message_id, None)
                        if event:
                            event.set()
                        else:
                            event_loop.create_task(self._resolve_response(message_id, response_message))

    async def _resolve_response(self, message_id: str, data: Any) -> None:
        """
        This method is called when there is no asyncio Event available for a message ID. This can happen only
        when the server replied before the client created a asyncio.Event object. check `async_execute()` for details.

        Parameters
        ----------
        message_id: bytes
            the message for which the event was not created
        data: ResponseMessage
            the response message received from server
        """
        max_number_of_retries = 100
        for i in range(max_number_of_retries):
            await asyncio.sleep(0.025)
            try:
                event = self.events_map[message_id]
            except KeyError:
                if message_id in self.cancelled_messages:
                    # Only for safety, likely should never reach here
                    self.cancelled_messages.remove(message_id)
                    self.logger.debug("message cancelled, not retrieving response", msg_id=message_id)
                    return
                if i >= max_number_of_retries - 1:
                    self.logger.error("unknown message id without corresponding event object", msg_id=message_id)
                    return
            else:
                self.message_map[message_id] = data
                event.set()
                break

    def assert_client_ready(self, client: AsyncZMQClient):
        if not client._handshake_event.is_set():
            raise ConnectionAbortedError(f"{client.id} is currently not alive")
        if client.socket not in self.poller._map:
            raise ConnectionError(
                "handshake complete, server is alive but client socket not yet ready to be polled."
                + "Application using MessageMappedClientPool should register the socket manually for polling."
                + "If using hololinked.server.HTTPServer, socket is waiting until HTTP Server updates its "
                "routing logic as the server has just now come alive, please try again soon."
            )

    async def async_send_request(
        self,
        thing_id: str,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: ThingExecutionContext = default_thing_execution_context,
    ) -> str:
        """
        send request message to server.

        Parameters
        ----------
        client_id: str
            id of the client in the pool to be used to send the message
        thing_id: str
            `id` of the `Thing` on which an operation is to be performed
        objekt: str
            name of property, action or event (usually only property or action)
        operation: str
            operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
        payload: SerializableData
            serializable data to be sent as payload
        preserialized_payload: PreserializedData
            pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
        server_execution_context: dict[str, Any]
            Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
        thing_execution_context: dict[str, Any]
            Specify thing level execution context like `fetchExecutionLogs` etc.

        Returns
        -------
        bytes
            a message id in bytes
        """
        client_id = self.get_client_id_from_thing_id(thing_id)
        self.assert_client_ready(self.pool[client_id])
        message_id = await self.pool[client_id].async_send_request(
            thing_id=thing_id,
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context,
        )
        event = self.event_pool.pop()
        self.events_map[message_id] = event
        return message_id

    async def async_recv_response(
        self, thing_id: str, message_id: bytes, timeout: float | int | None = None
    ) -> ResponseMessage:
        """
        Receive response for specified message ID.

        Parameters
        ----------
        client_id: str
            id of the client in the pool to be used to receive the message
        message_id: bytes
            the message id for which response needs to be fetched
        timeout: float | int | None
            Client side timeout, not the same as timeout passed to server. Recommended to be None in general cases.
            The reply is dropped if this timeout occurs. Usually the server always replies when either of invokation or
            execution timeout occurs. This timeout ensures that the protocol binding does not wait indefinitely for a message
            that may never arrive.

        Returns
        -------
        response: ResponseMessage
            response message from server corresponding to the message id

        Raises
        ------
        ValueError
            if supplied message id is not valid
        TimeoutError
            if timeout is not None and response did not arrive
        """
        try:
            client_id = self.get_client_id_from_thing_id(thing_id)
            event = self.events_map[message_id]
        except KeyError:
            raise KeyError(f"message id {message_id} unknown.") from None
        while True:
            try:
                await asyncio.wait_for(event.wait(), timeout)
                # default 5 seconds because we want to check if server is also dead
                if event.is_set():  # i.e. if timeout is not None, check if event is set
                    break
                self.assert_client_ready(self.pool[client_id])
            except TimeoutError:
                self.cancelled_messages.append(message_id)
                self.logger.error("message added to list of cancelled messages", msg_id=message_id)
                raise TimeoutError(f"Execution not completed within {timeout} seconds") from None
        self.events_map.pop(message_id)
        self.event_pool.completed(event)
        response = self.message_map.pop(message_id)
        return response

    async def async_execute(
        self,
        thing_id: str,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: ThingExecutionContext = default_thing_execution_context,
    ) -> ResponseMessage:
        """
        send an operation and receive the response for it.

        Parameters
        ----------
        client_id: str
            id of the client in the pool to be used to send the message
        thing_id: str
            `id` of the `Thing` on which an operation is to be performed
        objekt: str
            name of property, action or event (usually only property or action)
        operation: str
            operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
        payload: SerializableData
            serializable data to be sent as payload
        preserialized_payload: PreserializedData
            pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
        server_execution_context: dict[str, Any]
            Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
        thing_execution_context: dict[str, Any]
            Specify thing level execution context like `fetchExecutionLogs` etc.

        Returns
        -------
        ResponseMessage
            response message from server after completing the operation
        """
        message_id = await self.async_send_request(
            thing_id=thing_id,
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context,
        )
        return await self.async_recv_response(
            thing_id=thing_id,
            message_id=message_id,
        )

    def start_polling(self) -> None:
        """register the server message polling loop in the asyncio event loop"""
        get_current_async_loop().create_task(self.poll_responses())

    def stop_polling(self):
        """
        stop polling for replies from server
        """
        self.stop_poll = True
        for client in self.pool.values():
            client.stop()

    async def async_execute_in_all(
        self,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        thing_ids: list[str] | None = None,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: ThingExecutionContext = default_thing_execution_context,
    ) -> dict[str, ResponseMessage]:
        if not thing_ids:
            thing_ids = self._client_to_thing_map.values()

        gathered_replies = await asyncio.gather(
            *[
                self.async_execute(
                    thing_id=id,
                    objekt=objekt,
                    operation=operation,
                    payload=payload,
                    preserialized_payload=preserialized_payload,
                    server_execution_context=server_execution_context,
                    thing_execution_context=thing_execution_context,
                )
                for id in thing_ids
            ]
        )
        replies = dict()
        for id, response in zip(thing_ids, gathered_replies):
            replies[id] = response
        return replies

    async def async_execute_in_all_things(
        self,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: ServerExecutionContext = default_server_execution_context,
        thing_execution_context: ThingExecutionContext = default_thing_execution_context,
    ) -> dict[str, ResponseMessage]:
        """execute the same operation in all `Thing`s"""
        return await self.async_execute_in_all(
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context,
        )

    async def ping_all_servers(self):
        return await self.async_execute_in_all()  # operation='invokeAction', objekt=CommonRPC.PING)

    def __contains__(self, name: str) -> bool:
        return name in self.pool

    def __getitem__(self, key) -> AsyncZMQClient:
        return self.pool[key]

    def __iter__(self) -> Iterator[AsyncZMQClient]:
        return iter(self.pool.values())

    def exit(self) -> None:
        try:
            BaseZMQ.exit(self)
            for client in self.pool.values():
                self.poller.unregister(client.socket)
                self.poller.unregister(client.socket.get_monitor_socket())
                client.exit()
            self.logger.info("all client socket unregistered")
        except Exception as ex:
            self.logger.warning(
                "could not properly terminate context or attempted to terminate an already terminated context."
                + f" Exception message: {str(ex)}"
            )

    """
    Inheritance tree:

    BaseZMQ
    BaseAsyncZMQ
    BaseSyncZMQ
    BaseZMQClient
    SyncZMQClient
    AsyncZMQClient
    MessageMappedClientPool
    """

Functions

__init__

__init__(id: str, client_ids: list[str], server_ids: list[str], handshake: bool = True, context: Context = None, access_point: str = ZMQ_TRANSPORTS.IPC, poll_timeout: int = 25, **kwargs) -> None

Parameters:

Name Type Description Default

id

str

ID of the pool, must be unique.

required

client_ids

list[str]

list of id's of clients in the pool, must be unique. Clients are created with these ID's. For pre-existing clients, use register method instead and leave this list empty.

required

server_ids

list[str]

list of id's of servers to connect to, must be same length as client_ids. For pre-existing clients that are connected to servers, use register method instead and leave this list empty.

required

handshake

bool

when true, handshake with the server first before allowing first message and block until that handshake was accomplished.

True

context

Context

ZMQ context to use, if None, a global context is used.

None

access_point

str

Default access point for all clients in the pool, usually not helpful except INPROC transport. Use TCP or tcp://<host>:<port> for network access, IPC for multi-process applications, and INPROC for multi-threaded applications.

IPC

poll_timeout

int

socket polling timeout in milliseconds greater than 0.

25

**kwargs

Additional arguments:

  • logger: logging.Logger, logger instance to use. If None, a default logger is created.
{}
Source code in hololinked/hololinked/core/zmq/brokers.py
def __init__(
    self,
    id: str,
    client_ids: list[str],
    server_ids: list[str],
    handshake: bool = True,
    context: zmq.asyncio.Context = None,
    access_point: str = ZMQ_TRANSPORTS.IPC,
    poll_timeout: int = 25,
    **kwargs,
) -> None:
    """
    Parameters
    ----------
    id: str
        ID of the pool, must be unique.
    client_ids: List[str]
        list of id's of clients in the pool, must be unique. Clients are created with these ID's.
        For pre-existing clients, use `register` method instead and leave this list empty.
    server_ids: List[str]
        list of id's of servers to connect to, must be same length as client_ids. For pre-existing clients that
        are connected to servers, use `register` method instead and leave this list empty.
    handshake: bool
        when true, handshake with the server first before allowing first message and block until that handshake was
        accomplished.
    context: zmq.asyncio.Context
        ZMQ context to use, if None, a global context is used.
    access_point: Enum | str, default ZMQ_TRANSPORTS.IPC
        Default access point for all clients in the pool, usually not helpful except `INPROC` transport.
        Use `TCP` or `tcp://<host>:<port>` for network access, `IPC` for multi-process applications,
        and `INPROC` for multi-threaded applications.
    poll_timeout: int
        socket polling timeout in milliseconds greater than 0.
    **kwargs:
        Additional arguments:

        - `logger`: `logging.Logger`, logger instance to use. If None, a default logger is created.
    """
    super().__init__(id=id, server_id=None, **kwargs)
    if len(client_ids) != len(server_ids):
        raise ValueError("client_ids and server_ids must have same length")
    # this class does not call create_socket method
    self.context = context or global_config.zmq_context()
    self.pool = dict()  # type: dict[str, AsyncZMQClient]
    self.poller = zmq.asyncio.Poller()
    for client_id, server_id in zip(client_ids, server_ids):
        client = AsyncZMQClient(
            id=client_id,
            server_id=server_id,
            handshake=handshake,
            context=self.context,
            access_point=access_point,
            logger=self.logger,
        )
        self.register(client)
    # Both the client pool as well as the individual client get their serializers and client_types
    # This is required to implement pool level sending and receiving messages like polling of pool of sockets
    self.event_pool = AsyncioEventPool(len(server_ids))
    self.events_map = dict()  # type: dict[bytes, asyncio.Event]
    self.message_map = dict()
    self.cancelled_messages = []
    self.poll_timeout = poll_timeout
    self.stop_poll = False
    self._thing_to_client_map = dict()  # type: dict[str, AsyncZMQClient]
    self._client_to_thing_map = dict()  # type: dict[str, str]

register

register(client: AsyncZMQClient, thing_id: str | None = None) -> None

Register a pre-existing client with the pool.

Parameters:

Name Type Description Default

client

AsyncZMQClient

client to be registered

required

thing_id

str | None

thing_id to which this client is mapped, especially when the client is connected to a server that serves only one Thing.

None
Source code in hololinked/hololinked/core/zmq/brokers.py
def register(self, client: AsyncZMQClient, thing_id: str | None = None) -> None:
    """
    Register a pre-existing client with the pool.

    Parameters
    ----------
    client: AsyncZMQClient
        client to be registered
    thing_id: Optional, str
        thing_id to which this client is mapped, especially when the client is connected to a server that serves
        only one `Thing`.
    """
    if not isinstance(client, AsyncZMQClient):
        raise TypeError(
            "registration possible for clients only subclass of AsyncZMQClient." + f" Given type {type(client)}"
        )
    if client.id not in self.pool:
        self.pool[client.id] = client
        self.poller.register(client.socket, zmq.POLLIN)
        self.poller.register(client._monitor_socket, zmq.POLLIN)
    elif self.pool[client.id] != client:
        warnings.warn(
            f"client with id '{client.id}' already present in pool. Replacing with {client}",
            category=UserWarning,
        )
    if thing_id:
        self._thing_to_client_map[thing_id] = client.id
        self._client_to_thing_map[client.id] = thing_id

start_polling

start_polling() -> None

register the server message polling loop in the asyncio event loop

Source code in hololinked/hololinked/core/zmq/brokers.py
def start_polling(self) -> None:
    """register the server message polling loop in the asyncio event loop"""
    get_current_async_loop().create_task(self.poll_responses())

poll_responses async

poll_responses() -> None

Poll for replies from server.This method should be independently started in the event loop by calling start_polling(). Sending message requests and retrieving a response is still carried out by other methods. Do not duplicate this method call as there are no checks how many pollers exist and messages will become malformed if multiple pollers are active.

Source code in hololinked/hololinked/core/zmq/brokers.py
async def poll_responses(self) -> None:
    """
    Poll for replies from server.This method should be independently started in the event loop by calling `start_polling()`.
    Sending message requests and retrieving a response is still carried out by other methods.
    Do not duplicate this method call as there are no checks how many pollers exist and messages will become malformed
    if multiple pollers are active.
    """
    self.logger.info("client polling started for sockets for {}".format(list(self.pool.keys())))
    self.stop_poll = False
    event_loop = asyncio.get_event_loop()
    while not self.stop_poll:
        sockets = await self.poller.poll(self.poll_timeout)  # type hints dont work in this line
        for socket, _ in sockets:
            while True:
                try:
                    raw_response = await socket.recv_multipart(zmq.NOBLOCK)
                    response_message = ResponseMessage(raw_response)
                except zmq.Again:
                    # errors in handle_message should reach the client.
                    break
                except ConnectionAbortedError:
                    for client in self.pool.values():
                        if client.socket.get_monitor_socket() == socket:
                            self.poller.unregister(client.socket)  # leave the monitor in the pool
                            client.handshake(timeout=None)
                            self.logger.error(
                                f"client {client.id} disconnected from server."
                                + " Unregistering from poller temporarily until server comes back."
                            )
                            break
                else:
                    if self.handled_default_message_types(response_message):
                        continue
                    message_id = response_message.id
                    self.logger.debug(
                        "received response from server",
                        sender_id=response_message.sender_id,
                        receiver_id=response_message.receiver_id,
                        msg_id=response_message.id,
                        message_type=response_message.type,
                    )
                    if message_id in self.cancelled_messages:
                        self.cancelled_messages.remove(message_id)
                        self.logger.debug("dropping a cancelled message", msg_id=message_id)
                        continue
                    self.message_map[message_id] = response_message
                    event = self.events_map.get(message_id, None)
                    if event:
                        event.set()
                    else:
                        event_loop.create_task(self._resolve_response(message_id, response_message))

stop_polling

stop_polling()

stop polling for replies from server

Source code in hololinked/hololinked/core/zmq/brokers.py
def stop_polling(self):
    """
    stop polling for replies from server
    """
    self.stop_poll = True
    for client in self.pool.values():
        client.stop()

async_execute async

async_execute(thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> ResponseMessage

send an operation and receive the response for it.

Parameters:

Name Type Description Default

client_id

id of the client in the pool to be used to send the message

required

thing_id

str

id of the Thing on which an operation is to be performed

required

objekt

str

name of property, action or event (usually only property or action)

required

operation

str

operation to be performed, like readproperty, writeproperty, invokeaction etc.

required

payload

SerializableData

serializable data to be sent as payload

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized

PreserializedEmptyByte

server_execution_context

ServerExecutionContext

Specify server level execution context like invokationTimeout, executionTimeout, oneway operation etc.

default_server_execution_context

thing_execution_context

ThingExecutionContext

Specify thing level execution context like fetchExecutionLogs etc.

default_thing_execution_context

Returns:

Type Description
ResponseMessage

response message from server after completing the operation

Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_execute(
    self,
    thing_id: str,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: ServerExecutionContext = default_server_execution_context,
    thing_execution_context: ThingExecutionContext = default_thing_execution_context,
) -> ResponseMessage:
    """
    send an operation and receive the response for it.

    Parameters
    ----------
    client_id: str
        id of the client in the pool to be used to send the message
    thing_id: str
        `id` of the `Thing` on which an operation is to be performed
    objekt: str
        name of property, action or event (usually only property or action)
    operation: str
        operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
    payload: SerializableData
        serializable data to be sent as payload
    preserialized_payload: PreserializedData
        pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
    server_execution_context: dict[str, Any]
        Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
    thing_execution_context: dict[str, Any]
        Specify thing level execution context like `fetchExecutionLogs` etc.

    Returns
    -------
    ResponseMessage
        response message from server after completing the operation
    """
    message_id = await self.async_send_request(
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    return await self.async_recv_response(
        thing_id=thing_id,
        message_id=message_id,
    )

async_send_request async

async_send_request(thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> str

send request message to server.

Parameters:

Name Type Description Default

client_id

id of the client in the pool to be used to send the message

required

thing_id

str

id of the Thing on which an operation is to be performed

required

objekt

str

name of property, action or event (usually only property or action)

required

operation

str

operation to be performed, like readproperty, writeproperty, invokeaction etc.

required

payload

SerializableData

serializable data to be sent as payload

SerializableNone

preserialized_payload

PreserializedData

pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized

PreserializedEmptyByte

server_execution_context

ServerExecutionContext

Specify server level execution context like invokationTimeout, executionTimeout, oneway operation etc.

default_server_execution_context

thing_execution_context

ThingExecutionContext

Specify thing level execution context like fetchExecutionLogs etc.

default_thing_execution_context

Returns:

Type Description
bytes

a message id in bytes

Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_send_request(
    self,
    thing_id: str,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: ServerExecutionContext = default_server_execution_context,
    thing_execution_context: ThingExecutionContext = default_thing_execution_context,
) -> str:
    """
    send request message to server.

    Parameters
    ----------
    client_id: str
        id of the client in the pool to be used to send the message
    thing_id: str
        `id` of the `Thing` on which an operation is to be performed
    objekt: str
        name of property, action or event (usually only property or action)
    operation: str
        operation to be performed, like `readproperty`, `writeproperty`, `invokeaction` etc.
    payload: SerializableData
        serializable data to be sent as payload
    preserialized_payload: PreserializedData
        pre-encoded data to be sent as payload, generally used for large or custom data that is already serialized
    server_execution_context: dict[str, Any]
        Specify server level execution context like `invokationTimeout`, `executionTimeout`, `oneway` operation etc.
    thing_execution_context: dict[str, Any]
        Specify thing level execution context like `fetchExecutionLogs` etc.

    Returns
    -------
    bytes
        a message id in bytes
    """
    client_id = self.get_client_id_from_thing_id(thing_id)
    self.assert_client_ready(self.pool[client_id])
    message_id = await self.pool[client_id].async_send_request(
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    event = self.event_pool.pop()
    self.events_map[message_id] = event
    return message_id

async_recv_response async

async_recv_response(thing_id: str, message_id: bytes, timeout: float | int | None = None) -> ResponseMessage

Receive response for specified message ID.

Parameters:

Name Type Description Default

client_id

id of the client in the pool to be used to receive the message

required

message_id

bytes

the message id for which response needs to be fetched

required

timeout

float | int | None

Client side timeout, not the same as timeout passed to server. Recommended to be None in general cases. The reply is dropped if this timeout occurs. Usually the server always replies when either of invokation or execution timeout occurs. This timeout ensures that the protocol binding does not wait indefinitely for a message that may never arrive.

None

Returns:

Name Type Description
response ResponseMessage

response message from server corresponding to the message id

Raises:

Type Description
ValueError

if supplied message id is not valid

TimeoutError

if timeout is not None and response did not arrive

Source code in hololinked/hololinked/core/zmq/brokers.py
async def async_recv_response(
    self, thing_id: str, message_id: bytes, timeout: float | int | None = None
) -> ResponseMessage:
    """
    Receive response for specified message ID.

    Parameters
    ----------
    client_id: str
        id of the client in the pool to be used to receive the message
    message_id: bytes
        the message id for which response needs to be fetched
    timeout: float | int | None
        Client side timeout, not the same as timeout passed to server. Recommended to be None in general cases.
        The reply is dropped if this timeout occurs. Usually the server always replies when either of invokation or
        execution timeout occurs. This timeout ensures that the protocol binding does not wait indefinitely for a message
        that may never arrive.

    Returns
    -------
    response: ResponseMessage
        response message from server corresponding to the message id

    Raises
    ------
    ValueError
        if supplied message id is not valid
    TimeoutError
        if timeout is not None and response did not arrive
    """
    try:
        client_id = self.get_client_id_from_thing_id(thing_id)
        event = self.events_map[message_id]
    except KeyError:
        raise KeyError(f"message id {message_id} unknown.") from None
    while True:
        try:
            await asyncio.wait_for(event.wait(), timeout)
            # default 5 seconds because we want to check if server is also dead
            if event.is_set():  # i.e. if timeout is not None, check if event is set
                break
            self.assert_client_ready(self.pool[client_id])
        except TimeoutError:
            self.cancelled_messages.append(message_id)
            self.logger.error("message added to list of cancelled messages", msg_id=message_id)
            raise TimeoutError(f"Execution not completed within {timeout} seconds") from None
    self.events_map.pop(message_id)
    self.event_pool.completed(event)
    response = self.message_map.pop(message_id)
    return response

handshake

handshake(timeout: int | None = 60000) -> None

schedules handshake coroutines for each client in the running event loop or completes handshake synchronously if no event loop is running. Use handshake_complete() async method to check if handshake is complete.

Parameters:

Name Type Description Default

timeout

int | None

timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time, a ConnectionError is raised. If None, wait indefinitely until handshake completes.

60000
Source code in hololinked/hololinked/core/zmq/brokers.py
def handshake(self, timeout: int | None = 60000) -> None:
    """
    schedules handshake coroutines for each client in the running event loop
    or completes handshake synchronously if no event loop is running.
    Use `handshake_complete()` async method to check if handshake is complete.

    Parameters
    ----------
    timeout: float | int
        timeout in milliseconds to wait for handshake to complete. If handshake does not complete within this time,
        a `ConnectionError` is raised. If None, wait indefinitely until handshake completes.
    """
    for client in self.pool.values():
        client.handshake(timeout)

handshake_complete async

handshake_complete() -> None

wait for handshake to complete for all clients in the pool

Source code in hololinked/hololinked/core/zmq/brokers.py
async def handshake_complete(self) -> None:
    """wait for handshake to complete for all clients in the pool"""
    for client in self.pool.values():
        await client.handshake_complete()  # sufficient to wait serially