Skip to content

hololinked.core.zmq.rpc_server.RPCServer

Bases: BaseZMQServer

The RPCServer implements a infinite loop where ZMQ sockets listen for messages, in any transport layer possible (INPROC, IPC or TCP). Once requests are received, jobs are dispatched to the Thing instances which are being served, with timeouts or any other execution requirements (called execution context). Within the jobs, the requested operation information is made available which is extracted and executed by a Thing instance. The results are then sent back to the client. Operations information include Thing ID, the property, action or event to be executed (events are usually PUB-SUB and are largely handled by the EventPublisher directly), what to do on them (readProperty, invokeAction etc.), the payload and the execution contexts (like timeouts). This is structured as a JSON.

Jobs determine how to execute the operations on the Thing instance, whether in queued, async or threaded modes. This is their main function. Queued mode is the default as it is assumed that multiple physical operations in the physical world is not always practical. Jobs also help the Thing instance to retrieve operation information from a request object.

Default ZMQ transport layer is INPROC, but IPC or TCP can also be added simultaneously. The purpose of INPROC being default is that, the RPCServer is the only server implementing the operations directly on the Thing instances. All other protocols like HTTP, MQTT, CoAP etc. will be used to send requests to the RPCServer only and do not directly operate on the Thing instances. Instead, the incoming requests in other protocols are converted to the above stated "Operation Information" which are in JSON. INPROC is the fastest and most efficient way to communicate between multiple independently running loops, whether the loop belongs to a specific protocol's request listener or the RPCServer itself. The same INPROC messaging contract is also used for IPC and TCP, thus eliminating the need to separately implement messaging contracts at different layers of communication.

Therefore, if a Thing instance is to be served by a well known protocol, say HTTP, the server behaves like HTTP-RPC.

UML Diagram

