Skip to content

hololinked.core.zmq.rpc_server.Scheduler

Scheduler class to schedule the operations of a thing either in queued mode, or a one-shot mode in either async or threaded loops.

UML Diagram

UML Diagram subclasses

Source code in hololinked\core\zmq\rpc_server.py
class Scheduler:
    """
    Scheduler class to schedule the operations of a thing either in queued mode, or a one-shot mode in either 
    async or threaded loops. 

    [UML Diagram](http://localhost:8000/UML/PDF/RPCServer.pdf)

    [UML Diagram subclasses](http://localhost:8000/UML/PDF/Scheduler.pdf)
    """

    OperationRequest = typing.Tuple[str, str, str, SerializableData, PreserializedData, typing.Dict[str, typing.Any]]
    OperationReply = typing.Tuple[SerializableData, PreserializedData, str]
    JobInvokationType = typing.Tuple[AsyncZMQServer, RequestMessage, asyncio.Task, asyncio.Event]
    # [UML Diagram](http://localhost:8000/UML/PDF/RPCServer.pdf)
    _operation_execution_complete_event: asyncio.Event | threading.Event
    _operation_execution_ready_event: asyncio.Event | threading.Event

    def __init__(self, instance: Thing, rpc_server: RPCServer) -> None:
        self.instance = instance # type: Thing
        self.rpc_server = rpc_server # type: RPCServer
        self.run = True # type: bool 
        self._one_shot = False # type: bool
        self._last_operation_request = Undefined # type: Scheduler.OperationRequest
        self._last_operation_reply = Undefined # type: Scheduler.OperationRequest
        self._job_queued_event = asyncio.Event() # type: asyncio.Event

    @property
    def last_operation_request(self) -> OperationRequest:
        return self._last_operation_request

    @last_operation_request.setter
    def last_operation_request(self, value: OperationRequest):
        self._last_operation_request = value
        self._operation_execution_ready_event.set()

    def reset_operation_request(self) -> None:
        self._last_operation_request = Undefined

    @property
    def last_operation_reply(self) -> OperationReply:
        return self._last_operation_reply

    @last_operation_reply.setter
    def last_operation_reply(self, value: OperationReply):
        self._last_operation_request = Undefined
        self._last_operation_reply = value
        self._operation_execution_complete_event.set()
        if self._one_shot:
            self.run = False

    def reset_operation_reply(self) -> None:
        self._last_operation_reply = Undefined

    async def wait_for_job(self) -> None:
        await self._job_queued_event.wait()
        self._job_queued_event.clear()

    async def wait_for_operation(self, eventloop: asyncio.AbstractEventLoop | None) -> None:
        # assert isinstance(self._operation_execution_ready_event, threading.Event), "not a threaded scheduler"
        if isinstance(self._operation_execution_ready_event, threading.Event):
            await eventloop.run_in_executor(None, self._operation_execution_ready_event.wait)
        else:
            await self._operation_execution_ready_event.wait()
        self._operation_execution_ready_event.clear()

    async def wait_for_reply(self, eventloop: asyncio.AbstractEventLoop | None) -> None:
        if isinstance(self._operation_execution_complete_event, threading.Event):
            await eventloop.run_in_executor(None, self._operation_execution_complete_event.wait)
        else:
            await self._operation_execution_complete_event.wait()
        self._operation_execution_complete_event.clear()

    @property
    def has_job(self) -> bool:
        raise NotImplementedError("has_job method must be implemented in the subclass")

    @property
    def next_job(self) -> JobInvokationType:
        raise NotImplementedError("next_job method must be implemented in the subclass")

    def dispatch_job(self, job: JobInvokationType) -> None:
        raise NotImplementedError("dispatch_job method must be implemented in the subclass")

    def cleanup(self):
        self.run = False
        self._job_queued_event.set()
        self._operation_execution_ready_event.set()
        self._operation_execution_complete_event.set()

    @classmethod
    def extract_operation_tuple_from_request(self, request_message: RequestMessage) -> OperationRequest:
        """thing execution info"""
        return (request_message.header['thingID'], request_message.header['objekt'], request_message.header['operation'], 
            request_message.body[0], request_message.body[1], request_message.header['thingExecutionContext'])

    @classmethod
    def format_reply_tuple(self, return_value: typing.Any) -> OperationReply:
        pass

Functions

extract_operation_tuple_from_request classmethod

extract_operation_tuple_from_request(request_message: RequestMessage) -> OperationRequest

thing execution info

Source code in hololinked\core\zmq\rpc_server.py
@classmethod
def extract_operation_tuple_from_request(self, request_message: RequestMessage) -> OperationRequest:
    """thing execution info"""
    return (request_message.header['thingID'], request_message.header['objekt'], request_message.header['operation'], 
        request_message.body[0], request_message.body[1], request_message.header['thingExecutionContext'])