Skip to content

hololinked.core.zmq.brokers.MessageMappedZMQClientPool

Bases: BaseZMQClient

Pool of clients where message ID can track the replies irrespective of order of arrival.

Parameters:

Name Type Description Default

server_ids

List[str]

list of instance names of servers to connect to

required

id

str

Unique identity of the client to receive messages from the server. Each client connecting to same server must still have unique ID.

required

client_type

ZMQ or HTTP Server

required

handshake

bool

when true, handshake with the server first before allowing first message and block until that handshake was accomplished.

True

poll_timeout

int

socket polling timeout in milliseconds greater than 0.

25

transport

str

transport implemented by ZMQ server

'IPC'

context

Context

ZMQ context

None

deserialize_server_messages

deserializes the data field of the message

required

**kwargs

zmq_serializer: BaseSerializer custom implementation of ZMQ serializer if necessary http_serializer: JSONSerializer custom implementation of JSON serializer if necessary

{}
Source code in hololinked\core\zmq\brokers.py
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
class MessageMappedZMQClientPool(BaseZMQClient):
    """
    Pool of clients where message ID can track the replies irrespective of order of arrival. 

    Parameters
    ----------
    server_ids: List[str]
        list of instance names of servers to connect to
    id: str
        Unique identity of the client to receive messages from the server. Each client connecting to same server must 
        still have unique ID.
    client_type: str
        ZMQ or HTTP Server
    handshake: bool
        when true, handshake with the server first before allowing first message and block until that handshake was
        accomplished.
    poll_timeout: int
        socket polling timeout in milliseconds greater than 0.
    transport: str
        transport implemented by ZMQ server
    context: zmq.asyncio.Context
        ZMQ context
    deserialize_server_messages: bool
        deserializes the data field of the message
    **kwargs:
        zmq_serializer: BaseSerializer
            custom implementation of ZMQ serializer if necessary
        http_serializer: JSONSerializer
            custom implementation of JSON serializer if necessary
    """

    def __init__(self, 
                id: str,
                client_ids: typing.List[str], 
                server_ids: typing.List[str], 
                handshake: bool = True, 
                poll_timeout: int = 25, 
                transport: str = 'IPC',  
                context: zmq.asyncio.Context = None, 
                **kwargs
            ) -> None:
        """
        Create one ``AsyncZMQClient`` per (client ID, server ID) pair and register it with the pool.

        Parameters
        ----------
        id: str
            identity of the pool itself, forwarded to the base class
        client_ids: List[str]
            identities of the individual clients; paired positionally with ``server_ids``
        server_ids: List[str]
            servers to connect to; must be as long as ``client_ids``
        handshake: bool
            forwarded to every created client
        poll_timeout: int
            socket polling timeout in milliseconds
        transport: str
            forwarded to every created client
        context: zmq.asyncio.Context
            shared ZMQ context; a new one is created when not supplied
        **kwargs:
            forwarded to the base class constructor

        Raises
        ------
        ValueError
            if ``client_ids`` and ``server_ids`` differ in length
        """
        super().__init__(id=id, server_id=None, **kwargs)
        if len(server_ids) != len(client_ids):
            raise ValueError("client_ids and server_ids must have same length")
        # the pool owns no socket of its own, so create_socket is never called here
        self.context = context or zmq.asyncio.Context()
        self.pool = {}  # type: typing.Dict[str, AsyncZMQClient]
        self.poller = zmq.asyncio.Poller()
        for paired_client_id, paired_server_id in zip(client_ids, server_ids):
            self.register(
                AsyncZMQClient(
                    id=paired_client_id,
                    server_id=paired_server_id,
                    handshake=handshake,
                    transport=transport,
                    context=self.context,
                    logger=self.logger
                )
            )
        # pool-level bookkeeping used to map replies back to the request that caused them
        self.event_pool = AsyncioEventPool(len(server_ids))
        self.events_map = {}  # type: typing.Dict[bytes, asyncio.Event]
        self.message_map = {}
        self.cancelled_messages = []
        self.poll_timeout = poll_timeout
        self.stop_poll = False


    def create_new(self, id: str, server_id: str, transport: str = 'IPC') -> None:
        """
        Create a new client for ``server_id`` and add it to the pool.

        Only the client's monitor socket is registered with the poller here; the client's
        own socket is not registered by this method. The pool entry is keyed by ``server_id``.

        Parameters
        ----------
        id: str
            identity of the new client
        server_id: str
            server the new client connects to; also used as the pool key
        transport: str
            transport implemented by ZMQ server

        Raises
        ------
        ValueError
            if the pool already holds an entry for ``server_id``
        """
        if server_id in self.pool:
            raise ValueError(f"client for instance name '{server_id}' already present in pool")
        new_client = AsyncZMQClient(
            id=id,
            server_id=server_id,
            handshake=True,
            transport=transport,
            context=self.context,
            logger=self.logger
        )
        monitor = new_client.socket.get_monitor_socket()
        new_client._monitor_socket = monitor
        self.poller.register(monitor, zmq.POLLIN)
        self.pool[server_id] = new_client


    def register(self, client: AsyncZMQClient) -> None:
        """
        Add an existing client to the pool and poll both of its sockets.

        The client is stored under ``client.id``; its socket and its monitor socket are
        registered with the pool's poller for incoming data.

        Parameters
        ----------
        client: AsyncZMQClient
            client to be registered

        Raises
        ------
        TypeError
            if ``client`` is not an ``AsyncZMQClient`` instance
        """
        if isinstance(client, AsyncZMQClient):
            self.pool[client.id] = client
            poller = self.poller
            poller.register(client.socket, zmq.POLLIN)
            poller.register(client._monitor_socket, zmq.POLLIN)
            return
        raise TypeError("registration possible for clients only subclass of AsyncZMQClient." +
                       f" Given type {type(client)}")


    def get_client_id_from_thing_id(self, thing_id: str) -> typing.Dict[str, AsyncZMQClient]:
        """
        Placeholder for a thing_id to client lookup.

        Not implemented for this pool: the call always raises.

        Raises
        ------
        NotImplementedError
            unconditionally
        """
        raise NotImplementedError("get_client_id_from_thing_id not implemented for MessageMappedZMQClientPool")

    @property
    def poll_timeout(self) -> int:
        """
        socket polling timeout in milliseconds greater than 0. 
        """
        return self._poll_timeout

    @poll_timeout.setter
    def poll_timeout(self, value) -> None:
        """
        Validate and store the polling timeout.

        Raises
        ------
        ValueError
            if ``value`` is not an integer or is not strictly positive
        """
        # 0 is rejected as well: the documented contract is "greater than 0", and a zero
        # timeout would let the ``while`` loop in ``poll_responses`` poll without waiting.
        if not isinstance(value, int) or value <= 0:
            raise ValueError("polling period must be an integer greater than 0, not {}. Value is considered in milliseconds".format(value))
        self._poll_timeout = value 


    async def handshake_complete(self) -> None:
        """
        Wait until every client in the pool reports its handshake as complete.

        Each client's ``handshake_complete()`` is awaited in turn, in pool iteration order.
        """
        for client in self.pool.values():
            await client.handshake_complete() # sufficient to wait serially


    def handshake(self, timeout: int | None = 60000) -> None:
        """
        Start the handshake of every client in the pool.

        For each pooled client, ``client._handshake(timeout)`` is handed to ``run_callable_somehow``.
        Call this before awaiting ``handshake_complete()`` when the handshakes were not already started.

        Parameters
        ----------
        timeout: int | None
            forwarded unchanged to each client's ``_handshake()``
            (presumably milliseconds - TODO confirm against ``AsyncZMQClient``)
        """
        for client in self.pool.values():
            run_callable_somehow(client._handshake(timeout))


    async def poll_responses(self) -> None:
        """
        Poll for replies from server. Since the client is message mapped, this method should be independently started
        in the event loop. Sending message and retrieving a message mapped is still carried out by other methods.

        For every received response the message ID is looked up:

        - if the ID was cancelled (client side timeout), the response is dropped
        - if an event is already registered for the ID, the response body is stored in ``message_map``
          and the event is set so that ``async_recv_response()`` can return it
        - otherwise ``_resolve_response()`` is scheduled to wait for the event to appear

        The loop runs until ``stop_poll`` becomes True.
        """
        self.logger.info("client polling started for sockets for {}".format(list(self.pool.keys())))
        self.stop_poll = False 
        # The event loop only keeps weak references to tasks, so the resolver tasks are held here
        # until they finish; otherwise they may be garbage collected while still running.
        late_resolvers = set()  # type: typing.Set[asyncio.Task]
        while not self.stop_poll:
            sockets = await self.poller.poll(self.poll_timeout) # type hints dont work in this line
            for socket, _ in sockets:
                # drain the socket: keep reading until nothing more is queued
                while True:
                    try:
                        raw_response = await socket.recv_multipart(zmq.NOBLOCK)
                        response_message = ResponseMessage(raw_response)                          
                    except zmq.Again:
                        # nothing left to read on this socket for now
                        break
                    except ConnectionAbortedError:
                        # NOTE(review): the ``break`` below only leaves the ``for`` loop, reading from
                        # the same socket continues afterwards - confirm that this is intended.
                        for client in self.pool.values():
                            if client.socket.get_monitor_socket() == socket:
                                self.poller.unregister(client.socket) # leave the monitor in the pool
                                client.handshake(timeout=None)
                                self.logger.error(f"{client.id} disconnected." +
                                    " Unregistering from poller temporarily until server comes back.")
                                break
                    else:
                        message_id = response_message.id
                        self.logger.debug(f"received response from server '{response_message.sender_id}' with msg-ID '{message_id}'")
                        if message_id in self.cancelled_messages:
                            self.cancelled_messages.remove(message_id)
                            self.logger.debug(f"msg-ID '{message_id}' cancelled")
                            continue
                        event = self.events_map.get(message_id, None) 
                        final_data = response_message.body
                        if event:
                            # the data must be stored before the waiter is woken up, since
                            # async_recv_response() pops it from message_map right after the event fires
                            self.message_map[message_id] = final_data
                            event.set()
                        else:    
                            # reply arrived before the sender registered its event, resolve it later
                            resolver = asyncio.create_task(self._resolve_response(message_id, final_data))
                            late_resolvers.add(resolver)
                            resolver.add_done_callback(late_resolvers.discard)


    async def _resolve_response(self, message_id: str, data: typing.Any) -> None:
        """
        Deliver a response whose asyncio Event did not exist yet when the response arrived.

        The events map is re-checked every 25 ms, at most 100 times. As soon as an event for the
        message ID shows up, the data is stored in ``message_map`` and the event is set. The wait
        is abandoned if the message ID turns up in the cancelled list, or after the last attempt
        (logged as an error).

        Parameters
        ----------
        message_id: bytes 
            the message for which the event was not created
        data: bytes
            the data given by the server which needs to mapped to the message
        """
        attempts = 100
        for _ in range(attempts):
            await asyncio.sleep(0.025)
            pending_event = self.events_map.get(message_id)
            if pending_event is not None:
                self.message_map[message_id] = data
                pending_event.set()
                return
            if message_id in self.cancelled_messages:
                # Only for safety, likely should never reach here
                self.cancelled_messages.remove(message_id)
                self.logger.debug(f'message_id {message_id} cancelled')
                return 
        # every attempt was used up without an event ever being registered
        self.logger.error("unknown message id {} without corresponding event object".format(message_id)) 

    def assert_client_ready(self, client: AsyncZMQClient):
        """
        Raise if ``client`` cannot currently be used to exchange messages.

        Parameters
        ----------
        client: AsyncZMQClient
            client to check

        Raises
        ------
        ConnectionAbortedError
            if the client's handshake event is not set
        ConnectionError
            if the handshake is done but the client's socket is not registered with the pool's poller
        """
        if not client._handshake_event.is_set():
            raise ConnectionAbortedError(f"{client.id} is currently not alive")
        if client.socket not in self.poller._map:
            # sentences are separated by explicit spaces so that the joined message stays readable
            raise ConnectionError("handshake complete, server is alive but client socket not yet ready to be polled. " +
                                "Application using MessageMappedClientPool should register the socket manually for polling. " +
                                "If using hololinked.server.HTTPServer, socket is waiting until HTTP Server updates its "
                                "routing logic as the server has just now come alive, please try again soon.")

    async def async_send_request(self, 
                            client_id: str, 
                            thing_id: str, 
                            objekt: str, 
                            operation: str,
                            payload: SerializableData = SerializableNone, 
                            preserialized_payload: PreserializedData = PreserializedEmptyByte,
                            server_execution_context: ServerExecutionContext = default_server_execution_context,
                            thing_execution_context: ThingExecutionContext  = default_thing_execution_context
                        ) -> bytes:
        """
        Send a request through one client of the pool. The reply is polled by ``poll_responses()`` and is
        to be retrieved using ``async_recv_response()`` with the returned message ID.

        Parameters
        ----------
        client_id: str
            pool key of the client used for sending
        thing_id: str
            forwarded to the client's ``async_send_request()``
        objekt: str
            forwarded to the client's ``async_send_request()``
        operation: str
            forwarded to the client's ``async_send_request()``
        payload: SerializableData
            forwarded to the client's ``async_send_request()``
        preserialized_payload: PreserializedData
            forwarded to the client's ``async_send_request()``
        server_execution_context: ServerExecutionContext
            see execution context definitions
        thing_execution_context: ThingExecutionContext
            see execution context definitions

        Returns
        -------
        message_id: bytes
            created message ID

        Raises
        ------
        KeyError
            if ``client_id`` is not in the pool
        ConnectionAbortedError, ConnectionError
            from ``assert_client_ready()`` when the client is not usable
        """
        target = self.pool[client_id]
        self.assert_client_ready(target)
        message_id = await target.async_send_request(
            thing_id=thing_id,
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context
        )
        # the event is registered only after the send, a reply arriving earlier is handled by _resolve_response()
        self.events_map[message_id] = self.event_pool.pop()
        return message_id

    async def async_recv_response(self, 
                                client_id: str, 
                                message_id: bytes, 
                                timeout: float | int | None = None
                            ) -> typing.Dict[str, typing.Any]:
        """
        Receive response for specified message ID. 

        Parameters
        ----------
        client_id: str
            pool key of the client that sent the request; kept for interface compatibility and
            not consulted while waiting
        message_id: bytes
            the message id for which response needs to be fetched
        timeout: float, 
            client side timeout, not the same as timeout passed to server, recommended to be None in general cases. 
            Server side timeouts ensure start of execution of operations within specified timeouts and 
            drops execution altogether if timeout occurred. Client side timeouts only wait for message to come within 
            the timeout, but do not guarantee non-execution.  

        Returns
        -------
        response: Any
            the data stored in ``message_map`` for this message ID

        Raises
        ------
        ValueError: 
            if supplied message id is not valid
        TimeoutError:
            if timeout is not None and response did not arrive
        """
        try:
            event = self.events_map[message_id]
        except KeyError:
            raise ValueError(f"message id {message_id} unknown.") from None
        try:
            # wait_for only returns normally once the event is set, so a single await is enough
            await asyncio.wait_for(event.wait(), timeout) 
        except (TimeoutError, asyncio.TimeoutError):
            # asyncio.TimeoutError is an alias of the builtin TimeoutError only from Python 3.11 on,
            # both are caught so that the timeout is handled on older interpreters as well
            self.cancelled_messages.append(message_id)
            self.logger.debug(f'message_id {message_id} added to list of cancelled messages')
            # release the bookkeeping of the abandoned request, otherwise every timeout leaves
            # an entry in events_map and keeps an event out of the pool
            # NOTE(review): assumes AsyncioEventPool.completed() accepts an event that was never set - TODO confirm
            self.events_map.pop(message_id, None)
            self.message_map.pop(message_id, None)
            self.event_pool.completed(event)
            raise TimeoutError(f"Execution not completed within {timeout} seconds") from None
        self.events_map.pop(message_id)
        self.event_pool.completed(event)
        response = self.message_map.pop(message_id)
        return response

    async def async_execute(self, 
                        client_id: str, 
                        thing_id: str, 
                        objekt: str, 
                        operation: str, 
                        payload: SerializableData = SerializableNone, 
                        preserialized_payload: PreserializedData = PreserializedEmptyByte,
                        server_execution_context: ServerExecutionContext = default_server_execution_context,
                        thing_execution_context: ThingExecutionContext  = default_thing_execution_context, 
                    ) -> typing.Dict[str, typing.Any]:
        """
        Send a request with ``async_send_request()`` and wait for its response with
        ``async_recv_response()``. No client side timeout is applied while waiting.

        Parameters
        ----------
        client_id: str
            pool key of the client used for sending and receiving
        thing_id: str
            forwarded to ``async_send_request()``
        objekt: str
            forwarded to ``async_send_request()``
        operation: str
            forwarded to ``async_send_request()``
        payload: SerializableData
            forwarded to ``async_send_request()``
        preserialized_payload: PreserializedData
            forwarded to ``async_send_request()``
        server_execution_context: ServerExecutionContext
            see execution context definitions
        thing_execution_context: ThingExecutionContext
            see execution context definitions

        Returns
        -------
        response: Any
            whatever ``async_recv_response()`` returns for the created message ID
        """
        request_fields = dict(
            thing_id=thing_id,
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context
        )
        sent_message_id = await self.async_send_request(client_id=client_id, **request_fields)
        response = await self.async_recv_response(client_id=client_id, message_id=sent_message_id)
        return response

    def start_polling(self) -> None:
        """
        Schedule ``poll_responses()`` on the asyncio event loop.

        The polling task is created from a callback queued with ``call_soon``, i.e. it is
        created when the event loop next runs its callbacks.
        """
        def _launch_polling_task() -> None:
            asyncio.create_task(self.poll_responses())

        asyncio.get_event_loop().call_soon(_launch_polling_task)

    def stop_polling(self) -> None:
        """
        Stop polling for replies from server.

        Only sets the ``stop_poll`` flag; ``poll_responses()`` checks that flag at the top of
        each loop iteration, so polling ends after the current iteration.
        """
        self.stop_poll = True

    async def async_execute_in_all(self, 
                                objekt: str, 
                                operation: str, 
                                payload: SerializableData = SerializableNone, 
                                preserialized_payload: PreserializedData = PreserializedEmptyByte,
                                ids: typing.Optional[typing.List[str]] = None,
                                server_execution_context: ServerExecutionContext = default_server_execution_context,
                                thing_execution_context: ThingExecutionContext = default_thing_execution_context,
                                thing_ids: typing.Optional[typing.Mapping[str, str]] = None,
                            ) -> typing.Dict[str, typing.Any]:
        """
        Execute the same operation concurrently through several clients of the pool.

        Parameters
        ----------
        objekt: str
            forwarded to ``async_execute()``
        operation: str
            forwarded to ``async_execute()``
        payload: SerializableData
            forwarded to ``async_execute()``
        preserialized_payload: PreserializedData
            forwarded to ``async_execute()``
        ids: List[str], optional
            pool keys of the clients to use; all clients of the pool when empty or None
        server_execution_context: ServerExecutionContext
            see execution context definitions
        thing_execution_context: ThingExecutionContext
            see execution context definitions
        thing_ids: Mapping[str, str], optional
            thing ID to address per client ID. Clients without an entry fall back to the
            ``server_id`` attribute of the pooled client.

        Returns
        -------
        replies: Dict[str, Any]
            response per client ID
        """
        # snapshot the keys: they are iterated again after the await and the pool may change meanwhile
        client_ids = list(ids) if ids else list(self.pool)
        explicit_thing_ids = thing_ids or {}

        def _thing_id_for(client_id: str) -> str:
            if client_id in explicit_thing_ids:
                return explicit_thing_ids[client_id]
            # NOTE(review): assumes the pooled client exposes ``server_id`` and that the server
            # hosts a Thing under the same ID - TODO confirm, pass ``thing_ids`` otherwise
            return getattr(self.pool[client_id], 'server_id', client_id)

        # async_execute() takes ``client_id`` and ``thing_id``; it has no ``id`` parameter
        gathered_replies = await asyncio.gather(*[
            self.async_execute(client_id=client_id, thing_id=_thing_id_for(client_id),
                            objekt=objekt, operation=operation, payload=payload, 
                            preserialized_payload=preserialized_payload, 
                            server_execution_context=server_execution_context,
                            thing_execution_context=thing_execution_context
                        ) 
                for client_id in client_ids])
        return dict(zip(client_ids, gathered_replies))

    async def async_execute_in_all_things(self, 
                                        objekt: str, 
                                        operation: str, 
                                        payload: SerializableData = SerializableNone, 
                                        preserialized_payload: PreserializedData = PreserializedEmptyByte,
                                        server_execution_context: ServerExecutionContext = default_server_execution_context,
                                        thing_execution_context: ThingExecutionContext = default_thing_execution_context,        
                                    ) -> typing.Dict[str, typing.Any]:
        """
        Execute the same operation through ``async_execute_in_all()`` for every key of the pool.

        The list of IDs passed on contains all pool keys; no entry is filtered out here.
        """
        every_pool_key = list(self.pool.keys())
        replies = await self.async_execute_in_all(
            objekt=objekt,
            operation=operation,
            payload=payload,
            preserialized_payload=preserialized_payload,
            ids=every_pool_key,
            server_execution_context=server_execution_context,
            thing_execution_context=thing_execution_context
        )
        return replies

    async def ping_all_servers(self):
        """
        ping all servers connected to the client pool, calls ping() on Thing

        NOTE(review): ``async_execute_in_all()`` is called without its required ``objekt`` and
        ``operation`` arguments, so this call raises TypeError as written. The trailing comment
        hints at the intended values - TODO confirm and pass them.
        """
        return await self.async_execute_in_all() #operation='invokeAction', objekt=CommonRPC.PING)

    def __contains__(self, name: str) -> bool:
        """Return True if ``name`` is a key of the client pool."""
        return name in self.pool

    def __getitem__(self, key) ->AsyncZMQClient:
        """Return the client stored under ``key``; raises KeyError when the key is absent."""
        return self.pool[key]

    def __iter__(self) -> typing.Iterator[AsyncZMQClient]:
        """Iterate over the clients of the pool (the values, not the keys)."""
        return iter(self.pool.values())

    def exit(self) -> None:
        """
        Unregister every client socket from the poller, exit each client and terminate the context.

        A socket that is not registered with the poller is skipped, so that cleanup of the
        remaining clients and the termination of the context still take place. Failure to
        terminate the context is logged as a warning and not raised.
        """
        BaseZMQ.exit(self)
        for client in self.pool.values():
            # A socket may be absent from the poller: create_new() registers only the monitor
            # socket and poll_responses() unregisters the socket of a disconnected client.
            # Poller.unregister raises KeyError in that case.
            try:
                self.poller.unregister(client.socket)
            except KeyError:
                self.logger.debug(f"socket of client '{client.id}' was not registered with the poller")
            try:
                self.poller.unregister(client.socket.get_monitor_socket())
            except KeyError:
                self.logger.debug(f"monitor socket of client '{client.id}' was not registered with the poller")
            client.exit()
        self.logger.info("all client socket unregistered from pool for '{}'".format(self.__class__))
        try:
            self.context.term()
            self.logger.info("context terminated for '{}'".format(self.__class__))        
        except Exception as ex:
            self.logger.warning("could not properly terminate context or attempted to terminate an already terminated context " +
                            "'{}'. Exception message: {}".format(self.identity, str(ex)))

    """
    BaseZMQ
    BaseAsyncZMQ
    BaseSyncZMQ
    BaseZMQClient
    SyncZMQClient
    AsyncZMQClient
    MessageMappedClientPool
    """