Source code in hololinked\core\zmq\rpc_server.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
class RPCServer(BaseZMQServer):
    """
    The `RPCServer` implements a infinite loop where ZMQ sockets listen for messages, in any transport layer possible
    (`INPROC`, `IPC` or `TCP`). Once requests are received, jobs are dispatched to the `Thing` instances which are being served, 
    with timeouts or any other execution requirements (called execution context). Within the jobs, the requested
    operation information is made available which is extracted and executed by a `Thing` instance. 
    The results are then sent back to the client. Operations information include `Thing` ID, the property, action or 
    event to be executed (events are usually PUB-SUB and are largely handled by the `EventPublisher` directly), 
    what to do on them (`readProperty`, `invokeAction` etc.), the payload and the execution contexts (like timeouts).
    This is structured as a JSON. 

    Jobs determine how to execute the operations on the `Thing` instance, whether in queued, async or threaded modes.
    This is their main function. Queued mode is the default as it is assumed that multiple physical operations in the physical world is not 
    always practical. Jobs also help the `Thing` instance to retrieve operation information from a request object. 

    Default ZMQ transport layer is `INPROC`, but `IPC` or `TCP` can also be added simultaneously. The purpose of `INPROC` 
    being default is that, the `RPCServer` is the only server implementing the operations directly on the `Thing` 
    instances. All other protocols like HTTP, MQTT, CoAP etc. will be used to send requests to the `RPCServer` only 
    and do not directly operate on the `Thing` instances. Instead, the incoming requests in other protocols are converted 
    to the above stated "Operation Information" which are in JSON. `INPROC` is the fastest and most efficient way to communicate between
    multiple independently running loops, whether the loop belongs to a specific protocol's request listener or 
    the `RPCServer` itself. The same `INPROC` messaging contract is also used for `IPC` and `TCP`, thus eliminating the 
    need to separately implement messaging contracts at different layers of communication.

    Therefore, if a `Thing` instance is to be served by a well known protocol, say HTTP, the server behaves like HTTP-RPC. 

    [UML Diagram](http://localhost:8000/UML/PDF/RPCServer.pdf)
    """

    things = TypedDict(key_type=(str,), item_type=(Thing,), bounds=(0, 100), allow_None=True, default=None,
                    doc="list of Things which are being executed", remote=False) # type: typing.Dict[str, Thing]


    def __init__(self, *, 
                id: str, 
                things: typing.List[Thing],
                context: zmq.asyncio.Context | None = None, 
                transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.INPROC,
                **kwargs: typing.Dict[str, typing.Any]
            ) -> None:
        """
        Parameters
        ----------
        id: str
            `id` of the server
        things: List[Thing]
            list of `Thing` instances to be served
        context: Optional, zmq.asyncio.Context
            ZeroMQ async Context object to use. All sockets except those created by event publisher share this context. 
            Automatically created when None is supplied.
        transport: ZMQ_TRANSPORTS
            transport layer to be used for the server, default is `INPROC`
        **kwargs:
            tcp_socket_address: str
                address of the `TCP` socket, if not given, a random port is chosen
        """
        super().__init__(id=id, **kwargs)
        self.things = dict() 
        for thing in things:
            self.things[thing.id] = thing

        if self.logger is None:
            self.logger =  get_default_logger('{}|{}|{}|{}'.format(self.__class__.__name__, 
                                                'RPC', 'MIXED', self.id), kwargs.get('log_level', logging.INFO))
            kwargs['logger'] = self.logger
        # contexts and poller
        self._run = False # flag to stop all the
        self._terminate_context = context is None
        self.context = context or zmq.asyncio.Context()

        self.req_rep_server = AsyncZMQServer(
                                id=self.id, 
                                context=self.context, 
                                transport=transport,
                                poll_timeout=1000,
                                **kwargs
                            )        
        self.event_publisher = EventPublisher(
                                id=f'{self.id}/event-publisher',
                                # dont pass the context
                                transport=transport,
                                **kwargs
                            )        

        self.schedulers = dict()

        # setup scheduling requirements
        for instance in self.things.values():
            instance.rpc_server = self
            instance.event_publisher = self.event_publisher 
            for action in instance.actions.descriptors.values():
                if action.execution_info.iscoroutine and not action.execution_info.synchronous:
                    self.schedulers[f'{instance.id}.{action.name}.invokeAction'] = AsyncScheduler
                elif not action.execution_info.synchronous:
                    self.schedulers[f'{instance.id}.{action.name}.invokeAction'] = ThreadedScheduler
                # else QueuedScheduler which is default
            # properties need not dealt yet, but may be in future

    schedulers: typing.Dict[str, "QueuedScheduler"]

    def __post_init__(self):
        super().__post_init__()
        self.logger.info("Server with name '{}' can be started using run().".format(self.id))   


    async def recv_requests_and_dispatch_jobs(self, server: AsyncZMQServer) -> None:
        """
        Continuously receives messages from different clients and dispatches them as jobs according to the specific
        requirements of a how an object (property/action/event) must be executed (queued/threaded/async). 
        Also handles messages that dont need separate jobs like `HANDSHAKE`, `EXIT`, timeouts etc.
        """
        eventloop = asyncio.get_event_loop()
        while self._run:
            try:
                request_messages = await server.poll_requests() 
                # when stop poll is set, this will exit with an empty list
            except BreakLoop:
                break
            except Exception as ex:
                self.logger.error(f"exception occurred while polling for server '{server.id}' - {str(ex)}")
                continue

            for request_message in request_messages:
                try:
                    # handle invokation timeout
                    invokation_timeout = request_message.server_execution_context.get("invokation_timeout", None)

                    ready_to_process_event = None
                    timeout_task = None
                    if invokation_timeout is not None:
                        ready_to_process_event = asyncio.Event()
                        timeout_task = asyncio.create_task(
                                                self._process_timeouts(
                                                    request_message=request_message, 
                                                    ready_to_process_event=ready_to_process_event, 
                                                    timeout=invokation_timeout, 
                                                    origin_server=server, 
                                                    timeout_type='invokation'
                                                )
                                            )
                        eventloop.call_soon(lambda : timeout_task)

                    # check object level scheduling requirements and schedule the message
                    # append to messages list - message, event, timeout task, origin socket
                    job = (server, request_message, timeout_task, ready_to_process_event) # type: Scheduler.JobInvokationType
                    if request_message.qualified_operation in self.schedulers:
                        scheduler = self.schedulers[request_message.qualified_operation](self.things[request_message.thing_id], self)
                    else:
                        scheduler = self.schedulers[request_message.thing_id]
                    scheduler.dispatch_job(job)

                except Exception as ex:
                    # handle invalid message
                    self.logger.error(f"exception occurred for message id '{request_message.id}' - {str(ex)}")
                    invalid_message_task = asyncio.create_task(
                                                    server._handle_invalid_message(
                                                        request_message=request_message,        
                                                        exception=ex
                                                    )
                                                )      
                    eventloop.call_soon(lambda: invalid_message_task)
        self.stop()
        self.logger.info(f"stopped polling for server '{server.id}' {server.socket_address.split(':')[0].upper()}")


    async def tunnel_message_to_things(self, scheduler: "Scheduler") -> None:
        """
        message tunneler between external sockets and interal inproc client
        """
        eventloop = get_current_async_loop()
        while self._run and scheduler.run:
            # wait for message first
            if not scheduler.has_job:
                await scheduler.wait_for_job()
                # this means in next loop it wont be in this block as a message arrived  
                continue

            # retrieve from messages list - message, execution context, event, timeout task, origin socket
            origin_server, request_message, timeout_task, ready_to_process_event = scheduler.next_job
            server_execution_context = request_message.server_execution_context

            # handle invokation timeout
            invokation_timed_out = True 
            if ready_to_process_event is not None: 
                ready_to_process_event.set() # releases timeout task 
                invokation_timed_out = await timeout_task
            if ready_to_process_event is not None and invokation_timed_out:
                # drop call to thing, timeout message was already sent in _process_timeouts()
                continue 

            # handle execution through thing
            scheduler.last_operation_request = scheduler.extract_operation_tuple_from_request(request_message)

            # schedule an execution timeout
            execution_timeout = server_execution_context.get("execution_timeout", None)
            execution_completed_event = None 
            execution_timeout_task = None
            execution_timed_out = True
            if execution_timeout is not None:
                execution_completed_event = asyncio.Event()
                execution_timeout_task = asyncio.create_task(
                                                    self._process_timeouts(
                                                        request_message=request_message, 
                                                        ready_to_process_event=execution_completed_event,
                                                        timeout=execution_timeout,
                                                        origin_server=origin_server,
                                                        timeout_type='execution'
                                                    )
                                                )
                eventloop.call_soon(lambda: execution_timeout_task)

            # always wait for reply from thing, since this loop is asyncio task (& in its own thread in RPC server), 
            # timeouts always reach client without truly blocking by the GIL. If reply does not arrive, all other requests
            # get invokation timeout.            
            # await eventloop.run_in_executor(None, scheduler.wait_for_reply)
            await scheduler.wait_for_reply(eventloop)
            # check if reply is never undefined, Undefined is a sensible placeholder for NotImplemented singleton
            if scheduler.last_operation_reply is Undefined:
                # this is a logic error, as the reply should never be undefined
                await origin_server._handle_error_message(
                            request_message=request_message, 
                            exception=RuntimeError("No reply from thing - logic error")
                        )
                continue
            payload, preserialized_payload, reply_message_type = scheduler.last_operation_reply
            scheduler.reset_operation_reply()

            # check if execution completed within time
            if execution_completed_event is not None:
                execution_completed_event.set() # releases timeout task
                execution_timed_out = await execution_timeout_task
            if execution_timeout_task is not None and execution_timed_out:
                # drop reply to client as timeout was already sent
                continue
            if server_execution_context.get("oneway", False):
                # drop reply if oneway
                continue 

            # send reply to client            
            await origin_server.async_send_response_with_message_type(
                request_message=request_message,
                message_type=reply_message_type,
                payload=payload,
                preserialized_payload=preserialized_payload
            ) 

        scheduler.cleanup()
        self.logger.info("stopped tunneling messages to things")


    async def run_thing_instance(self, instance: Thing, scheduler: typing.Optional["Scheduler"] = None) -> None: 
        """
        run a single `Thing` instance in an infinite loop by allowing the scheduler to schedule operations on it.

        Parameters
        ----------
        instance: Thing
            instance of the `Thing`
        scheduler: Optional[Scheduler]
            scheduler that schedules operations on the `Thing` instance, a default is always available. 
        """
        self.logger.info("starting to run operations on thing {} of class {}".format(instance.id, instance.__class__.__name__))
        if self.logger.level >= logging.ERROR:
            # sleep added to resolve some issue with logging related IO bound tasks in asyncio - not really clear what it is.
            # This loop crashes for log levels above ERROR without the following statement
            await asyncio.sleep(0.001) 
        scheduler = scheduler or self.schedulers[instance.id]
        eventloop = get_current_async_loop()

        while self._run and scheduler.run:
            # print("starting to serve thing {}".format(instance.id))
            await scheduler.wait_for_operation(eventloop)
            # await scheduler.wait_for_operation()
            if scheduler.last_operation_request is Undefined:
                instance.logger.warning("No operation request found in thing '{}'".format(instance.id))
                continue

            try:
                # fetch operation_request which is a tuple of 
                # (thing_id, objekt, operation, payload, preserialized_payload, execution_context)
                thing_id, objekt, operation, payload, preserialized_payload, execution_context = scheduler.last_operation_request 

                # deserializing the payload required to execute the operation
                payload = payload.deserialize() 
                preserialized_payload = preserialized_payload.value
                instance.logger.debug(f"thing {instance.id} with {thing_id} starting execution of operation {operation} on {objekt}")

                # start activities related to thing execution context
                fetch_execution_logs = execution_context.pop("fetch_execution_logs", False)
                if fetch_execution_logs:
                    list_handler = ListHandler([])
                    list_handler.setLevel(logging.DEBUG)
                    list_handler.setFormatter(instance.logger.handlers[0].formatter)
                    instance.logger.addHandler(list_handler)

                # execute the operation
                return_value = await self.execute_operation(instance, objekt, operation, payload, preserialized_payload) 

                # handle return value
                if isinstance(return_value, tuple) and len(return_value) == 2 and (
                    isinstance(return_value[1], bytes) or 
                    isinstance(return_value[1], PreserializedData) 
                ):  
                    if fetch_execution_logs:
                        return_value[0] = {
                            "return_value" : return_value[0],
                            "execution_logs" : list_handler.log_list
                        }
                    payload = SerializableData(return_value[0], Serializers.for_object(thing_id, instance.__class__.__name__, objekt))
                    if isinstance(return_value[1], bytes):
                        preserialized_payload = PreserializedData(return_value[1])
                # elif isinstance(return_value, PreserializedData):
                #     if fetch_execution_logs:
                #         return_value = {
                #             "return_value" : return_value.value,
                #             "execution_logs" : list_handler.log_list
                #         }
                #     payload = SerializableData(return_value.value, content_type='application/json')
                #     preserialized_payload = return_value

                elif isinstance(return_value, bytes):
                    payload = SerializableData(None, content_type='application/json')
                    preserialized_payload = PreserializedData(return_value)
                else:
                     # complete thing execution context
                    if fetch_execution_logs:
                        return_value = {
                            "return_value" : return_value,
                            "execution_logs" : list_handler.log_list
                        }
                    payload = SerializableData(return_value, Serializers.for_object(thing_id, instance.__class__.__name__, objekt))
                    preserialized_payload = PreserializedData(EMPTY_BYTE, content_type='text/plain')
                # set reply
                scheduler.last_operation_reply = (payload, preserialized_payload, REPLY)
            except BreakInnerLoop:
                # exit the loop and stop the thing
                instance.logger.info("Thing {} with instance name {} exiting event loop.".format(
                                                            instance.__class__.__name__, instance.id))
                return_value = None
                if fetch_execution_logs:
                    return_value = { 
                        "return_value" : None,
                        "execution_logs" : list_handler.log_list
                    }
                scheduler.last_operation_reply = (
                    SerializableData(return_value, content_type='application/json'), 
                    PreserializedData(EMPTY_BYTE, content_type='text/plain'),
                    None
                )
                return 
            except Exception as ex:
                # error occurred while executing the operation
                instance.logger.error("Thing {} with ID {} produced error : {} - {}.".format(
                                                        instance.__class__.__name__, instance.id, type(ex), ex))
                return_value = dict(exception=format_exception_as_json(ex))                
                if fetch_execution_logs:
                    return_value["execution_logs"] = list_handler.log_list
                scheduler.last_operation_reply = (
                    SerializableData(return_value, content_type='application/json'), 
                    PreserializedData(EMPTY_BYTE, content_type='text/plain'),
                    ERROR
                )
            finally:
                # cleanup
                if fetch_execution_logs:
                    instance.logger.removeHandler(list_handler)
                instance.logger.debug("thing {} with instance name {} completed execution of operation {} on {}".format(
                                                            instance.__class__.__name__, instance.id, operation, objekt))
        self.logger.info("stopped running thing {}".format(instance.id))


    @classmethod
    async def execute_operation(cls, 
                        instance: Thing, 
                        objekt: str, 
                        operation: str,
                        payload: typing.Any,
                        preserialized_payload: bytes
                    ) -> typing.Any:
        """
        Execute a given operation on a thing instance. 

        Parameters
        ----------
        instance: Thing
            instance of the thing
        objekt: str
            name of the property, action or event
        operation: str
            operation to be executed on the property, action or event
        payload: Any
            payload to be used for the operation 
        preserialized_payload: bytes
            preserialized payload to be used for the operation
        """
        if operation == 'readProperty':
            prop = instance.properties[objekt] # type: Property
            return getattr(instance, prop.name) 
        elif operation == 'writeProperty':
            prop = instance.properties[objekt] # type: Property
            if preserialized_payload != EMPTY_BYTE:
                prop_value = preserialized_payload
            else:
                prop_value = payload
            return prop.external_set(instance, prop_value)
        elif operation == 'deleteProperty':
            prop = instance.properties[objekt] # type: Property
            del prop # raises NotImplementedError when deletion is not implemented which is mostly the case
        elif operation == 'invokeAction':
            action = instance.actions[objekt] # type: BoundAction
            args = payload.pop('__args__', tuple())
            # payload then become kwargs
            if preserialized_payload != EMPTY_BYTE:
                args = (preserialized_payload,) + args
            if action.execution_info.iscoroutine:
                # the actual scheduling as a purely async task is done by the scheduler, not here, 
                # this will be a blocking call
                return await action.external_call(*args, **payload)
            return action.external_call(*args, **payload) 
        elif operation == 'readMultipleProperties' or operation == 'readAllProperties':
            if objekt is None:
                return instance._get_properties()
            return instance._get_properties(names=objekt)
        elif operation == 'writeMultipleProperties' or operation == 'writeAllProperties':
            return instance._set_properties(payload)
        raise NotImplementedError("Unimplemented execution path for Thing {} for operation {}".format(
                                                                            instance.id, operation))


    async def _process_timeouts(self, 
                            request_message: RequestMessage, 
                            ready_to_process_event: asyncio.Event,
                            origin_server: AsyncZMQServer,
                            timeout: float | int | None, 
                            timeout_type : str
                        ) -> bool:
        """
        replies timeout to client if timeout occured along with returning `True` to indicate that.
        If timeout did not occur, the `ready_to_process_event` is set to indicate that the operation can be processed.
        `False` is returned in this case.
        """
        try:
            await asyncio.wait_for(ready_to_process_event.wait(), timeout)
            return False 
        except TimeoutError:    
            await origin_server._handle_timeout(request_message, timeout_type)
            return True


    def run_zmq_request_listener(self):
        """
        Runs ZMQ's socket polling in an async loop. This method is blocking and is automatically called by `run()` 
        method. Please dont call this method when the async loop is already running. 
        """
        self.logger.info("starting external message listener thread")
        self._run = True
        eventloop = get_current_async_loop()
        existing_tasks = asyncio.all_tasks(eventloop)
        eventloop.run_until_complete(
            asyncio.gather(
                self.recv_requests_and_dispatch_jobs(self.req_rep_server),
                *[self.tunnel_message_to_things(scheduler) for scheduler in self.schedulers.values()],
                *existing_tasks
            )
        )
        self.logger.info("exiting external listener event loop {}".format(self.id))
        eventloop.close()


    def run_things(self, things: typing.List[Thing]):
        """
        Run loop that executes operations on `Thing` instances. This method is blocking and is called by `run()` method.

        Parameters
        ----------
        things: List[Thing]
            list of `Thing` instances to be executed
        """
        thing_executor_loop = get_current_async_loop()
        self.logger.info(f"starting thing executor loop in thread {threading.get_ident()} for {[obj.id for obj in things]}")
        thing_executor_loop.run_until_complete(
            asyncio.gather(*[self.run_thing_instance(instance) for instance in things])
        )
        self.logger.info(f"exiting event loop in thread {threading.get_ident()}")
        thing_executor_loop.close()


    def run(self):
        """
        Start the server. This method is blocking. 
        Creates job schedulers for each `Thing`, dispatches each `Thing` to its own thread and starts the ZMQ sockets
        polling loop. Call stop() (threadsafe) to stop the server.
        """
        self.logger.info(f"starting RPC server {self.id}")
        for thing in self.things.values():
            self.schedulers[thing.id] = QueuedScheduler(thing, self)       
        threads = dict() # type: typing.Dict[int, threading.Thread]
        for thing in self.things.values():
            thread = threading.Thread(target=self.run_things, args=([thing],))
            thread.start()
            threads[thread.ident] = thread
        self.run_zmq_request_listener()
        for thread in threads.values():
            thread.join()       
        self.logger.info(f"server stopped {self.id}")


    def stop(self):
        """Stop the server. This method is threadsafe."""
        self._run = False
        self.req_rep_server.stop_polling()
        for scheduler in self.schedulers.values():
            scheduler.cleanup()


    def exit(self):
        try:
            self.stop()
            if self.req_rep_server is not None:
                self.req_rep_server.exit()
            if self.event_publisher is not None:
                self.event_publisher.exit()
        except:
            pass 
        if self._terminate_context:
            self.context.term()
        self.logger.info("terminated context of socket '{}' of type '{}'".format(self.id, self.__class__))


    def __hash__(self):
        return hash('RPCServer' + self.id)

    def __eq__(self, other):
        if not isinstance(other, RPCServer):
            return False
        return self.id == other.id

    def __str__(self):
        return f"RPCServer({self.id})"

