Skip to content

hololinked.core.zmq.rpc_server.ThreadedScheduler

Bases: Scheduler

Scheduler class to schedule the operations of a thing in a threaded loop.

Source code in hololinked/hololinked/core/zmq/rpc_server.py
class ThreadedScheduler(Scheduler):
    """
    Scheduler that runs the operations of a thing in a separate thread.

    Holds a single pending job at a time. Dispatching a job schedules the
    RPC server's message tunnelling on the current async loop and starts a
    new thread that drives ``rpc_server.run_thing_instance`` via
    ``asyncio.run``.
    """

    def __init__(self, instance: Thing, rpc_server: RPCServer) -> None:
        """
        Initialise the scheduler with no pending job and no worker thread.

        Parameters
        ----------
        instance: Thing
            the thing whose operations are scheduled, forwarded to ``Scheduler``
        rpc_server: RPCServer
            the RPC server that owns this scheduler, forwarded to ``Scheduler``
        """
        super().__init__(instance, rpc_server)
        # most recently dispatched job, ``None`` until ``dispatch_job`` is called
        self._job = None
        # thread started by ``dispatch_job``, ``None`` until then
        self._execution_thread = None
        # NOTE(review): not read anywhere in this class - presumably consumed
        # by ``Scheduler`` or the RPC server; confirm its meaning there
        self._one_shot = True
        # created here but not set/waited on within this class
        self._operation_execution_ready_event = threading.Event()
        self._operation_execution_complete_event = threading.Event()

    @property
    def has_job(self) -> bool:
        """``True`` when a job has been dispatched and is stored on the scheduler."""
        no_job = self._job is None
        return not no_job

    @property
    def next_job(self) -> Scheduler.JobInvokationType:
        """
        The stored job.

        Raises
        ------
        RuntimeError
            if no job has been dispatched
        """
        pending = self._job
        if pending is not None:
            return pending
        raise RuntimeError("No job to execute")

    def dispatch_job(self, job: Scheduler.JobInvokationType) -> None:
        """
        Store ``job`` and start the machinery that executes it.

        Schedules ``rpc_server.tunnel_message_to_things`` as a task on the
        current async loop, starts a new thread running
        ``rpc_server.run_thing_instance`` under ``asyncio.run``, and finally
        sets ``_job_queued_event``.

        Parameters
        ----------
        job: Scheduler.JobInvokationType
            the job to make available through ``next_job``
        """
        self._job = job
        loop = get_current_async_loop()
        tunnel_coro = self.rpc_server.tunnel_message_to_things(self)
        loop.create_task(tunnel_coro)
        # the thread gets its own event loop through ``asyncio.run``
        thing_coro = self.rpc_server.run_thing_instance(self.instance, self)
        worker = threading.Thread(target=asyncio.run, args=(thing_coro,))
        self._execution_thread = worker
        worker.start()
        # NOTE(review): ``_job_queued_event`` is not created in this class -
        # presumably defined by ``Scheduler``; confirm
        self._job_queued_event.set()

Functions

dispatch_job

dispatch_job(job: JobInvokationType) -> None
Source code in hololinked/hololinked/core/zmq/rpc_server.py
def dispatch_job(self, job: Scheduler.JobInvokationType) -> None:
    """
    Store ``job`` and start the machinery that executes it.

    Schedules ``rpc_server.tunnel_message_to_things`` as a task on the
    current async loop, starts a new thread running
    ``rpc_server.run_thing_instance`` under ``asyncio.run``, and finally
    sets ``_job_queued_event``.

    Parameters
    ----------
    job: Scheduler.JobInvokationType
        the job stored on the scheduler as ``_job``
    """
    self._job = job
    loop = get_current_async_loop()
    tunnel_coro = self.rpc_server.tunnel_message_to_things(self)
    loop.create_task(tunnel_coro)
    # the thread gets its own event loop through ``asyncio.run``
    thing_coro = self.rpc_server.run_thing_instance(self.instance, self)
    worker = threading.Thread(target=asyncio.run, args=(thing_coro,))
    self._execution_thread = worker
    worker.start()
    self._job_queued_event.set()