Functions

async_execute async

async_execute(client_id: str, thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> typing.Dict[str, typing.Any]

sends message and receives response.

Parameters:

Name Type Description Default

id

instance name of the server

required

operation

str

Unique string identifying a server-side or Thing resource. These values correspond to the name automatically extracted from the object name, or to the URL_path prepended with the instance name.

required

arguments

if the operation invokes a method, arguments of that method.

required

context

see execution context definitions

required

raise_client_side_exceptions

raise exceptions from server on client side

required

invokation_timeout

server side timeout

required

execution_timeout

client side timeout, not the same as timeout passed to server, recommended to be None in general cases. Server side timeouts ensure start of execution of operations within specified timeouts and drop execution altogether if the timeout occurred. Client side timeouts only wait for the message to come within the timeout, but do not guarantee non-execution.

required
Source code in hololinked\core\zmq\brokers.py
async def async_execute(self, 
                    client_id: str, 
                    thing_id: str, 
                    objekt: str, 
                    operation: str, 
                    payload: SerializableData = SerializableNone, 
                    preserialized_payload: PreserializedData = PreserializedEmptyByte,
                    server_execution_context: ServerExecutionContext = default_server_execution_context,
                    thing_execution_context: ThingExecutionContext  = default_thing_execution_context, 
                ) -> typing.Dict[str, typing.Any]:
    """
    Send a request with ``async_send_request()`` and wait for its response with
    ``async_recv_response()``. No client side timeout is applied while waiting.

    Parameters
    ----------
    client_id: str
        pool key of the client used for sending and receiving
    thing_id: str
        forwarded to ``async_send_request()``
    objekt: str
        forwarded to ``async_send_request()``
    operation: str
        forwarded to ``async_send_request()``
    payload: SerializableData
        forwarded to ``async_send_request()``
    preserialized_payload: PreserializedData
        forwarded to ``async_send_request()``
    server_execution_context: ServerExecutionContext
        see execution context definitions
    thing_execution_context: ThingExecutionContext
        see execution context definitions

    Returns
    -------
    response: Any
        whatever ``async_recv_response()`` returns for the created message ID
    """
    request_fields = dict(
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context
    )
    sent_message_id = await self.async_send_request(client_id=client_id, **request_fields)
    response = await self.async_recv_response(client_id=client_id, message_id=sent_message_id)
    return response

async_execute_in_all async

async_execute_in_all(objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, ids: typing.Optional[typing.List[str]] = None, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> typing.Dict[str, typing.Any]

execute a specified operation in all Thing including eventloops

Source code in hololinked\core\zmq\brokers.py
async def async_execute_in_all(self, 
                            objekt: str, 
                            operation: str, 
                            payload: SerializableData = SerializableNone, 
                            preserialized_payload: PreserializedData = PreserializedEmptyByte,
                            ids: typing.Optional[typing.List[str]] = None,
                            server_execution_context: ServerExecutionContext = default_server_execution_context,
                            thing_execution_context: ThingExecutionContext = default_thing_execution_context,
                            thing_ids: typing.Optional[typing.Mapping[str, str]] = None,
                        ) -> typing.Dict[str, typing.Any]:
    """
    Execute the same operation concurrently through several clients of the pool.

    Parameters
    ----------
    objekt: str
        forwarded to ``async_execute()``
    operation: str
        forwarded to ``async_execute()``
    payload: SerializableData
        forwarded to ``async_execute()``
    preserialized_payload: PreserializedData
        forwarded to ``async_execute()``
    ids: List[str], optional
        pool keys of the clients to use; all clients of the pool when empty or None
    server_execution_context: ServerExecutionContext
        see execution context definitions
    thing_execution_context: ThingExecutionContext
        see execution context definitions
    thing_ids: Mapping[str, str], optional
        thing ID to address per client ID. Clients without an entry fall back to the
        ``server_id`` attribute of the pooled client.

    Returns
    -------
    replies: Dict[str, Any]
        response per client ID
    """
    # snapshot the keys: they are iterated again after the await and the pool may change meanwhile
    client_ids = list(ids) if ids else list(self.pool)
    explicit_thing_ids = thing_ids or {}

    def _thing_id_for(client_id: str) -> str:
        if client_id in explicit_thing_ids:
            return explicit_thing_ids[client_id]
        # NOTE(review): assumes the pooled client exposes ``server_id`` and that the server
        # hosts a Thing under the same ID - TODO confirm, pass ``thing_ids`` otherwise
        return getattr(self.pool[client_id], 'server_id', client_id)

    # async_execute() takes ``client_id`` and ``thing_id``; it has no ``id`` parameter
    gathered_replies = await asyncio.gather(*[
        self.async_execute(client_id=client_id, thing_id=_thing_id_for(client_id),
                        objekt=objekt, operation=operation, payload=payload, 
                        preserialized_payload=preserialized_payload, 
                        server_execution_context=server_execution_context,
                        thing_execution_context=thing_execution_context
                    ) 
            for client_id in client_ids])
    return dict(zip(client_ids, gathered_replies))

async_execute_in_all_things async

async_execute_in_all_things(objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> typing.Dict[str, typing.Any]

execute the same operation in all Things, eventloops are excluded.

Source code in hololinked\core\zmq\brokers.py
async def async_execute_in_all_things(
    self,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: ServerExecutionContext = default_server_execution_context,
    thing_execution_context: ThingExecutionContext = default_thing_execution_context,
) -> typing.Dict[str, typing.Any]:
    """
    Execute the same operation through ``async_execute_in_all()`` for every id currently in the pool.

    All arguments are forwarded unchanged; the ``ids`` argument is built from the keys of ``self.pool``.

    NOTE(review): this method has been described as excluding eventloops, but no filtering of the pool
    entries happens here - confirm whether a filter is still intended.

    Returns
    -------
    Dict[str, Any]
        whatever ``async_execute_in_all()`` returns for the collected ids
    """
    every_id = [pool_id for pool_id, _ in self.pool.items()]
    replies = await self.async_execute_in_all(
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        ids=every_id,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    return replies

async_recv_response async

async_recv_response(client_id: str, message_id: bytes, timeout: float | int | None = None) -> typing.Dict[str, typing.Any]

Receive response for specified message ID.

Parameters:

Name Type Description Default

message_id

bytes

the message id for which response needs to be fetched

required

raise_client_side_exceptions

raise exceptions from server on client side

required

timeout

float | int | None

client side timeout, not the same as timeout passed to server, recommended to be None in general cases. Server side timeouts ensure start of execution of operations within specified timeouts and drop execution altogether if timeout occurred. Client side timeouts only wait for message to come within the timeout, but do not guarantee non-execution.

None

Returns:

Name Type Description
response (dict, Any)

dictionary when plain response is False, any value returned from execution on the server side if plain response is True.

Raises:

Type Description
ValueError:

if supplied message id is not valid

TimeoutError:

if timeout is not None and response did not arrive

Source code in hololinked\core\zmq\brokers.py
async def async_recv_response(self, 
                            client_id: str, 
                            message_id: bytes, 
                            timeout: float | int | None = None
                        ) -> typing.Dict[str, typing.Any]:
    """
    Receive response for specified message ID. 

    Parameters
    ----------
    message_id: bytes
        the message id for which response needs to eb fetched
    raise_client_side_exceptions: bool, default False
        raise exceptions from server on client side
    timeout: float, 
        client side timeout, not the same as timeout passed to server, recommended to be None in general cases. 
        Server side timeouts ensure start of execution of operations within specified timeouts and 
        drops execution altogether if timeout occured. Client side timeouts only wait for message to come within 
        the timeout, but do not gaurantee non-execution.  

    Returns
    -------
    response: dict, Any
        dictionary when plain response is False, any value returned from execution on the server side if plain response is
        True.

    Raises
    ------
    ValueError: 
        if supplied message id is not valid
    TimeoutError:
        if timeout is not None and response did not arrive
    """
    try:
        event = self.events_map[message_id]
    except KeyError:
        raise ValueError(f"message id {message_id} unknown.") from None
    while True:
        try:
            await asyncio.wait_for(event.wait(), timeout) 
            # default 5 seconds because we want to check if server is also dead
            if event.is_set(): # i.e. if timeout is not None, check if event is set
                break
            self.assert_client_ready(self.pool[client_id])
        except TimeoutError:
            self.cancelled_messages.append(message_id)
            self.logger.debug(f'message_id {message_id} added to list of cancelled messages')
            raise TimeoutError(f"Execution not completed within {timeout} seconds") from None
    self.events_map.pop(message_id)
    self.event_pool.completed(event)
    response = self.message_map.pop(message_id)
    return response

async_send_request async

async_send_request(client_id: str, thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context) -> bytes

Send operation to server with instance name. Replies are automatically polled & to be retrieved using async_recv_response()

Parameters:

Name Type Description Default

id

instance name of the server

required

operation

str

unique str identifying a server side or Thing resource. These values corresponding to automatically extracted name from the object name or the URL_path prepended with the instance name.

required

arguments

if the operation invokes a method, arguments of that method.

required

server_execution_context

ServerExecutionContext

see execution context definitions

default_server_execution_context

thing_execution_context

ThingExecutionContext

see execution context definitions

default_thing_execution_context

Returns:

Name Type Description
message_id bytes

created message ID

Source code in hololinked\core\zmq\brokers.py
async def async_send_request(
    self,
    client_id: str,
    thing_id: str,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: ServerExecutionContext = default_server_execution_context,
    thing_execution_context: ThingExecutionContext = default_thing_execution_context,
) -> bytes:
    """
    Send a request through one client of the pool and reserve an event for its reply. The reply is 
    to be retrieved using ``async_recv_response()`` with the returned message ID.

    Parameters
    ----------
    client_id: str
        key of the client in ``self.pool`` that sends the request
    thing_id: str
        forwarded to the client's ``async_send_request()``
    objekt: str
        forwarded to the client's ``async_send_request()``
    operation: str
        forwarded to the client's ``async_send_request()``
    payload: SerializableData
        forwarded to the client's ``async_send_request()``
    preserialized_payload: PreserializedData
        forwarded to the client's ``async_send_request()``
    server_execution_context: ServerExecutionContext
        see execution context definitions
    thing_execution_context: ThingExecutionContext
        see execution context definitions

    Returns
    -------
    message_id: bytes
        message ID returned by the client, also the key under which the event is stored in ``self.events_map``
    """
    sender = self.pool[client_id]
    self.assert_client_ready(sender)
    message_id = await sender.async_send_request(
        thing_id=thing_id,
        objekt=objekt,
        operation=operation,
        payload=payload,
        preserialized_payload=preserialized_payload,
        server_execution_context=server_execution_context,
        thing_execution_context=thing_execution_context,
    )
    # pair the message ID with an event taken from the event pool
    self.events_map[message_id] = self.event_pool.pop()
    return message_id

create_new

create_new(id: str, server_id: str, transport: str = 'IPC') -> None

Create a new client for the given server with the specified transport. Other arguments are taken from pool specifications.

Parameters:

Name Type Description Default

id

str

instance name of server

required

transport

str

transport implemented by ZMQ server

'IPC'
Source code in hololinked\core\zmq\brokers.py
def create_new(self, id: str, server_id: str, transport: str = 'IPC') -> None:
    """
    Create a new ``AsyncZMQClient`` for the given server and add it to the pool under ``server_id``. 
    The context and logger of the pool are reused for the client. 

    Parameters
    ----------
    id: str
        passed as ``id`` to the new client
    server_id: str
        passed as ``server_id`` to the new client, also the key of the client in the pool
    transport: str
        transport implemented by ZMQ server

    Raises
    ------
    ValueError:
        if the pool already has an entry for ``server_id``
    """
    if server_id in self.pool:
        raise ValueError(f"client for instance name '{server_id}' already present in pool")
    new_client = AsyncZMQClient(
        id=id,
        server_id=server_id,
        handshake=True,
        transport=transport,
        context=self.context,
        logger=self.logger,
    )
    # only the monitor socket is registered with the poller here
    new_client._monitor_socket = new_client.socket.get_monitor_socket()
    self.poller.register(new_client._monitor_socket, zmq.POLLIN)
    self.pool[server_id] = new_client

get_client_id_from_thing_id

get_client_id_from_thing_id(thing_id: str) -> typing.Dict[str, AsyncZMQClient]

map of thing_id to client

Source code in hololinked\core\zmq\brokers.py
def get_client_id_from_thing_id(self, thing_id: str) -> typing.Dict[str, AsyncZMQClient]:
    """
    Lookup from thing_id to client; not available on this pool, always raises.

    Raises
    ------
    NotImplementedError:
        on every call
    """
    reason = "get_client_id_from_thing_id not implemented for MessageMappedZMQClientPool"
    raise NotImplementedError(reason)

handshake

handshake(timeout: int | None = 60000) -> None

automatically called when handshake argument at init is True. When not automatically called, it is necessary to call this method before awaiting handshake_complete().

Source code in hololinked\core\zmq\brokers.py
def handshake(self, timeout: int | None = 60000) -> None:
    """
    Start the handshake of every client in the pool. This method does not wait for the handshakes, 
    await ``handshake_complete()`` afterwards for that.

    Parameters
    ----------
    timeout: int | None
        passed unchanged to ``_handshake()`` of each client
    """
    for pooled_client in self.pool.values():
        pending_handshake = pooled_client._handshake(timeout)
        run_callable_somehow(pending_handshake)

handshake_complete async

handshake_complete() -> None

wait for handshake to complete for all clients in the pool

Source code in hololinked\core\zmq\brokers.py
async def handshake_complete(self) -> None:
    """
    Await ``handshake_complete()`` of each client in the pool, one client after the other. 
    Returns once every client has been awaited.
    """
    for pooled_client in self.pool.values():
        # serial awaiting is enough, no need to wait on all clients concurrently
        await pooled_client.handshake_complete()

ping_all_servers async

ping_all_servers()

ping all servers connected to the client pool, calls ping() on Thing

Source code in hololinked\core\zmq\brokers.py
async def ping_all_servers(self):
    """
    ping all servers connected to the client pool, calls ping() on Thing

    NOTE(review): ``async_execute_in_all()`` is awaited here without arguments although it declares 
    ``objekt`` and ``operation`` without default values, so as written this call is expected to raise 
    ``TypeError``. The trailing comment shows the arguments that were apparently intended - confirm 
    and restore them.
    """
    return await self.async_execute_in_all() #operation='invokeAction', objekt=CommonRPC.PING)

poll_responses async

poll_responses() -> None

Poll for replies from server. Since the client is message mapped, this method should be independently started in the event loop. Sending message and retrieving a message mapped is still carried out by other methods.

Source code in hololinked\core\zmq\brokers.py
async def poll_responses(self) -> None:
    """
    Poll for replies from server. Since the client is message mapped, this method should be independently started
    in the event loop. Sending message and retrieving a message mapped is still carried out by other methods.

    The loop runs until ``self.stop_poll`` becomes True (it is reset to False on entry). Every socket reported 
    by the poller is read without blocking until no more messages are available; each message is matched 
    through its message ID against ``self.cancelled_messages`` and ``self.events_map``.
    """
    self.logger.info("client polling started for sockets for {}".format(list(self.pool.keys())))
    self.stop_poll = False 
    event_loop = asyncio.get_event_loop()
    while not self.stop_poll:
        sockets = await self.poller.poll(self.poll_timeout) # type hints dont work in this line
        for socket, _ in sockets:
            # drain this socket: keep receiving until zmq.Again is raised
            while True:
                try:
                    raw_response = await socket.recv_multipart(zmq.NOBLOCK)
                    response_message = ResponseMessage(raw_response)                          
                except zmq.Again:
                    # errors in handle_message should reach the client. 
                    break
                except ConnectionAbortedError:
                    # look for the client whose monitor socket compares equal to the socket that failed
                    for client in self.pool.values():
                        if client.socket.get_monitor_socket() == socket:
                            self.poller.unregister(client.socket) # leave the monitor in the pool
                            client.handshake(timeout=None)
                            self.logger.error(f"{client.id} disconnected." +
                                " Unregistering from poller temporarily until server comes back.")
                            # NOTE(review): this break only leaves the loop over clients, the enclosing 
                            # 'while True' goes on receiving from the same socket - confirm this is intended
                            break
                else:
                    message_id = response_message.id
                    self.logger.debug(f"received response from server '{response_message.sender_id}' with msg-ID '{message_id}'")
                    if message_id in self.cancelled_messages:
                        # message was cancelled before its reply arrived: discard the reply and read the next message
                        self.cancelled_messages.remove(message_id)
                        self.logger.debug(f"msg-ID '{message_id}' cancelled")
                        continue
                    event = self.events_map.get(message_id, None) 
                    final_data = response_message.body
                    # if len(response_message.pre_encoded_payload ) > 0 and response_message.payload:
                    #     final_data = tuple(response_message.body)
                    # elif len(response_message.pre_encoded_payload) > 0:
                    #     final_data = response_message.pre_encoded_payload
                    # else:
                    #     final_data = response_message.payload
                    if event:
                        # NOTE(review): final_data is not stored anywhere in this branch, while 
                        # async_recv_response() pops self.message_map[message_id] once the event is set - 
                        # confirm that message_map is populated elsewhere
                        event.set()
                    else:    
                        # no event is mapped for this message ID yet, hand the reply over to _resolve_response()
                        invalid_event_task = asyncio.create_task(self._resolve_response(message_id, final_data))
                        # the task is already created above, the callback below only returns it
                        event_loop.call_soon(lambda: invalid_event_task)

register

register(client: AsyncZMQClient) -> None

Register a client with the pool.

Parameters:

Name Type Description Default

client

AsyncZMQClient

client to be registered

required
Source code in hololinked\core\zmq\brokers.py
def register(self, client: AsyncZMQClient) -> None:
    """
    Add an existing client to the pool under its own ``id`` and register its socket and 
    its monitor socket with the poller. 

    Parameters
    ----------
    client: AsyncZMQClient
        client to be registered

    Raises
    ------
    TypeError:
        if ``client`` is not an instance of ``AsyncZMQClient``
    """
    if not isinstance(client, AsyncZMQClient):
        raise TypeError(
            "registration possible for clients only subclass of AsyncZMQClient."
            f" Given type {type(client)}"
        )
    self.pool[client.id] = client
    poller = self.poller
    poller.register(client.socket, zmq.POLLIN)
    poller.register(client._monitor_socket, zmq.POLLIN)

start_polling

start_polling() -> None

register the server message polling loop in the asyncio event loop.

Source code in hololinked\core\zmq\brokers.py
def start_polling(self) -> None:
    """
    Schedule ``poll_responses()`` on the asyncio event loop. The task is created from a callback 
    handed to ``call_soon()``, i.e. polling starts once the event loop runs that callback. 
    """
    def _spawn_poller():
        return asyncio.create_task(self.poll_responses())

    asyncio.get_event_loop().call_soon(_spawn_poller)

stop_polling

stop_polling()

stop polling for replies from server

Source code in hololinked\core\zmq\brokers.py
def stop_polling(self):
    """
    Request the polling loop to end by setting the ``stop_poll`` flag, which ``poll_responses()`` 
    checks before each poll.
    """
    self.stop_poll = True