Functions

__init__

__init__(*, id: str, things: typing.List[Thing], context: zmq.asyncio.Context | None = None, transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.INPROC, **kwargs: typing.Dict[str, typing.Any]) -> None

Parameters:

Name Type Description Default

id

str

id of the server

required

things

List[Thing]

list of Thing instances to be served

required

context

Context | None

ZeroMQ async Context object to use. All sockets except those created by event publisher share this context. Automatically created when None is supplied.

None

transport

ZMQ_TRANSPORTS

transport layer to be used for the server, default is INPROC

INPROC

**kwargs

Dict[str, Any]

tcp_socket_address: str address of the TCP socket, if not given, a random port is chosen

{}
Source code in hololinked\core\zmq\rpc_server.py
def __init__(self, *, 
            id: str, 
            things: typing.List[Thing],
            context: zmq.asyncio.Context | None = None, 
            transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.INPROC,
            **kwargs: typing.Dict[str, typing.Any]
        ) -> None:
    """
    Parameters
    ----------
    id: str
        `id` of the server
    things: List[Thing]
        list of `Thing` instances to be served
    context: Optional, zmq.asyncio.Context
        ZeroMQ async Context object to use. All sockets except those created by event publisher share this context. 
        Automatically created when None is supplied.
    transport: ZMQ_TRANSPORTS
        transport layer to be used for the server, default is `INPROC`
    **kwargs:
        tcp_socket_address: str
            address of the `TCP` socket, if not given, a random port is chosen
    """
    super().__init__(id=id, **kwargs)
    self.things = dict() 
    for thing in things:
        self.things[thing.id] = thing

    if self.logger is None:
        self.logger =  get_default_logger('{}|{}|{}|{}'.format(self.__class__.__name__, 
                                            'RPC', 'MIXED', self.id), kwargs.get('log_level', logging.INFO))
        kwargs['logger'] = self.logger
    # contexts and poller
    self._run = False # flag to stop all the
    self._terminate_context = context is None
    self.context = context or zmq.asyncio.Context()

    self.req_rep_server = AsyncZMQServer(
                            id=self.id, 
                            context=self.context, 
                            transport=transport,
                            poll_timeout=1000,
                            **kwargs
                        )        
    self.event_publisher = EventPublisher(
                            id=f'{self.id}/event-publisher',
                            # dont pass the context
                            transport=transport,
                            **kwargs
                        )        

    self.schedulers = dict()

    # setup scheduling requirements
    for instance in self.things.values():
        instance.rpc_server = self
        instance.event_publisher = self.event_publisher 
        for action in instance.actions.descriptors.values():
            if action.execution_info.iscoroutine and not action.execution_info.synchronous:
                self.schedulers[f'{instance.id}.{action.name}.invokeAction'] = AsyncScheduler
            elif not action.execution_info.synchronous:
                self.schedulers[f'{instance.id}.{action.name}.invokeAction'] = ThreadedScheduler

