class Scheduler:
"""
Scheduler class to schedule the operations of a thing either in queued mode, or a one-shot mode in either
async or threaded loops.
[UML Diagram](http://localhost:8000/UML/PDF/RPCServer.pdf)
[UML Diagram subclasses](http://localhost:8000/UML/PDF/Scheduler.pdf)
"""
OperationRequest = typing.Tuple[str, str, str, SerializableData, PreserializedData, typing.Dict[str, typing.Any]]
OperationReply = typing.Tuple[SerializableData, PreserializedData, str]
JobInvokationType = typing.Tuple[AsyncZMQServer, RequestMessage, asyncio.Task, asyncio.Event]
# [UML Diagram](http://localhost:8000/UML/PDF/RPCServer.pdf)
_operation_execution_complete_event: asyncio.Event | threading.Event
_operation_execution_ready_event: asyncio.Event | threading.Event
def __init__(self, instance: Thing, rpc_server: RPCServer) -> None:
self.instance = instance # type: Thing
self.rpc_server = rpc_server # type: RPCServer
self.run = True # type: bool
self._one_shot = False # type: bool
self._last_operation_request = Undefined # type: Scheduler.OperationRequest
self._last_operation_reply = Undefined # type: Scheduler.OperationRequest
self._job_queued_event = asyncio.Event() # type: asyncio.Event
@property
def last_operation_request(self) -> OperationRequest:
return self._last_operation_request
@last_operation_request.setter
def last_operation_request(self, value: OperationRequest):
self._last_operation_request = value
self._operation_execution_ready_event.set()
def reset_operation_request(self) -> None:
self._last_operation_request = Undefined
@property
def last_operation_reply(self) -> OperationReply:
return self._last_operation_reply
@last_operation_reply.setter
def last_operation_reply(self, value: OperationReply):
self._last_operation_request = Undefined
self._last_operation_reply = value
self._operation_execution_complete_event.set()
if self._one_shot:
self.run = False
def reset_operation_reply(self) -> None:
self._last_operation_reply = Undefined
async def wait_for_job(self) -> None:
await self._job_queued_event.wait()
self._job_queued_event.clear()
async def wait_for_operation(self, eventloop: asyncio.AbstractEventLoop | None) -> None:
# assert isinstance(self._operation_execution_ready_event, threading.Event), "not a threaded scheduler"
if isinstance(self._operation_execution_ready_event, threading.Event):
await eventloop.run_in_executor(None, self._operation_execution_ready_event.wait)
else:
await self._operation_execution_ready_event.wait()
self._operation_execution_ready_event.clear()
async def wait_for_reply(self, eventloop: asyncio.AbstractEventLoop | None) -> None:
if isinstance(self._operation_execution_complete_event, threading.Event):
await eventloop.run_in_executor(None, self._operation_execution_complete_event.wait)
else:
await self._operation_execution_complete_event.wait()
self._operation_execution_complete_event.clear()
@property
def has_job(self) -> bool:
raise NotImplementedError("has_job method must be implemented in the subclass")
@property
def next_job(self) -> JobInvokationType:
raise NotImplementedError("next_job method must be implemented in the subclass")
def dispatch_job(self, job: JobInvokationType) -> None:
raise NotImplementedError("dispatch_job method must be implemented in the subclass")
def cleanup(self):
self.run = False
self._job_queued_event.set()
self._operation_execution_ready_event.set()
self._operation_execution_complete_event.set()
@classmethod
def extract_operation_tuple_from_request(self, request_message: RequestMessage) -> OperationRequest:
"""thing execution info"""
return (request_message.header['thingID'], request_message.header['objekt'], request_message.header['operation'],
request_message.body[0], request_message.body[1], request_message.header['thingExecutionContext'])
@classmethod
def format_reply_tuple(self, return_value: typing.Any) -> OperationReply:
pass