
hololinked.core.zmq.rpc_server.RPCServer

Bases: BaseZMQServer

The RPCServer implements an infinite loop in which ZMQ sockets listen for messages over any available transport layer (INPROC, IPC or TCP). Once requests are received, jobs are dispatched to the Thing instances being served, along with timeouts and any other execution requirements (called the execution context). After a Thing instance executes an operation, the result is sent back to the client. Execution information includes the Thing ID, the property, action or event to operate on (events are usually PUB-SUB and are largely handled by the EventPublisher directly), the operation to perform (i.e. readproperty, invokeaction etc.), the payload and the execution contexts (like timeouts). This information is structured as JSON.
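
For illustration, such execution information might look like the following. The key names follow the fields referenced in the source listing below (`thing_id`, `invokation_timeout`, `oneway`, `fetch_execution_logs`), but the exact wire layout shown here is an assumption, not the library's definitive message format:

```python
# Hypothetical sketch of the JSON-structured execution information described
# above; field names mirror those used in the source listing below, but the
# overall layout is illustrative only.
request = {
    "thing_id": "spectrometer",          # which served Thing instance to address
    "objekt": "integration_time",        # property, action or event name
    "operation": "readproperty",         # or "invokeaction", "writeproperty", ...
    "payload": None,                     # arguments for the operation, if any
    "server_execution_context": {
        "invokation_timeout": 5.0,       # seconds to wait before execution starts
        "execution_timeout": 10.0,       # seconds allowed for the execution itself
        "oneway": False,                 # drop the reply to the client if True
    },
    "thing_execution_context": {
        "fetch_execution_logs": False,   # include logs captured during execution
    },
}
```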

Jobs determine how operations are executed on the Thing instance: in queued, async or threaded mode. This is their main function. Queued mode is the default, as issuing multiple simultaneous operations on a physical device is not always practical.
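
The mode selection can be summarized as a simple rule. The following is a plain-Python paraphrase of the scheduler selection performed in `add_thing` in the source listing below, not an additional API:

```python
# Paraphrase of the scheduler-selection rule in RPCServer.add_thing:
# async, non-synchronous actions get an async scheduler; other
# non-synchronous actions a threaded one; everything else falls back
# to the default queued scheduler.
def select_scheduler_mode(iscoroutine: bool, synchronous: bool) -> str:
    if iscoroutine and not synchronous:
        return "async"
    if not synchronous:
        return "threaded"
    return "queued"
```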

The default ZMQ transport layer is INPROC, but IPC or TCP can be added simultaneously by using a ZMQServer instance instead of RPCServer. INPROC is the default because the RPCServer is the only server that implements operations directly on the Thing instances. All other protocols, like HTTP, MQTT and CoAP, only redirect requests to the RPCServer and do not operate on the Thing instances directly. Instead, incoming requests in those protocols are converted to the above-stated "Jobs" in JSON format.

INPROC is the fastest and most efficient way to communicate between multiple independently running loops, whether a loop belongs to a specific protocol's request listener or to the RPCServer itself, as it uses shared memory. The same INPROC messaging contract is also used for IPC and TCP, eliminating the need to implement separate messaging contracts at different layers of ZMQ communication.

Therefore, if a Thing instance is served by a well-known protocol, say HTTP, the server behaves like HTTP-RPC.
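
The overall run model, one executor thread per Thing consuming jobs from a queue while the calling thread blocks on request polling, can be sketched in plain Python. Here `queue.Queue` stands in for the ZMQ sockets and scheduler machinery; all names are illustrative and not part of the library:

```python
import queue
import threading

# Plain-Python sketch of the run() threading model: one worker thread per
# served "thing" consumes jobs from its own queue (standing in for a queued
# scheduler), while the calling thread plays the blocking request listener.
def run_sketch(thing_ids, requests):
    queues = {tid: queue.Queue() for tid in thing_ids}  # one job queue per thing
    results = []

    def executor(tid):
        # mirrors the per-thing executor loop: run until told to stop
        while True:
            job = queues[tid].get()
            if job is None:          # sentinel: stop() was called
                break
            results.append((tid, job))

    threads = [threading.Thread(target=executor, args=(tid,)) for tid in thing_ids]
    for t in threads:
        t.start()

    # the listener loop: dispatch each incoming request to its thing's queue
    for tid, op in requests:
        queues[tid].put(op)
    for tid in thing_ids:
        queues[tid].put(None)        # stop all executors
    for t in threads:
        t.join()
    return results
```

In practice, per the source below, the server is started with `RPCServer(id=..., things=[...]).run()` (blocking) and stopped threadsafely with `stop()`.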

UML Diagram

Source code in hololinked/hololinked/core/zmq/rpc_server.py
class RPCServer(BaseZMQServer):
    """
    The `RPCServer` implements an infinite loop in which ZMQ sockets listen for messages over any available transport
    layer (`INPROC`, `IPC` or `TCP`). Once requests are received, jobs are dispatched to the `Thing` instances being
    served, along with timeouts and any other execution requirements (called the execution context). After a `Thing`
    instance executes an operation, the result is sent back to the client. Execution information includes the `Thing`
    ID, the property, action or event to operate on (events are usually PUB-SUB and are largely handled by the
    `EventPublisher` directly), the operation to perform (i.e. `readproperty`, `invokeaction` etc.), the payload and
    the execution contexts (like timeouts). This information is structured as JSON.

    Jobs determine how operations are executed on the `Thing` instance: in queued, async or threaded mode.
    This is their main function. Queued mode is the default, as issuing multiple simultaneous operations on a
    physical device is not always practical.

    The default ZMQ transport layer is `INPROC`, but `IPC` or `TCP` can be added simultaneously by using a `ZMQServer`
    instance instead of `RPCServer`. `INPROC` is the default because the `RPCServer` is the only server that
    implements operations directly on the `Thing` instances. All other protocols, like HTTP, MQTT and CoAP, only
    redirect requests to the `RPCServer` and do not operate on the `Thing` instances directly. Instead, incoming
    requests in those protocols are converted to the above-stated "Jobs" in JSON format.

    `INPROC` is the fastest and most efficient way to communicate between multiple independently running loops,
    whether a loop belongs to a specific protocol's request listener or to the `RPCServer` itself, as it uses shared
    memory. The same `INPROC` messaging contract is also used for `IPC` and `TCP`, eliminating the need to implement
    separate messaging contracts at different layers of ZMQ communication.

    Therefore, if a `Thing` instance is served by a well-known protocol, say HTTP, the server behaves like HTTP-RPC.

    [UML Diagram](http://docs.hololinked.dev/UML/PDF/RPCServer.pdf)
    """

    things = TypedList(
        item_type=(Thing,),
        bounds=(0, 100),
        allow_None=True,
        default=None,
        doc="list of Things which are being executed",
        remote=False,
    )  # type: list[Thing]

    schedulers: dict[str, "QueuedScheduler"]

    def __init__(
        self,
        *,
        id: str,
        things: list[Thing] | None = None,
        context: zmq.asyncio.Context | None = None,
        access_point: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.INPROC,
        **kwargs: dict[str, Any],
    ) -> None:
        """
        Parameters
        ----------
        id: str
            `id` of the server
        things: List[Thing]
            list of `Thing` instances to be served
        context: Optional, zmq.asyncio.Context
            ZeroMQ async Context object to use. Automatically created when None is supplied.
        access_point: ZMQ_TRANSPORTS
            transport layer to be used for the server, default is `INPROC`
        """
        super().__init__(id=id, **kwargs)
        self.things = []
        self.add_things(*(things or []))

        if isinstance(access_point, str):
            access_point = access_point.upper()

        self._run = False  # flag to stop all the servers
        self.context = context or global_config.zmq_context()

        self.req_rep_server = AsyncZMQServer(
            id=self.id,
            context=self.context,
            access_point=access_point,
            poll_timeout=1000,
            **kwargs,
        )
        self.event_publisher = EventPublisher(
            id=f"{self.id}{EventPublisher._standard_address_suffix}",
            context=self.context,
            access_point=access_point,
            **kwargs,
        )
        self.schedulers = dict()

    def __post_init__(self):
        # post init is not called, don't add logic here
        super().__post_init__()
        self.logger.info("Server can be started using run()")

    def add_thing(self, thing: Thing) -> None:
        """Adds a thing to the list of things to serve."""
        # setup scheduling requirements
        all_things = get_all_sub_things_recusively(thing)
        for instance in all_things:
            instance.rpc_server = self
            for action in instance.actions.descriptors.values():
                if action.execution_info.iscoroutine and not action.execution_info.synchronous:
                    self.schedulers[f"{instance.id}.{action.name}.invokeAction"] = AsyncScheduler
                elif not action.execution_info.synchronous:
                    self.schedulers[f"{instance.id}.{action.name}.invokeAction"] = ThreadedScheduler
                # else QueuedScheduler which is default
            # properties need not be dealt with yet, but may be in the future
        if self.things is None:
            self.things = []
        self.things.append(thing)

    def add_things(self, *things: Thing) -> None:
        """Adds multiple things to the list of things to serve."""
        for thing in things:
            self.add_thing(thing)

    @property
    def is_running(self) -> bool:
        """Check if the server is running or not."""
        return self._run

    async def recv_requests_and_dispatch_jobs(self, server: AsyncZMQServer) -> None:
        """
        Continuously receives messages from different clients and dispatches them as jobs according to how an
        object (property/action/event) must be executed (queued/threaded/async). Also handles messages that don't
        need separate jobs, like `HANDSHAKE`, `EXIT`, timeouts etc.

        Parameters
        ----------
        server: AsyncZMQServer
            the server instance to poll for requests
        """
        self.logger.debug(f"started polling at socket {server.socket_address}")
        eventloop = asyncio.get_event_loop()
        things = {thing.id: thing for thing in self.things}
        while self._run:
            try:
                request_messages = await server.poll_requests()
                # when stop poll is set, this will exit with an empty list
            except BreakLoop:
                break
            except Exception as ex:
                self.logger.error(f"exception occurred while polling for server - {str(ex)}")
                self.logger.exception(ex)
                continue

            for request_message in request_messages:
                try:
                    # handle invokation timeout
                    invokation_timeout = request_message.server_execution_context.get("invokation_timeout", None)

                    ready_to_process_event = None
                    timeout_task = None
                    if invokation_timeout is not None:
                        ready_to_process_event = asyncio.Event()
                        timeout_task = eventloop.create_task(
                            self._process_timeouts(
                                request_message=request_message,
                                ready_to_process_event=ready_to_process_event,
                                timeout=invokation_timeout,
                                origin_server=server,
                                timeout_type="invokation",
                            )
                        )

                    # check object level scheduling requirements and schedule the message
                    # append to messages list - message, event, timeout task, origin socket
                    job = (
                        server,
                        request_message,
                        timeout_task,
                        ready_to_process_event,
                    )  # type: Scheduler.JobInvokationType
                    if request_message.qualified_operation in self.schedulers:
                        scheduler = self.schedulers[request_message.qualified_operation](
                            things[request_message.thing_id], self
                        )
                    else:
                        scheduler = self.schedulers[request_message.thing_id]
                    scheduler.dispatch_job(job)

                except Exception as ex:
                    # handle invalid message
                    self.logger.error(
                        f"exception occurred for message - {str(ex)}",
                        sender_id=request_message.sender_id,
                        msg_id=request_message.id,
                    )
                    self.logger.exception(ex)
                    eventloop.create_task(server._handle_invalid_message(request_message=request_message, exception=ex))
        self.stop()
        self.logger.info(f"stopped polling at socket {server.socket_address.split(':')[0].upper()}")

    async def tunnel_message_to_things(self, scheduler: "Scheduler") -> None:
        """message tunneler/coordinator between external sockets listening thread and `Thing` object executor thread"""
        self.logger.info("started schedulers")
        eventloop = get_current_async_loop()
        while self._run and scheduler.run:
            # wait for message first
            if not scheduler.has_job:
                await scheduler.wait_for_job()
                # this means in next loop it wont be in this block as a message arrived
                continue

            # retrieve from messages list - message, execution context, event, timeout task, origin socket
            origin_server, request_message, timeout_task, ready_to_process_event = scheduler.next_job
            server_execution_context = request_message.server_execution_context

            # handle invokation timeout
            invokation_timed_out = True
            if ready_to_process_event is not None:
                ready_to_process_event.set()  # releases timeout task
                invokation_timed_out = await timeout_task
            if ready_to_process_event is not None and invokation_timed_out:
                # drop call to thing, timeout message was already sent in _process_timeouts()
                continue

            # handle execution through thing
            scheduler.last_operation_request = scheduler.extract_operation_tuple_from_request(request_message)

            # schedule an execution timeout
            execution_timeout = server_execution_context.get("execution_timeout", None)
            execution_completed_event = None
            execution_timeout_task = None
            execution_timed_out = True
            if execution_timeout is not None:
                execution_completed_event = asyncio.Event()
                execution_timeout_task = eventloop.create_task(
                    self._process_timeouts(
                        request_message=request_message,
                        ready_to_process_event=execution_completed_event,
                        timeout=execution_timeout,
                        origin_server=origin_server,
                        timeout_type="execution",
                    )
                )

            # always wait for reply from thing, since this loop is asyncio task (& in its own thread in RPC server),
            # timeouts always reach client without truly blocking by the GIL. If reply does not arrive, all other requests
            # get invokation timeout.
            # await eventloop.run_in_executor(None, scheduler.wait_for_reply)
            await scheduler.wait_for_reply(eventloop)
            # the reply should never be Undefined; Undefined is a sentinel placeholder (akin to the NotImplemented singleton)
            if scheduler.last_operation_reply is Undefined:
                # this is a logic error, as the reply should never be undefined
                await origin_server._handle_error_message(
                    request_message=request_message,
                    exception=RuntimeError("No reply from thing - logic error"),
                )
                continue
            payload, preserialized_payload, reply_message_type = scheduler.last_operation_reply
            scheduler.reset_operation_reply()

            # check if execution completed within time
            if execution_completed_event is not None:
                execution_completed_event.set()  # releases timeout task
                execution_timed_out = await execution_timeout_task
            if execution_timeout_task is not None and execution_timed_out:
                # drop reply to client as timeout was already sent
                continue
            if server_execution_context.get("oneway", False):
                # drop reply if oneway
                continue

            # send reply to client
            await origin_server.async_send_response_with_message_type(
                request_message=request_message,
                message_type=reply_message_type,
                payload=payload,
                preserialized_payload=preserialized_payload,
            )

        scheduler.cleanup()
        self.logger.info("stopped schedulers")

    async def run_thing_instance(self, instance: Thing, scheduler: "Scheduler | None" = None) -> None:
        """
        run a single `Thing` instance in an infinite loop by allowing the scheduler to schedule operations on it.

        Parameters
        ----------
        instance: Thing
            instance of the `Thing`
        scheduler: Optional[Scheduler]
            scheduler that schedules operations on the `Thing` instance, a default is always available.
        """
        logger = self.logger.bind(cls=instance.__class__.__name__, thing_id=instance.id)
        logger.info("starting to run operations on thing")
        instance.logger.info("waiting to receive operations now")
        # if logger.level >= logging.ERROR:
        # sleep added to resolve some issue with logging related IO bound tasks in asyncio - not really clear what it is.
        # This loop crashes for log levels above ERROR without the following statement
        await asyncio.sleep(0.001)
        # TODO - investigate and fix it
        scheduler = scheduler or self.schedulers[instance.id]
        eventloop = get_current_async_loop()

        while self._run and scheduler.run:
            # print("starting to serve thing {}".format(instance.id))
            await scheduler.wait_for_operation(eventloop)
            # await scheduler.wait_for_operation()
            if scheduler.last_operation_request is Undefined:
                logger.warning("No operation request found although an interruption to wait was made, continuing...")
                continue

            try:
                # fetch operation_request which is a tuple of
                # (thing_id, objekt, operation, payload, preserialized_payload, execution_context)
                (
                    thing_id,
                    objekt,
                    operation,
                    payload,
                    preserialized_payload,
                    execution_context,
                ) = scheduler.last_operation_request

                # deserializing the payload required to execute the operation
                payload = payload.deserialize()
                preserialized_payload = preserialized_payload.value
                instance.logger.debug(f"starting execution of operation {operation} on {objekt}")

                # start activities related to thing execution context
                fetch_execution_logs = execution_context.pop("fetch_execution_logs", False)
                if fetch_execution_logs:
                    list_handler = LogHistoryHandler([])
                    list_handler.setLevel(logging.DEBUG)
                    if isinstance(instance.logger, structlog.stdlib.BoundLoggerBase):
                        stdlib_logger = instance.logger._logger
                    else:
                        stdlib_logger = instance.logger
                    list_handler.setFormatter(stdlib_logger.handlers[0].formatter)
                    stdlib_logger.addHandler(list_handler)

                # execute the operation
                return_value = await self.execute_operation(instance, objekt, operation, payload, preserialized_payload)

                # handle return value
                serializer = Serializers.for_object(thing_id, instance.__class__.__name__, objekt)
                content_type_if_no_serializer = Serializers.get_content_type_for_object(
                    thing_id,
                    instance.__class__.__name__,
                    objekt,
                )
                rpayload, rpreserialized_payload = self.format_return_value(
                    return_value,
                    serializer=serializer,
                    content_type_if_no_serializer=content_type_if_no_serializer,
                )

                # complete thing execution context
                if fetch_execution_logs:
                    rpayload.value = dict(return_value=rpayload.value, execution_logs=list_handler.log_list)

                # raise any payload errors now
                rpayload.require_serialized()

                # set reply
                scheduler.last_operation_reply = (rpayload, rpreserialized_payload, REPLY)

            except BreakInnerLoop:
                # exit the loop and stop the thing
                instance.logger.info("exiting event loop")

                # send a reply with None return value
                rpayload, rpreserialized_payload = self.format_return_value(None, Serializers.json)

                # complete thing execution context
                if fetch_execution_logs:
                    rpayload.value = dict(return_value=rpayload.value, execution_logs=list_handler.log_list)

                # set reply, let the message broker decide
                scheduler.last_operation_reply = (rpayload, rpreserialized_payload, None)

                # quit the loop
                break

            except Exception as ex:
                # error occurred while executing the operation
                instance.logger.error(f"error while executing operation - {str(ex)}")
                instance.logger.exception(ex)

                # send a reply with error
                rpayload, rpreserialized_payload = self.format_return_value(
                    dict(exception=format_exception_as_json(ex)), Serializers.json
                )

                # complete thing execution context
                if fetch_execution_logs:
                    rpayload.value["execution_logs"] = list_handler.log_list

                # set error reply
                scheduler.last_operation_reply = (rpayload, rpreserialized_payload, ERROR)

            finally:
                # cleanup
                if fetch_execution_logs:
                    if isinstance(instance.logger, structlog.stdlib.BoundLoggerBase):
                        stdlib_logger = instance.logger._logger
                    else:
                        stdlib_logger = instance.logger
                    stdlib_logger.removeHandler(list_handler)
                instance.logger.debug(f"completed execution of operation {operation} on {objekt}")
        logger.info("stopped running thing")

    async def execute_operation(
        self,
        instance: Thing,
        objekt: str,
        operation: str,
        payload: Any,
        preserialized_payload: bytes,
    ) -> Any:
        """
        Execute a given operation on a thing instance.

        Parameters
        ----------
        instance: Thing
            instance of the thing
        objekt: str
            name of the property, action or event
        operation: str
            operation to be executed on the property, action or event
        payload: Any
            payload to be used for the operation
        preserialized_payload: bytes
            preserialized payload to be used for the operation
        """
        if operation == Operations.readproperty:
            prop = instance.properties[objekt]  # type: Property
            return getattr(instance, prop.name)
        elif operation == Operations.writeproperty:
            prop = instance.properties[objekt]  # type: Property
            if preserialized_payload != EMPTY_BYTE:
                prop_value = preserialized_payload
            else:
                prop_value = payload
            return prop.external_set(instance, prop_value)
        elif operation == Operations.deleteproperty:
            prop = instance.properties[objekt]  # type: Property
            delattr(instance, prop.name)  # invokes the descriptor's deleter; raises NotImplementedError when deletion is not implemented, which is mostly the case
        elif operation == Operations.invokeaction and objekt == "get_thing_description":
            # special case
            if payload is None:
                payload = dict()
            args = payload.pop("__args__", tuple())
            return self.get_thing_description(instance, *args, **payload)
        elif operation == Operations.invokeaction:
            if payload is None:
                payload = dict()
            args = payload.pop("__args__", tuple())
            # payload then become kwargs
            if preserialized_payload != EMPTY_BYTE:
                args = (preserialized_payload,) + args
            action = instance.actions[objekt]  # type: BoundAction
            if action.execution_info.iscoroutine:
                # the actual scheduling as a purely async task is done by the scheduler, not here,
                # this will be a blocking call
                return await action.external_call(*args, **payload)
            return action.external_call(*args, **payload)
        elif operation == Operations.readmultipleproperties or operation == Operations.readallproperties:
            if objekt is None:
                return instance.properties.get()
            return instance.properties.get(names=objekt)
        elif operation == Operations.writemultipleproperties or operation == Operations.writeallproperties:
            return instance.properties.set(payload)
        raise NotImplementedError(
            "Unimplemented execution path for Thing {} for operation {}".format(instance.id, operation)
        )

    def format_return_value(
        self,
        return_value: Any,
        serializer: BaseSerializer,
        content_type_if_no_serializer: str = "",
    ) -> tuple[SerializableData, PreserializedData]:
        if (
            isinstance(return_value, tuple)
            and len(return_value) == 2
            and isinstance(return_value[1], (bytes, PreserializedData))
        ):
            payload = SerializableData(return_value[0], serializer=serializer, content_type=serializer.content_type)
            if isinstance(return_value[1], bytes):
                preserialized_payload = PreserializedData(return_value[1], content_type=content_type_if_no_serializer)
            else:
                # already a PreserializedData; without this branch the variable would be unbound
                preserialized_payload = return_value[1]
        elif isinstance(return_value, bytes):
            payload = SerializableData(None, content_type="application/json")
            preserialized_payload = PreserializedData(return_value, content_type=content_type_if_no_serializer)
        elif isinstance(return_value, PreserializedData):
            payload = SerializableData(None, content_type="application/json")
            preserialized_payload = return_value
        else:
            payload = SerializableData(return_value, serializer=serializer, content_type=serializer.content_type)
            preserialized_payload = PreserializedData(EMPTY_BYTE, content_type="text/plain")
        return payload, preserialized_payload

    async def _process_timeouts(
        self,
        request_message: RequestMessage,
        ready_to_process_event: asyncio.Event,
        origin_server: AsyncZMQServer,
        timeout: float | int | None,
        timeout_type: str,
    ) -> bool:
        """
        Sends a timeout reply to the client if `ready_to_process_event` is not set within `timeout`, returning
        `True` to indicate the timeout. Returns `False` if the event was set in time, meaning the operation can
        be processed.
        """
        try:
            await asyncio.wait_for(ready_to_process_event.wait(), timeout)
            return False
        except (TimeoutError, asyncio.TimeoutError):
            await origin_server._handle_timeout(request_message, timeout_type)
            return True

    def run_zmq_request_listener(self):
        """
        Runs ZMQ's socket polling in an async loop. This method is blocking and is automatically called by the `run()`
        method. Please don't call this method when the async loop is already running.
        """
        eventloop = get_current_async_loop()
        existing_tasks = asyncio.all_tasks(eventloop)
        eventloop.run_until_complete(
            asyncio.gather(
                self.recv_requests_and_dispatch_jobs(self.req_rep_server),
                *[self.tunnel_message_to_things(scheduler) for scheduler in self.schedulers.values()],
                *existing_tasks,
            )
        )
        eventloop.close()

    def run_things(self, things: list[Thing]):
        """
        Run loop that executes operations on `Thing` instances. This method is blocking and is called by `run()` method.

        Parameters
        ----------
        things: List[Thing]
            list of `Thing` instances to be executed
        """
        thing_executor_loop = get_current_async_loop()
        self.logger.info(f"starting thing in thread {threading.get_ident()} for {[obj.id for obj in things]}")
        thing_executor_loop.run_until_complete(
            asyncio.gather(*[self.run_thing_instance(instance) for instance in things])
        )
        self.logger.info(f"exiting event loop in thread {threading.get_ident()}")
        thing_executor_loop.close()

    def run(self):
        """
        Start & run the server. This method is blocking.
        Creates job schedulers for each `Thing`, dispatches each `Thing` to its own thread and starts the ZMQ sockets
        request polling loop. Call `stop()` (threadsafe) to stop the server.
        """
        self._run = True
        self.logger.info("starting RPC server")
        for thing in self.things:
            self.schedulers[thing.id] = QueuedScheduler(thing, self)
        threads = dict()  # type: dict[int, threading.Thread]
        for thing in self.things:
            thread = threading.Thread(target=self.run_things, args=([thing],))
            thread.start()
            threads[thread.ident] = thread
        self.run_zmq_request_listener()
        for thread in threads.values():
            thread.join()
        self.logger.info("RPC server stopped")

    def stop(self):
        """Stop the server. This method is threadsafe."""
        self._run = False
        self.req_rep_server.stop_polling()
        for scheduler in self.schedulers.values():
            scheduler.cleanup()

    def exit(self):
        """Stop and try to clean up resources used by the server."""
        try:
            self.stop()
            if self.req_rep_server is not None:
                self.req_rep_server.exit()
            if self.event_publisher is not None:
                self.event_publisher.exit()
        except Exception as ex:
            self.logger.warning(f"Exception occurred while exiting the RPC server - {str(ex)}")

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        if not isinstance(other, RPCServer):
            return False
        return self.id == other.id

    def __str__(self):
        return f"RPCServer({self.id}, req-rep: {self.req_rep_server.socket_address}, pub-sub: {self.event_publisher.socket_address})"

    def get_thing_description(
        self,
        instance: Thing,
        protocol: str,
        ignore_errors: bool = False,
        skip_names: list[str] = [],
    ) -> dict[str, Any]:
        """
        Get the Thing Description (TD) for a specific Thing instance.

        Parameters
        ----------
        instance: Thing
            The Thing instance for which to retrieve the TD
        protocol: str
            The protocol for which to generate the TD - `INPROC`, `IPC` or `TCP`
        ignore_errors: bool
            Whether to ignore errors while generating the TD. Default is False.
        skip_names: List[str]
            List of property, action or event names to skip while generating the TD. Default is empty list.

        Returns
        -------
        JSON
            The Thing Description in JSON format.
        """
        TM = instance.get_thing_model(ignore_errors=ignore_errors, skip_names=skip_names).json()  # type: dict[str, Any]
        TD = copy.deepcopy(TM)
        from ...td import ActionAffordance, EventAffordance, PropertyAffordance
        from ...td.forms import Form

        if protocol.lower() == "inproc":
            req_rep_socket_address = self.req_rep_server.socket_address
            pub_sub_socket_address = self.event_publisher.socket_address
        elif protocol.lower() == "ipc":
            if not hasattr(self, "ipc_server"):
                raise RuntimeError(
                    "This server cannot generate TD for IPC protocol, consider using thing model directly."
                )
            req_rep_socket_address = self.ipc_server.socket_address
            pub_sub_socket_address = self.ipc_event_publisher.socket_address
        elif protocol.lower() == "tcp":
            if not hasattr(self, "tcp_server"):
                raise RuntimeError(
                    "This server cannot generate TD for TCP protocol, consider using thing model directly."
                )
            req_rep_socket_address = self.tcp_server.socket_address  # type: str
            req_rep_socket_address = req_rep_socket_address.replace(
                "*", socket.gethostname()
            ).replace(
                "0.0.0.0", socket.gethostname()
            )  # SAST(id='hololinked.core.zmq.rpc_server.RPCServer.get_thing_description.req_rep_socket_address', description='B104:hardcoded_bind_all_interfaces', tool='bandit')
            pub_sub_socket_address = self.tcp_event_publisher.socket_address  # type: str
            pub_sub_socket_address = pub_sub_socket_address.replace(
                "*", socket.gethostname()
            ).replace(
                "0.0.0.0", socket.gethostname()
            )  # SAST(id='hololinked.core.zmq.rpc_server.RPCServer.get_thing_description.pub_sub_socket_address', description='B104:hardcoded_bind_all_interfaces', tool='bandit')
        else:
            raise ValueError(f"Unsupported protocol '{protocol}' for ZMQ.")

        for name in TM.get("properties", []):
            try:
                affordance = PropertyAffordance.from_TD(name, TM)
                if not TD["properties"][name].get("forms", None):
                    TD["properties"][name]["forms"] = []

                form = Form()
                form.href = req_rep_socket_address
                form.op = Operations.readproperty

                content_type = Serializers.get_content_type_for_object(instance.id, instance.__class__.__name__, name)
                if not content_type:
                    content_type = Serializers.for_object(instance.id, instance.__class__.__name__, name).content_type
                form.contentType = content_type

                TD["properties"][name]["forms"].append(form.json())

                if not affordance.readOnly:
                    form = Form()
                    form.href = req_rep_socket_address
                    form.op = Operations.writeproperty
                    content_type = Serializers.get_content_type_for_object(
                        instance.id,
                        instance.__class__.__name__,
                        name,
                    )
                    if not content_type:
                        content_type = Serializers.for_object(
                            instance.id,
                            instance.__class__.__name__,
                            name,
                        ).content_type
                    form.contentType = content_type
                    TD["properties"][name]["forms"].append(form.json())

                if affordance.observable:
                    form = Form()
                    form.href = pub_sub_socket_address
                    form.op = Operations.observeproperty
                    content_type = Serializers.get_content_type_for_object(
                        instance.id, instance.__class__.__name__, name
                    )
                    if not content_type:
                        content_type = Serializers.for_object(
                            instance.id,
                            instance.__class__.__name__,
                            name,
                        ).content_type
                    form.contentType = content_type
                    TD["properties"][name]["forms"].append(form.json())
            except Exception as ex:
                if not ignore_errors:
                    raise ex from None
                instance.logger.warning("error while generating TD forms for property", name=name, error=str(ex))

        for name in TM.get("actions", []):
            try:
                affordance = ActionAffordance.from_TD(name, TM)
                if not TD["actions"][name].get("forms", None):
                    TD["actions"][name]["forms"] = []

                form = Form()
                form.href = req_rep_socket_address
                form.op = Operations.invokeaction
                content_type = Serializers.get_content_type_for_object(instance.id, instance.__class__.__name__, name)
                if not content_type:
                    content_type = Serializers.for_object(instance.id, instance.__class__.__name__, name).content_type
                form.contentType = content_type
                TD["actions"][name]["forms"].append(form.json())
            except Exception as ex:
                if not ignore_errors:
                    raise ex from None
                instance.logger.warning("error while generating TD forms for action", name=name, error=str(ex))

        for name in TM.get("events", []):
            try:
                affordance = EventAffordance.from_TD(name, TM)
                if not TD["events"][name].get("forms", None):
                    TD["events"][name]["forms"] = []

                form = Form()
                form.href = pub_sub_socket_address
                form.op = Operations.subscribeevent
                content_type = Serializers.get_content_type_for_object(instance.id, instance.__class__.__name__, name)
                if not content_type:
                    content_type = Serializers.for_object(instance.id, instance.__class__.__name__, name).content_type
                form.contentType = content_type
                TD["events"][name]["forms"].append(form.json())
            except Exception as ex:
                if not ignore_errors:
                    raise ex from None
                instance.logger.warning("error while generating TD forms for event", name=name, error=str(ex))

        return TD

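The property loop in `get_thing_description` above appends a `readproperty` form for every property, adds a `writeproperty` form only when the affordance is not read-only, and an `observeproperty` form (pointing at the PUB-SUB address) only when it is observable. A minimal sketch of that branching with plain dicts, where `property_forms` is a hypothetical helper rather than hololinked API, and the default content type is an assumption (the real code resolves it per object through `Serializers`):

```python
def property_forms(affordance: dict, req_rep_address: str, pub_sub_address: str,
                   content_type: str = "application/json") -> list[dict]:
    """Build WoT forms for one property affordance (illustrative only)."""
    # reading is always possible over the REQ-REP socket
    forms = [{"href": req_rep_address, "op": "readproperty", "contentType": content_type}]
    # writable properties additionally get a writeproperty form
    if not affordance.get("readOnly", False):
        forms.append({"href": req_rep_address, "op": "writeproperty", "contentType": content_type})
    # observable properties are served over the PUB-SUB socket
    if affordance.get("observable", False):
        forms.append({"href": pub_sub_address, "op": "observeproperty", "contentType": content_type})
    return forms
```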
Functions

__init__

__init__(*, id: str, things: list[Thing] | None = None, context: Context | None = None, access_point: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.INPROC, **kwargs: dict[str, Any]) -> None

Parameters:

id: str (required)
    id of the server
things: list[Thing] | None (default None)
    list of Thing instances to be served
context: Context | None (default None)
    ZeroMQ async Context object to use. Automatically created when None is supplied.
access_point: ZMQ_TRANSPORTS (default INPROC)
    transport layer to be used for the server
Source code in hololinked/hololinked/core/zmq/rpc_server.py
def __init__(
    self,
    *,
    id: str,
    things: list[Thing] | None = None,
    context: zmq.asyncio.Context | None = None,
    access_point: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.INPROC,
    **kwargs: dict[str, Any],
) -> None:
    """
    Parameters
    ----------
    id: str
        `id` of the server
    things: List[Thing]
        list of `Thing` instances to be served
    context: Optional, zmq.asyncio.Context
        ZeroMQ async Context object to use. Automatically created when None is supplied.
    access_point: ZMQ_TRANSPORTS
        transport layer to be used for the server, default is `INPROC`
    """
    super().__init__(id=id, **kwargs)
    self.things = []
    self.add_things(*(things or []))

    if isinstance(access_point, str):
        access_point = access_point.upper()

    self._run = False  # flag to stop all the servers
    self.context = context or global_config.zmq_context()

    self.req_rep_server = AsyncZMQServer(
        id=self.id,
        context=self.context,
        access_point=access_point,
        poll_timeout=1000,
        **kwargs,
    )
    self.event_publisher = EventPublisher(
        id=f"{self.id}{EventPublisher._standard_address_suffix}",
        context=self.context,
        access_point=access_point,
        **kwargs,
    )
    self.schedulers = dict()

run

run()

Start & run the server. This method is blocking. Creates job schedulers for each Thing, dispatches each Thing to its own thread and starts the ZMQ sockets request polling loop. Call stop() (threadsafe) to stop the server.

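As documented above, `run()` blocks while dispatching each `Thing` to its own thread, and `stop()` threadsafely flips a shared flag that all loops observe. A toy sketch of that lifecycle, where `MiniServer` and its worker loop are illustrative stand-ins rather than hololinked API:

```python
import threading
import time

class MiniServer:
    """Toy analogue of run()/stop(): one blocking loop per worker thread,
    all stopped threadsafely by clearing a shared run flag."""

    def __init__(self, worker_ids):
        self.worker_ids = worker_ids
        self._run = False
        self.served = []
        self._lock = threading.Lock()

    def _run_worker(self, worker_id):
        # stand-in for a Thing's operation loop
        while self._run:
            with self._lock:
                self.served.append(worker_id)
            time.sleep(0.005)

    def run(self):
        # blocking, like RPCServer.run(): spawn one thread per worker and join
        self._run = True
        threads = [threading.Thread(target=self._run_worker, args=(w,))
                   for w in self.worker_ids]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def stop(self):
        # threadsafe, like RPCServer.stop(): a plain attribute write suffices
        self._run = False
```

Calling `run()` from a background thread and `stop()` from the main thread mirrors how the real server is shut down.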

stop

stop()

Stop the server. This method is threadsafe.


exit

exit()

Stop and try to clean up resources used by the server.


get_thing_description

get_thing_description(instance: Thing, protocol: str, ignore_errors: bool = False, skip_names: list[str] = []) -> dict[str, Any]

Get the Thing Description (TD) for a specific Thing instance.

Parameters:

instance: Thing (required)
    The Thing instance for which to retrieve the TD
protocol: str (required)
    The protocol for which to generate the TD - INPROC, IPC or TCP
ignore_errors: bool (default False)
    Whether to ignore errors while generating the TD.
skip_names: list[str] (default [])
    List of property, action or event names to skip while generating the TD.

Returns:

JSON
    The Thing Description in JSON format.


run_zmq_request_listener

run_zmq_request_listener()

Runs ZMQ's socket polling in an async loop. This method is blocking and is automatically called by run() method. Please don't call this method when the async loop is already running.

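`run_zmq_request_listener` gathers the request poller and one tunnel coroutine per scheduler into a single `run_until_complete()` call, and all of them exit cooperatively when the run flag is cleared. A self-contained sketch of that pattern (class and method names here are illustrative only, not hololinked API):

```python
import asyncio

class FlagRunner:
    """Toy analogue of run_zmq_request_listener(): one poller plus several
    per-scheduler tunnels, gathered into one blocking loop that ends when
    the shared run flag is cleared."""

    def __init__(self):
        self._run = True
        self.log = []

    async def poll(self):                 # stands in for request polling
        while self._run:
            self.log.append("poll")
            await asyncio.sleep(0.005)

    async def tunnel(self, name):         # stands in for a scheduler tunnel
        while self._run:
            self.log.append(name)
            await asyncio.sleep(0.005)

    async def stop_later(self, delay):    # stands in for an external stop() call
        await asyncio.sleep(delay)
        self._run = False

    def run(self):                        # blocking, like the real method
        loop = asyncio.new_event_loop()
        async def gather_all():
            # everything the server must run concurrently, joined in one gather
            await asyncio.gather(self.poll(), self.tunnel("A"),
                                 self.tunnel("B"), self.stop_later(0.05))
        loop.run_until_complete(gather_all())
        loop.close()
```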

recv_requests_and_dispatch_jobs async

recv_requests_and_dispatch_jobs(server: AsyncZMQServer) -> None

Continuously receives messages from different clients and dispatches them as jobs according to the specific requirements of how an object (property/action/event) must be executed (queued/threaded/async). Also handles messages that don't need separate jobs, like HANDSHAKE, EXIT, timeouts etc.

Parameters:

server: AsyncZMQServer (required)
    the server instance to poll for requests
Source code in hololinked/hololinked/core/zmq/rpc_server.py
async def recv_requests_and_dispatch_jobs(self, server: AsyncZMQServer) -> None:
    """
    Continuously receives messages from different clients and dispatches them as jobs according to the specific
    requirements of how an object (property/action/event) must be executed (queued/threaded/async).
    Also handles messages that don't need separate jobs, like `HANDSHAKE`, `EXIT`, timeouts etc.

    Parameters
    ----------
    server: AsyncZMQServer
        the server instance to poll for requests
    """
    self.logger.debug(f"started polling at socket {server.socket_address}")
    eventloop = asyncio.get_event_loop()
    things = {thing.id: thing for thing in self.things}
    while self._run:
        try:
            request_messages = await server.poll_requests()
            # when stop poll is set, this will exit with an empty list
        except BreakLoop:
            break
        except Exception as ex:
            self.logger.error(f"exception occurred while polling for server - {str(ex)}")
            self.logger.exception(ex)
            continue

        for request_message in request_messages:
            try:
                # handle invokation timeout
                invokation_timeout = request_message.server_execution_context.get("invokation_timeout", None)

                ready_to_process_event = None
                timeout_task = None
                if invokation_timeout is not None:
                    ready_to_process_event = asyncio.Event()
                    timeout_task = eventloop.create_task(
                        self._process_timeouts(
                            request_message=request_message,
                            ready_to_process_event=ready_to_process_event,
                            timeout=invokation_timeout,
                            origin_server=server,
                            timeout_type="invokation",
                        )
                    )

                # check object level scheduling requirements and schedule the message
                # append to messages list - message, event, timeout task, origin socket
                job = (
                    server,
                    request_message,
                    timeout_task,
                    ready_to_process_event,
                )  # type: Scheduler.JobInvokationType
                if request_message.qualified_operation in self.schedulers:
                    scheduler = self.schedulers[request_message.qualified_operation](
                        things[request_message.thing_id], self
                    )
                else:
                    scheduler = self.schedulers[request_message.thing_id]
                scheduler.dispatch_job(job)

            except Exception as ex:
                # handle invalid message
                self.logger.error(
                    f"exception occurred for message - {str(ex)}",
                    sender_id=request_message.sender_id,
                    msg_id=request_message.id,
                )
                self.logger.exception(ex)
                eventloop.create_task(server._handle_invalid_message(request_message=request_message, exception=ex))
    self.stop()
    self.logger.info(f"stopped polling at socket {server.socket_address.split(':')[0].upper()}")
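The dispatch step above routes each request to the scheduler registered for its `thing_id` (or to a per-operation scheduler when one exists), and a queued scheduler then executes jobs strictly in arrival order. A minimal queued analogue using `asyncio.Queue`, with hypothetical names in place of hololinked's `Scheduler` API:

```python
import asyncio

class ToyQueuedScheduler:
    """One FIFO job queue per thing: jobs are dispatched from the polling
    loop and consumed strictly in order, as in queued execution mode."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self.executed = []

    def dispatch_job(self, job):
        # called from the request-polling side
        self.queue.put_nowait(job)

    async def run(self):
        # consumer side: execute one job at a time, in order
        while True:
            job = await self.queue.get()
            if job is None:               # sentinel to stop, like cleanup()
                break
            self.executed.append(job)

async def demo_dispatch():
    schedulers = {"thing-1": ToyQueuedScheduler(), "thing-2": ToyQueuedScheduler()}
    runners = [asyncio.create_task(s.run()) for s in schedulers.values()]
    for thing_id, op in [("thing-1", "readproperty"),
                         ("thing-2", "invokeaction"),
                         ("thing-1", "writeproperty")]:
        schedulers[thing_id].dispatch_job(op)   # route by thing id
    for s in schedulers.values():
        s.dispatch_job(None)                    # stop sentinels after the jobs
    await asyncio.gather(*runners)
    return {tid: s.executed for tid, s in schedulers.items()}
```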

tunnel_message_to_things async

tunnel_message_to_things(scheduler: 'Scheduler') -> None

message tunneler/coordinator between the external socket-listening thread and the Thing object executor thread

Source code in hololinked/hololinked/core/zmq/rpc_server.py
async def tunnel_message_to_things(self, scheduler: "Scheduler") -> None:
    """message tunneler/coordinator between the external socket-listening thread and the `Thing` object executor thread"""
    self.logger.info("started schedulers")
    eventloop = get_current_async_loop()
    while self._run and scheduler.run:
        # wait for message first
        if not scheduler.has_job:
            await scheduler.wait_for_job()
            # this means in next loop it wont be in this block as a message arrived
            continue

        # retrieve from messages list - message, execution context, event, timeout task, origin socket
        origin_server, request_message, timeout_task, ready_to_process_event = scheduler.next_job
        server_execution_context = request_message.server_execution_context

        # handle invokation timeout
        invokation_timed_out = True
        if ready_to_process_event is not None:
            ready_to_process_event.set()  # releases timeout task
            invokation_timed_out = await timeout_task
        if ready_to_process_event is not None and invokation_timed_out:
            # drop call to thing, timeout message was already sent in _process_timeouts()
            continue

        # handle execution through thing
        scheduler.last_operation_request = scheduler.extract_operation_tuple_from_request(request_message)

        # schedule an execution timeout
        execution_timeout = server_execution_context.get("execution_timeout", None)
        execution_completed_event = None
        execution_timeout_task = None
        execution_timed_out = True
        if execution_timeout is not None:
            execution_completed_event = asyncio.Event()
            execution_timeout_task = eventloop.create_task(
                self._process_timeouts(
                    request_message=request_message,
                    ready_to_process_event=execution_completed_event,
                    timeout=execution_timeout,
                    origin_server=origin_server,
                    timeout_type="execution",
                )
            )

        # always wait for the reply from the thing: since this loop is an asyncio task (and runs in its own
        # thread in the RPC server), timeouts still reach the client without being blocked by the GIL.
        # If the reply never arrives, all other requests get an invokation timeout.
        # await eventloop.run_in_executor(None, scheduler.wait_for_reply)
        await scheduler.wait_for_reply(eventloop)
        # the reply should never be Undefined; Undefined is a sentinel placeholder similar to the NotImplemented singleton
        if scheduler.last_operation_reply is Undefined:
            # this is a logic error, as the reply should never be undefined
            await origin_server._handle_error_message(
                request_message=request_message,
                exception=RuntimeError("No reply from thing - logic error"),
            )
            continue
        payload, preserialized_payload, reply_message_type = scheduler.last_operation_reply
        scheduler.reset_operation_reply()

        # check if execution completed within time
        if execution_completed_event is not None:
            execution_completed_event.set()  # releases timeout task
            execution_timed_out = await execution_timeout_task
        if execution_timeout_task is not None and execution_timed_out:
            # drop reply to client as timeout was already sent
            continue
        if server_execution_context.get("oneway", False):
            # drop reply if oneway
            continue

        # send reply to client
        await origin_server.async_send_response_with_message_type(
            request_message=request_message,
            message_type=reply_message_type,
            payload=payload,
            preserialized_payload=preserialized_payload,
        )

    scheduler.cleanup()
    self.logger.info("stopped schedulers")
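The invokation and execution timeouts above follow the same handshake: a timeout task waits on an `asyncio.Event`; if the event is set before the deadline the operation proceeds, otherwise the task reports a timeout and the reply to the thing is dropped. A sketch of that race, where `process_timeout` is a hypothetical stand-in for the server's `_process_timeouts`:

```python
import asyncio

async def process_timeout(ready: asyncio.Event, timeout: float) -> bool:
    """Return True if the deadline elapsed before `ready` was set."""
    try:
        await asyncio.wait_for(ready.wait(), timeout)
        return False                      # picked up in time
    except asyncio.TimeoutError:
        return True                       # caller should drop the job/reply

async def demo_timeouts():
    # case 1: the operation is picked up before the deadline
    ready = asyncio.Event()
    task = asyncio.create_task(process_timeout(ready, 0.5))
    ready.set()                           # releases the timeout task
    in_time = not await task

    # case 2: nobody sets the event, so the deadline passes
    late = asyncio.Event()
    timed_out = await process_timeout(late, 0.01)
    return in_time, timed_out
```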

run_things

run_things(things: list[Thing])

Run loop that executes operations on Thing instances. This method is blocking and is called by run() method.

Parameters:

things: list[Thing] (required)
    list of Thing instances to be executed
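`run_things` gives each worker thread its own event loop and drives that thread's group of `Thing` loops to completion with `run_until_complete()`. A self-contained sketch of the per-thread-loop pattern (function and variable names are illustrative, not hololinked API):

```python
import asyncio
import threading

def run_group(names, results):
    """Drive one group of 'things' on a fresh event loop owned by this
    thread, mirroring how run() dispatches groups to run_things()."""
    loop = asyncio.new_event_loop()       # loop local to this thread

    async def serve(name):
        await asyncio.sleep(0.005)        # stand-in for the operation loop
        results.append(name)              # list.append is threadsafe under the GIL

    async def serve_all():
        await asyncio.gather(*[serve(n) for n in names])

    loop.run_until_complete(serve_all())
    loop.close()

def main():
    results = []
    threads = [threading.Thread(target=run_group, args=(group, results))
               for group in (["t1"], ["t2", "t3"])]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```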

run_thing_instance async

run_thing_instance(instance: Thing, scheduler: 'Scheduler' | None = None) -> None

run a single Thing instance in an infinite loop by allowing the scheduler to schedule operations on it.

Parameters:

instance: Thing (required)
    instance of the Thing
scheduler: 'Scheduler' | None (default None)
    scheduler that schedules operations on the Thing instance; a default is always available.
Source code in hololinked/hololinked/core/zmq/rpc_server.py
async def run_thing_instance(self, instance: Thing, scheduler: "Scheduler" | None = None) -> None:
    """
    run a single `Thing` instance in an infinite loop by allowing the scheduler to schedule operations on it.

    Parameters
    ----------
    instance: Thing
        instance of the `Thing`
    scheduler: Optional[Scheduler]
        scheduler that schedules operations on the `Thing` instance, a default is always available.
    """
    logger = self.logger.bind(cls=instance.__class__.__name__, thing_id=instance.id)
    logger.info("starting to run operations on thing")
    instance.logger.info("waiting to receive operations now")
    # if logger.level >= logging.ERROR:
    # sleep added to resolve some issue with logging related IO bound tasks in asyncio - not really clear what it is.
    # This loop crashes for log levels above ERROR without the following statement
    await asyncio.sleep(0.001)
    # TODO - investigate and fix it
    scheduler = scheduler or self.schedulers[instance.id]
    eventloop = get_current_async_loop()

    while self._run and scheduler.run:
        # print("starting to serve thing {}".format(instance.id))
        await scheduler.wait_for_operation(eventloop)
        # await scheduler.wait_for_operation()
        if scheduler.last_operation_request is Undefined:
            logger.warning("No operation request found although an interruption to wait was made, continuing...")
            continue

        try:
            # fetch operation_request which is a tuple of
            # (thing_id, objekt, operation, payload, preserialized_payload, execution_context)
            (
                thing_id,
                objekt,
                operation,
                payload,
                preserialized_payload,
                execution_context,
            ) = scheduler.last_operation_request

            # deserializing the payload required to execute the operation
            payload = payload.deserialize()
            preserialized_payload = preserialized_payload.value
            instance.logger.debug(f"starting execution of operation {operation} on {objekt}")

            # start activities related to thing execution context
            fetch_execution_logs = execution_context.pop("fetch_execution_logs", False)
            if fetch_execution_logs:
                list_handler = LogHistoryHandler([])
                list_handler.setLevel(logging.DEBUG)
                if isinstance(instance.logger, structlog.stdlib.BoundLoggerBase):
                    stdlib_logger = instance.logger._logger
                else:
                    stdlib_logger = instance.logger
                list_handler.setFormatter(stdlib_logger.handlers[0].formatter)
                stdlib_logger.addHandler(list_handler)

            # execute the operation
            return_value = await self.execute_operation(instance, objekt, operation, payload, preserialized_payload)

            # handle return value
            serializer = Serializers.for_object(thing_id, instance.__class__.__name__, objekt)
            content_type_if_no_serializer = Serializers.get_content_type_for_object(
                thing_id,
                instance.__class__.__name__,
                objekt,
            )
            rpayload, rpreserialized_payload = self.format_return_value(
                return_value,
                serializer=serializer,
                content_type_if_no_serializer=content_type_if_no_serializer,
            )

            # complete thing execution context
            if fetch_execution_logs:
                rpayload.value = dict(return_value=rpayload.value, execution_logs=list_handler.log_list)

            # raise any payload errors now
            rpayload.require_serialized()

            # set reply
            scheduler.last_operation_reply = (rpayload, rpreserialized_payload, REPLY)

        except BreakInnerLoop:
            # exit the loop and stop the thing
            instance.logger.info("exiting event loop")

            # send a reply with None return value
            rpayload, rpreserialized_payload = self.format_return_value(None, Serializers.json)

            # complete thing execution context
            if fetch_execution_logs:
                rpayload.value = dict(return_value=rpayload.value, execution_logs=list_handler.log_list)

            # set reply, let the message broker decide
            scheduler.last_operation_reply = (rpayload, rpreserialized_payload, None)

            # quit the loop
            break

        except Exception as ex:
            # error occurred while executing the operation
            instance.logger.error(f"error while executing operation - {str(ex)}")
            instance.logger.exception(ex)

            # send a reply with error
            rpayload, rpreserialized_payload = self.format_return_value(
                dict(exception=format_exception_as_json(ex)), Serializers.json
            )

            # complete thing execution context
            if fetch_execution_logs:
                rpayload.value["execution_logs"] = list_handler.log_list

            # set error reply
            scheduler.last_operation_reply = (rpayload, rpreserialized_payload, ERROR)

        finally:
            # cleanup
            if fetch_execution_logs:
                if isinstance(instance.logger, structlog.stdlib.BoundLoggerBase):
                    stdlib_logger = instance.logger._logger
                else:
                    stdlib_logger = instance.logger
                stdlib_logger.removeHandler(list_handler)
            instance.logger.debug(f"completed execution of operation {operation} on {objekt}")
    logger.info("stopped running thing")
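The `fetch_execution_logs` flow in the source above temporarily attaches a list-collecting handler so that log records emitted during the operation can be returned alongside the result, and detaches it in the `finally` block. A stdlib-only analogue of that handler (`ListLogHandler` is an illustrative name; hololinked's own class is `LogHistoryHandler`):

```python
import logging

class ListLogHandler(logging.Handler):
    """Collects formatted log records into a list, mirroring how
    LogHistoryHandler captures execution logs."""
    def __init__(self):
        super().__init__(level=logging.DEBUG)
        self.log_list = []

    def emit(self, record):
        self.log_list.append(self.format(record))

logger = logging.getLogger("thing-demo")
logger.setLevel(logging.DEBUG)

handler = ListLogHandler()
handler.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
logger.addHandler(handler)
try:
    logger.debug("starting execution of operation")
    logger.info("operation done")
finally:
    # always detach, mirroring the finally-block cleanup in run_thing_instance
    logger.removeHandler(handler)

print(handler.log_list)
```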

execute_operation async

execute_operation(instance: Thing, objekt: str, operation: str, payload: Any, preserialized_payload: bytes) -> Any

Execute a given operation on a thing instance.

Parameters:

- instance (Thing, required): instance of the thing
- objekt (str, required): name of the property, action or event
- operation (str, required): operation to be executed on the property, action or event
- payload (Any, required): payload to be used for the operation
- preserialized_payload (bytes, required): preserialized payload to be used for the operation
Source code in hololinked/hololinked/core/zmq/rpc_server.py
async def execute_operation(
    self,
    instance: Thing,
    objekt: str,
    operation: str,
    payload: Any,
    preserialized_payload: bytes,
) -> Any:
    """
    Execute a given operation on a thing instance.

    Parameters
    ----------
    instance: Thing
        instance of the thing
    objekt: str
        name of the property, action or event
    operation: str
        operation to be executed on the property, action or event
    payload: Any
        payload to be used for the operation
    preserialized_payload: bytes
        preserialized payload to be used for the operation
    """
    if operation == Operations.readproperty:
        prop = instance.properties[objekt]  # type: Property
        return getattr(instance, prop.name)
    elif operation == Operations.writeproperty:
        prop = instance.properties[objekt]  # type: Property
        if preserialized_payload != EMPTY_BYTE:
            prop_value = preserialized_payload
        else:
            prop_value = payload
        return prop.external_set(instance, prop_value)
    elif operation == Operations.deleteproperty:
        prop = instance.properties[objekt]  # type: Property
        del prop  # raises NotImplementedError when deletion is not implemented which is mostly the case
    elif operation == Operations.invokeaction and objekt == "get_thing_description":
        # special case
        if payload is None:
            payload = dict()
        args = payload.pop("__args__", tuple())
        return self.get_thing_description(instance, *args, **payload)
    elif operation == Operations.invokeaction:
        if payload is None:
            payload = dict()
        args = payload.pop("__args__", tuple())
        # payload then become kwargs
        if preserialized_payload != EMPTY_BYTE:
            args = (preserialized_payload,) + args
        action = instance.actions[objekt]  # type: BoundAction
        if action.execution_info.iscoroutine:
            # the actual scheduling as a purely async task is done by the scheduler, not here,
            # this will be a blocking call
            return await action.external_call(*args, **payload)
        return action.external_call(*args, **payload)
    elif operation == Operations.readmultipleproperties or operation == Operations.readallproperties:
        if objekt is None:
            return instance.properties.get()
        return instance.properties.get(names=objekt)
    elif operation == Operations.writemultipleproperties or operation == Operations.writeallproperties:
        return instance.properties.set(payload)
    raise NotImplementedError(
        "Unimplemented execution path for Thing {} for operation {}".format(instance.id, operation)
    )
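The dispatch above maps an (objekt, operation) pair onto attribute access and bound-method calls on the Thing. A reduced, stdlib-only version of the same branching (operation names mirror `Operations`; `DummyThing` and the simplified `execute_operation` are illustrative, not hololinked's implementation):

```python
class DummyThing:
    def __init__(self):
        self.temperature = 25.0

    def reset(self, value=0.0):
        self.temperature = value
        return "reset done"

def execute_operation(instance, objekt, operation, payload):
    # readproperty -> getattr, writeproperty -> setattr,
    # invokeaction -> method call with "__args__" split out, rest as kwargs
    if operation == "readproperty":
        return getattr(instance, objekt)
    elif operation == "writeproperty":
        return setattr(instance, objekt, payload)
    elif operation == "invokeaction":
        payload = payload or {}
        args = payload.pop("__args__", tuple())
        return getattr(instance, objekt)(*args, **payload)
    raise NotImplementedError(f"Unimplemented operation {operation}")

thing = DummyThing()
assert execute_operation(thing, "temperature", "readproperty", None) == 25.0
execute_operation(thing, "temperature", "writeproperty", 30.0)
assert execute_operation(thing, "reset", "invokeaction", {"__args__": (5.0,)}) == "reset done"
print(thing.temperature)  # 5.0
```

The real method goes through the descriptor layer (`prop.external_set`, `action.external_call`) instead of raw `getattr`/`setattr`, which is where validation and execution-info checks happen.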

Attributes

id

str
instance-attribute, writable
ID of the RPC server, used to direct requests from clients. Must be unique. For IPC & INPROC sockets, the ID is used to create the socket as well. For TCP, it is used for message routing.

things

List[Thing]
instance-attribute, read-only
Thing objects that will be served by this RPC server.

logger

logging.Logger
logger instance

req_rep_server

AsyncZMQServer
instance-attribute, read-only
ZMQ server that handles the request-reply pattern. Used for properties & actions.

event_publisher

EventPublisher
instance-attribute, read-only
ZMQ server that publishes events to clients in a publish-subscribe pattern.

schedulers

typing.Dict[str, Scheduler]
instance-attribute
A map of thing ID to a Scheduler object. For actions requiring special scheduling, additional schedulers with the ID "(thing ID).(action name).(operation)" are created.
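The key convention can be illustrated with plain strings (the thing ID "spectrometer" and the placeholder values are made up for the example; real values are Scheduler objects):

```python
def scheduler_key(thing_id, action=None, operation=None):
    # the default scheduler is keyed by the thing ID alone; actions that
    # need special scheduling get "(thing ID).(action name).(operation)" keys
    if action is None:
        return thing_id
    return f"{thing_id}.{action}.{operation}"

schedulers = {
    scheduler_key("spectrometer"): "<default scheduler>",
    scheduler_key("spectrometer", "capture", "invokeaction"): "<dedicated scheduler>",
}
print(sorted(schedulers))  # ['spectrometer', 'spectrometer.capture.invokeaction']
```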