run

run()

Start the server. This method is blocking. Creates job schedulers for each Thing, dispatches each Thing to its own thread and starts the ZMQ sockets polling loop. Call stop() (threadsafe) to stop the server.

Source code in hololinked\core\zmq\rpc_server.py
def run(self):
    """
    Start the server. This method is blocking. 
    Creates job schedulers for each `Thing`, dispatches each `Thing` to its own thread and starts the ZMQ sockets
    polling loop. Call stop() (threadsafe) to stop the server.
    """
    self.logger.info(f"starting RPC server {self.id}")
    for thing in self.things.values():
        self.schedulers[thing.id] = QueuedScheduler(thing, self)       
    threads = dict() # type: typing.Dict[int, threading.Thread]
    for thing in self.things.values():
        thread = threading.Thread(target=self.run_things, args=([thing],))
        thread.start()
        threads[thread.ident] = thread
    self.run_zmq_request_listener()
    for thread in threads.values():
        thread.join()       
    self.logger.info(f"server stopped {self.id}")

stop

stop()

Stop the server. This method is threadsafe.

Source code in hololinked\core\zmq\rpc_server.py
def stop(self):
    """Stop the server. This method is threadsafe."""
    self._run = False
    self.req_rep_server.stop_polling()
    for scheduler in self.schedulers.values():
        scheduler.cleanup()

