Skip to content

hololinked.core.zmq.rpc_server.Scheduler

Scheduler class to schedule the operations of a thing either in queued mode, or a one-shot mode in either async or threaded loops.

UML Diagram

UML Diagram subclasses

Source code in hololinked/hololinked/core/zmq/rpc_server.py
class Scheduler:
    """
    Scheduler class to schedule the operations of a thing either in queued mode, or a one-shot mode in either
    async or threaded loops.

    [UML Diagram](http://docs.hololinked.dev/UML/PDF/RPCServer.pdf)

    [UML Diagram subclasses](http://docs.hololinked.dev/UML/PDF/Scheduler.pdf)
    """

    OperationRequest = tuple[str, str, str, SerializableData, PreserializedData, dict[str, Any]]
    OperationReply = tuple[SerializableData, PreserializedData, str]
    JobInvokationType = tuple[AsyncZMQServer, RequestMessage, asyncio.Task, asyncio.Event]
    # [UML Diagram](http://docs.hololinked.dev/UML/PDF/RPCServer.pdf)
    _operation_execution_complete_event: asyncio.Event | threading.Event
    _operation_execution_ready_event: asyncio.Event | threading.Event

    def __init__(self, instance: Thing, rpc_server: RPCServer) -> None:
        self.instance = instance  # type: Thing
        self.rpc_server = rpc_server  # type: RPCServer
        self.run = True  # type: bool
        self._one_shot = False  # type: bool
        self._last_operation_request = Undefined  # type: Scheduler.OperationRequest
        self._last_operation_reply = Undefined  # type: Scheduler.OperationRequest
        self._job_queued_event = asyncio.Event()  # type: asyncio.Event

    @property
    def last_operation_request(self) -> OperationRequest:
        return self._last_operation_request

    @last_operation_request.setter
    def last_operation_request(self, value: OperationRequest):
        self._last_operation_request = value
        self._operation_execution_ready_event.set()

    def reset_operation_request(self) -> None:
        self._last_operation_request = Undefined

    @property
    def last_operation_reply(self) -> OperationReply:
        return self._last_operation_reply

    @last_operation_reply.setter
    def last_operation_reply(self, value: OperationReply):
        self._last_operation_request = Undefined
        self._last_operation_reply = value
        self._operation_execution_complete_event.set()
        if self._one_shot:
            self.run = False

    def reset_operation_reply(self) -> None:
        self._last_operation_reply = Undefined

    async def wait_for_job(self) -> None:
        await self._job_queued_event.wait()
        self._job_queued_event.clear()

    async def wait_for_operation(self, eventloop: asyncio.AbstractEventLoop | None) -> None:
        # assert isinstance(self._operation_execution_ready_event, threading.Event), "not a threaded scheduler"
        if isinstance(self._operation_execution_ready_event, threading.Event):
            await eventloop.run_in_executor(None, self._operation_execution_ready_event.wait)
        else:
            await self._operation_execution_ready_event.wait()
        self._operation_execution_ready_event.clear()

    async def wait_for_reply(self, eventloop: asyncio.AbstractEventLoop | None) -> None:
        if isinstance(self._operation_execution_complete_event, threading.Event):
            await eventloop.run_in_executor(None, self._operation_execution_complete_event.wait)
        else:
            await self._operation_execution_complete_event.wait()
        self._operation_execution_complete_event.clear()

    @property
    def has_job(self) -> bool:
        raise NotImplementedError("has_job method must be implemented in the subclass")

    @property
    def next_job(self) -> JobInvokationType:
        raise NotImplementedError("next_job method must be implemented in the subclass")

    def dispatch_job(self, job: JobInvokationType) -> None:
        raise NotImplementedError("dispatch_job method must be implemented in the subclass")

    def cleanup(self):
        self.run = False
        self._job_queued_event.set()
        self._operation_execution_ready_event.set()
        self._operation_execution_complete_event.set()

    @classmethod
    def extract_operation_tuple_from_request(self, request_message: RequestMessage) -> OperationRequest:
        """thing execution info"""
        return (
            request_message.header["thingID"],
            request_message.header["objekt"],
            request_message.header["operation"],
            request_message.body[0],
            request_message.body[1],
            request_message.header["thingExecutionContext"],
        )

    @classmethod
    def format_reply_tuple(self, return_value: Any) -> OperationReply:
        pass

Functions

extract_operation_tuple_from_request classmethod

extract_operation_tuple_from_request(request_message: RequestMessage) -> OperationRequest

thing execution info

Source code in hololinked/hololinked/core/zmq/rpc_server.py
@classmethod
def extract_operation_tuple_from_request(self, request_message: RequestMessage) -> OperationRequest:
    """thing execution info"""
    return (
        request_message.header["thingID"],
        request_message.header["objekt"],
        request_message.header["operation"],
        request_message.body[0],
        request_message.body[1],
        request_message.header["thingExecutionContext"],
    )