run_zmq_request_listener

run_zmq_request_listener()

Runs ZMQ's socket polling in an async loop. This method is blocking and is automatically called by run() method. Please dont call this method when the async loop is already running.

Source code in hololinked\core\zmq\rpc_server.py
def run_zmq_request_listener(self):
    """
    Runs ZMQ's socket polling in an async loop. This method is blocking and is automatically called by `run()` 
    method. Please dont call this method when the async loop is already running. 
    """
    self.logger.info("starting external message listener thread")
    self._run = True
    eventloop = get_current_async_loop()
    existing_tasks = asyncio.all_tasks(eventloop)
    eventloop.run_until_complete(
        asyncio.gather(
            self.recv_requests_and_dispatch_jobs(self.req_rep_server),
            *[self.tunnel_message_to_things(scheduler) for scheduler in self.schedulers.values()],
            *existing_tasks
        )
    )
    self.logger.info("exiting external listener event loop {}".format(self.id))
    eventloop.close()

recv_requests_and_dispatch_jobs async

recv_requests_and_dispatch_jobs(server: AsyncZMQServer) -> None

Continuously receives messages from different clients and dispatches them as jobs according to the specific requirements of a how an object (property/action/event) must be executed (queued/threaded/async). Also handles messages that dont need separate jobs like HANDSHAKE, EXIT, timeouts etc.

Source code in hololinked\core\zmq\rpc_server.py
async def recv_requests_and_dispatch_jobs(self, server: AsyncZMQServer) -> None:
    """
    Continuously receives messages from different clients and dispatches them as jobs according to the specific
    requirements of a how an object (property/action/event) must be executed (queued/threaded/async). 
    Also handles messages that dont need separate jobs like `HANDSHAKE`, `EXIT`, timeouts etc.
    """
    eventloop = asyncio.get_event_loop()
    while self._run:
        try:
            request_messages = await server.poll_requests() 
            # when stop poll is set, this will exit with an empty list
        except BreakLoop:
            break
        except Exception as ex:
            self.logger.error(f"exception occurred while polling for server '{server.id}' - {str(ex)}")
            continue

        for request_message in request_messages:
            try:
                # handle invokation timeout
                invokation_timeout = request_message.server_execution_context.get("invokation_timeout", None)

                ready_to_process_event = None
                timeout_task = None
                if invokation_timeout is not None:
                    ready_to_process_event = asyncio.Event()
                    timeout_task = asyncio.create_task(
                                            self._process_timeouts(
                                                request_message=request_message, 
                                                ready_to_process_event=ready_to_process_event, 
                                                timeout=invokation_timeout, 
                                                origin_server=server, 
                                                timeout_type='invokation'
                                            )
                                        )
                    eventloop.call_soon(lambda : timeout_task)

                # check object level scheduling requirements and schedule the message
                # append to messages list - message, event, timeout task, origin socket
                job = (server, request_message, timeout_task, ready_to_process_event) # type: Scheduler.JobInvokationType
                if request_message.qualified_operation in self.schedulers:
                    scheduler = self.schedulers[request_message.qualified_operation](self.things[request_message.thing_id], self)
                else:
                    scheduler = self.schedulers[request_message.thing_id]
                scheduler.dispatch_job(job)

            except Exception as ex:
                # handle invalid message
                self.logger.error(f"exception occurred for message id '{request_message.id}' - {str(ex)}")
                invalid_message_task = asyncio.create_task(
                                                server._handle_invalid_message(
                                                    request_message=request_message,        
                                                    exception=ex
                                                )
                                            )      
                eventloop.call_soon(lambda: invalid_message_task)
    self.stop()
    self.logger.info(f"stopped polling for server '{server.id}' {server.socket_address.split(':')[0].upper()}")

run_things

run_things(things: typing.List[Thing])

Run loop that executes operations on Thing instances. This method is blocking and is called by run() method.

Parameters:

Name Type Description Default

things

List[Thing]

list of Thing instances to be executed

required
Source code in hololinked\core\zmq\rpc_server.py
def run_things(self, things: typing.List[Thing]):
    """
    Run loop that executes operations on `Thing` instances. This method is blocking and is called by `run()` method.

    Parameters
    ----------
    things: List[Thing]
        list of `Thing` instances to be executed
    """
    thing_executor_loop = get_current_async_loop()
    self.logger.info(f"starting thing executor loop in thread {threading.get_ident()} for {[obj.id for obj in things]}")
    thing_executor_loop.run_until_complete(
        asyncio.gather(*[self.run_thing_instance(instance) for instance in things])
    )
    self.logger.info(f"exiting event loop in thread {threading.get_ident()}")
    thing_executor_loop.close()

run_thing_instance async

run_thing_instance(instance: Thing, scheduler: typing.Optional[Scheduler] = None) -> None

run a single Thing instance in an infinite loop by allowing the scheduler to schedule operations on it.

Parameters:

Name Type Description Default

instance

Thing

instance of the Thing

required

scheduler

Optional[Scheduler]

scheduler that schedules operations on the Thing instance, a default is always available.

None
Source code in hololinked\core\zmq\rpc_server.py
async def run_thing_instance(self, instance: Thing, scheduler: typing.Optional["Scheduler"] = None) -> None: 
    """
    run a single `Thing` instance in an infinite loop by allowing the scheduler to schedule operations on it.

    Parameters
    ----------
    instance: Thing
        instance of the `Thing`
    scheduler: Optional[Scheduler]
        scheduler that schedules operations on the `Thing` instance, a default is always available. 
    """
    self.logger.info("starting to run operations on thing {} of class {}".format(instance.id, instance.__class__.__name__))
    if self.logger.level >= logging.ERROR:
        # sleep added to resolve some issue with logging related IO bound tasks in asyncio - not really clear what it is.
        # This loop crashes for log levels above ERROR without the following statement
        await asyncio.sleep(0.001) 
    scheduler = scheduler or self.schedulers[instance.id]
    eventloop = get_current_async_loop()

    while self._run and scheduler.run:
        # print("starting to serve thing {}".format(instance.id))
        await scheduler.wait_for_operation(eventloop)
        # await scheduler.wait_for_operation()
        if scheduler.last_operation_request is Undefined:
            instance.logger.warning("No operation request found in thing '{}'".format(instance.id))
            continue

        try:
            # fetch operation_request which is a tuple of 
            # (thing_id, objekt, operation, payload, preserialized_payload, execution_context)
            thing_id, objekt, operation, payload, preserialized_payload, execution_context = scheduler.last_operation_request 

            # deserializing the payload required to execute the operation
            payload = payload.deserialize() 
            preserialized_payload = preserialized_payload.value
            instance.logger.debug(f"thing {instance.id} with {thing_id} starting execution of operation {operation} on {objekt}")

            # start activities related to thing execution context
            fetch_execution_logs = execution_context.pop("fetch_execution_logs", False)
            if fetch_execution_logs:
                list_handler = ListHandler([])
                list_handler.setLevel(logging.DEBUG)
                list_handler.setFormatter(instance.logger.handlers[0].formatter)
                instance.logger.addHandler(list_handler)

            # execute the operation
            return_value = await self.execute_operation(instance, objekt, operation, payload, preserialized_payload) 

            # handle return value
            if isinstance(return_value, tuple) and len(return_value) == 2 and (
                isinstance(return_value[1], bytes) or 
                isinstance(return_value[1], PreserializedData) 
            ):  
                if fetch_execution_logs:
                    return_value[0] = {
                        "return_value" : return_value[0],
                        "execution_logs" : list_handler.log_list
                    }
                payload = SerializableData(return_value[0], Serializers.for_object(thing_id, instance.__class__.__name__, objekt))
                if isinstance(return_value[1], bytes):
                    preserialized_payload = PreserializedData(return_value[1])
            # elif isinstance(return_value, PreserializedData):
            #     if fetch_execution_logs:
            #         return_value = {
            #             "return_value" : return_value.value,
            #             "execution_logs" : list_handler.log_list
            #         }
            #     payload = SerializableData(return_value.value, content_type='application/json')
            #     preserialized_payload = return_value

            elif isinstance(return_value, bytes):
                payload = SerializableData(None, content_type='application/json')
                preserialized_payload = PreserializedData(return_value)
            else:
                 # complete thing execution context
                if fetch_execution_logs:
                    return_value = {
                        "return_value" : return_value,
                        "execution_logs" : list_handler.log_list
                    }
                payload = SerializableData(return_value, Serializers.for_object(thing_id, instance.__class__.__name__, objekt))
                preserialized_payload = PreserializedData(EMPTY_BYTE, content_type='text/plain')
            # set reply
            scheduler.last_operation_reply = (payload, preserialized_payload, REPLY)
        except BreakInnerLoop:
            # exit the loop and stop the thing
            instance.logger.info("Thing {} with instance name {} exiting event loop.".format(
                                                        instance.__class__.__name__, instance.id))
            return_value = None
            if fetch_execution_logs:
                return_value = { 
                    "return_value" : None,
                    "execution_logs" : list_handler.log_list
                }
            scheduler.last_operation_reply = (
                SerializableData(return_value, content_type='application/json'), 
                PreserializedData(EMPTY_BYTE, content_type='text/plain'),
                None
            )
            return 
        except Exception as ex:
            # error occurred while executing the operation
            instance.logger.error("Thing {} with ID {} produced error : {} - {}.".format(
                                                    instance.__class__.__name__, instance.id, type(ex), ex))
            return_value = dict(exception=format_exception_as_json(ex))                
            if fetch_execution_logs:
                return_value["execution_logs"] = list_handler.log_list
            scheduler.last_operation_reply = (
                SerializableData(return_value, content_type='application/json'), 
                PreserializedData(EMPTY_BYTE, content_type='text/plain'),
                ERROR
            )
        finally:
            # cleanup
            if fetch_execution_logs:
                instance.logger.removeHandler(list_handler)
            instance.logger.debug("thing {} with instance name {} completed execution of operation {} on {}".format(
                                                        instance.__class__.__name__, instance.id, operation, objekt))
    self.logger.info("stopped running thing {}".format(instance.id))

execute_operation async classmethod

execute_operation(instance: Thing, objekt: str, operation: str, payload: typing.Any, preserialized_payload: bytes) -> typing.Any

Execute a given operation on a thing instance.

Parameters:

Name Type Description Default

instance

Thing

instance of the thing

required

objekt

str

name of the property, action or event

required

operation

str

operation to be executed on the property, action or event

required

payload

Any

payload to be used for the operation

required

preserialized_payload

bytes

preserialized payload to be used for the operation

required
Source code in hololinked\core\zmq\rpc_server.py
@classmethod
async def execute_operation(cls, 
                    instance: Thing, 
                    objekt: str, 
                    operation: str,
                    payload: typing.Any,
                    preserialized_payload: bytes
                ) -> typing.Any:
    """
    Execute a given operation on a thing instance. 

    Parameters
    ----------
    instance: Thing
        instance of the thing
    objekt: str
        name of the property, action or event
    operation: str
        operation to be executed on the property, action or event
    payload: Any
        payload to be used for the operation 
    preserialized_payload: bytes
        preserialized payload to be used for the operation
    """
    if operation == 'readProperty':
        prop = instance.properties[objekt] # type: Property
        return getattr(instance, prop.name) 
    elif operation == 'writeProperty':
        prop = instance.properties[objekt] # type: Property
        if preserialized_payload != EMPTY_BYTE:
            prop_value = preserialized_payload
        else:
            prop_value = payload
        return prop.external_set(instance, prop_value)
    elif operation == 'deleteProperty':
        prop = instance.properties[objekt] # type: Property
        del prop # raises NotImplementedError when deletion is not implemented which is mostly the case
    elif operation == 'invokeAction':
        action = instance.actions[objekt] # type: BoundAction
        args = payload.pop('__args__', tuple())
        # payload then become kwargs
        if preserialized_payload != EMPTY_BYTE:
            args = (preserialized_payload,) + args
        if action.execution_info.iscoroutine:
            # the actual scheduling as a purely async task is done by the scheduler, not here, 
            # this will be a blocking call
            return await action.external_call(*args, **payload)
        return action.external_call(*args, **payload) 
    elif operation == 'readMultipleProperties' or operation == 'readAllProperties':
        if objekt is None:
            return instance._get_properties()
        return instance._get_properties(names=objekt)
    elif operation == 'writeMultipleProperties' or operation == 'writeAllProperties':
        return instance._set_properties(payload)
    raise NotImplementedError("Unimplemented execution path for Thing {} for operation {}".format(
                                                                        instance.id, operation))

exit

exit()
Source code in hololinked\core\zmq\rpc_server.py
def exit(self):
    try:
        self.stop()
        if self.req_rep_server is not None:
            self.req_rep_server.exit()
        if self.event_publisher is not None:
            self.event_publisher.exit()
    except:
        pass 
    if self._terminate_context:
        self.context.term()
    self.logger.info("terminated context of socket '{}' of type '{}'".format(self.id, self.__class__))

Attributes

id

str
instance-attribute, writable
ID of the RPC server, used to direct requests from client. Must be unique. For IPC & INPROC sockets, ID is used to create the socket as well. For TCP, it is used for message routing.

things

List[Thing]
instance-attribute, read-only
Thing objects that will be served by this RPC server.

logger

logging.Logger
logger instance

req_rep_server

AsyncZMQServer
instance-attribute, read-only
ZMQ server that handler request-reply pattern. Used for properties & actions.

event_publisher

EventPublisher
instance-attribute, read-only
ZMQ servers that publishes events to clients in publish-subscribe pattern.

schedulers

tying.Dict[str, Scheduler]
instance-attribute
A map of thing ID to a Scheduler object. For actions requiring special scheduling, additional schdulers with ID "(thing ID).(action name).(opertaion)" is created.