Skip to content

hololinked.client.abstractions.ConsumedThingEvent

Client-side event subscription abstraction. Subclass this class to implement protocol-specific event subscription.

Source code in hololinked/hololinked/client/abstractions.py
class ConsumedThingEvent:
    # Client-side event subscription abstraction; protocol bindings subclass this
    # and implement `listen()` / `async_listen()`.
    # Do not add a class docstring, otherwise `__doc__` in slots will conflict with the class variable.

    def __init__(
        self,
        resource: EventAffordance,
        logger: structlog.stdlib.BoundLogger,
        owner_inst: Any,
    ) -> None:
        """
        Store the affordance, logger and owner, and start with no recorded subscriptions.

        Parameters
        ----------
        resource: EventAffordance
            dataclass object representing the event
        logger: structlog.stdlib.BoundLogger
            logger instance
        owner_inst: Any
            the parent object that owns this event
        """
        from . import ObjectProxy  # noqa: F401

        self.resource = resource
        self.logger = logger
        self.owner_inst = owner_inst  # type: ObjectProxy
        self._subscribed = dict()
        # self._sync_callbacks = []
        # self._async_callbacks = []

    def subscribe(
        self,
        callbacks: list[Callable] | Callable,
        asynch: bool = False,
        concurrent: bool = False,
        deserialize: bool = True,
        # create_new_connection: bool = False,
    ) -> None:
        """
        subscribe to the event

        Parameters
        ----------
        callbacks: list[Callable] | Callable
            callback or list of callbacks to add
        asynch: bool
            whether to start an async(-io task) event listener instead of a threaded listener
        concurrent: bool
            - asyncio - if `True`, each callback is scheduled as a separate task, if `False` they are awaited sequentially.
            - threading - if `True`, each callback is called in a separate thread, if `False` they are called sequentially.
        deserialize: bool
            if `False`, event payload is passed to the callbacks as raw bytes, if `True` it is deserialized

        Raises
        ------
        ValueError
            if the affordance has no form for the required operation
        """
        # property affordances are observed, everything else is treated as an event subscription
        op = Operations.observeproperty if isinstance(self.resource, PropertyAffordance) else Operations.subscribeevent
        form = self.resource.retrieve_form(op, None)
        callbacks = callbacks if isinstance(callbacks, (list, tuple)) else [callbacks]
        # if not create_new_connection:
        #   see tag v0.3.2 for logic
        if form is None:
            raise ValueError(f"No form found for {op} operation for {self.resource.name}")
        if asynch:
            # the listener task is created from within the loop, on its next iteration
            get_current_async_loop().call_soon(
                lambda: asyncio.create_task(self.async_listen(form, callbacks, concurrent, deserialize))
            )
        else:
            # daemon thread, so a running listener never blocks interpreter exit
            _thread = threading.Thread(target=self.listen, args=(form, callbacks, concurrent, deserialize), daemon=True)
            _thread.start()

    def unsubscribe(self):
        """unsubscribe from the event by clearing the recorded subscriptions"""
        self._subscribed.clear()
        # self._sync_callbacks.clear()
        # self._async_callbacks.clear()

    def listen(self, form: Form, callbacks: list[Callable], concurrent: bool = True, deserialize: bool = True):
        """
        Listen to events and call the callbacks. This method needs to be invoked by the `subscribe()` method
        in threaded mode. Use `async_listen()` for asyncio mode.

        Parameters
        ----------
        form: Form
            form to use for event subscription
        callbacks: list[Callable]
            list of callbacks to call on event
        concurrent: bool
            whether to run each callback concurrently in a separate thread
        deserialize: bool
            whether to deserialize the event payload before passing to callbacks

        Raises
        ------
        NotImplementedError
            always, in this base class; each protocol implements its own listener
        """
        raise NotImplementedError("implement listen per protocol")

    async def async_listen(
        self, form: Form, callbacks: list[Callable], concurrent: bool = True, deserialize: bool = True
    ):
        """
        Listen to events and call the callbacks. This method needs to be invoked by the `subscribe()` method
        in asyncio mode. Use `listen()` for threaded mode.

        Parameters
        ----------
        form: Form
            form to use for event subscription
        callbacks: list[Callable]
            list of callbacks to call on event
        concurrent: bool
            whether to run each callback concurrently as separate tasks
        deserialize: bool
            whether to deserialize the event payload before passing to callbacks

        Raises
        ------
        NotImplementedError
            always, in this base class; each protocol implements its own listener
        """
        raise NotImplementedError("implement async_listen per protocol")

    def _log_callback_error(self, cb: Callable, ex: Exception) -> None:
        """Log a callback failure; call this only while `ex` is being handled so the traceback is available."""
        self.logger.error(f"Error occurred in callback {cb}: {ex}")
        self.logger.exception(ex)

    def _invoke_callback(self, cb: Callable, event_data: Any) -> None:
        """Call a synchronous callback and log, instead of propagating, any exception it raises."""
        try:
            cb(event_data)
        except Exception as ex:
            self._log_callback_error(cb, ex)

    async def _await_callback(self, cb: Callable, event_data: Any) -> None:
        """Await a coroutine callback and log, instead of propagating, any exception it raises."""
        try:
            await cb(event_data)
        except Exception as ex:
            self._log_callback_error(cb, ex)

    def schedule_callbacks(self, callbacks: list[Callable], event_data: Any, concurrent: bool = False) -> None:
        """
        schedule the callbacks to be called with the event data

        A failing callback is logged and never prevents the remaining callbacks from running.

        Parameters
        ----------
        callbacks: list[Callable]
            list of callbacks to call
        event_data: Any
            event data to pass to the callbacks
        concurrent: bool
            whether to run each callback in a separate thread
        """
        for cb in callbacks:
            if not concurrent:
                self._invoke_callback(cb, event_data)
                continue
            try:
                # the guarded wrapper is the thread target, so that an exception raised inside
                # the worker thread is logged here instead of escaping to `threading.excepthook`
                threading.Thread(target=self._invoke_callback, args=(cb, event_data)).start()
            except Exception as ex:
                # the thread itself could not be created or started
                self._log_callback_error(cb, ex)

    async def async_schedule_callbacks(
        self, callbacks: list[Callable], event_data: Any, concurrent: bool = False
    ) -> None:
        """
        async schedule the callbacks to be called with the event data

        A failing callback is logged and never prevents the remaining callbacks from running.

        Parameters
        ----------
        callbacks: list[Callable]
            list of callbacks to call
        event_data: Any
            event data to pass to the callbacks
        concurrent: bool
            if `True`, coroutine callbacks are scheduled as separate tasks and plain callables are run
            in the loop's default executor; if `False`, callbacks are awaited/called one after another
        """
        loop = get_current_async_loop()
        for cb in callbacks:
            is_coroutine = asyncio.iscoroutinefunction(cb)
            try:
                if concurrent and is_coroutine:
                    task = loop.create_task(self._await_callback(cb, event_data))
                    # the event loop only keeps weak references to tasks; hold a strong one until done
                    pending = getattr(self, "_callback_tasks", None)
                    if pending is None:
                        pending = self._callback_tasks = set()
                    pending.add(task)
                    task.add_done_callback(pending.discard)
                elif concurrent:
                    # guarded wrapper, so that an exception raised in the executor thread is logged
                    loop.run_in_executor(None, self._invoke_callback, cb, event_data)
                elif is_coroutine:
                    await cb(event_data)
                else:
                    cb(event_data)
            except Exception as ex:
                self._log_callback_error(cb, ex)

    def add_callbacks(self, callbacks: list[Callable] | Callable, asynch: bool = False) -> None:
        """
        add callbacks to the event. Not supported: unsubscribe and resubscribe with the new callbacks instead.

        Parameters
        ----------
        callbacks: list[Callable] | Callable
            callback or list of callbacks to add
        asynch: bool
            unused, since the method always raises

        Raises
        ------
        NotImplementedError
            always
        """
        raise NotImplementedError(
            "logic error - cannot add callbacks to reuse event subscription. Unsubscribe and resubscribe with new callbacks"
        )

Functions

__init__

__init__(resource: EventAffordance, logger: BoundLogger, owner_inst: Any) -> None

Parameters:

Name Type Description Default

resource

EventAffordance

dataclass object representing the event

required

logger

BoundLogger

logger instance

required

owner_inst

Any

the parent object that owns this event

required
Source code in hololinked/hololinked/client/abstractions.py
def __init__(
    self,
    resource: EventAffordance,
    logger: structlog.stdlib.BoundLogger,
    owner_inst: Any,
) -> None:
    """
    Store the affordance, logger and owner, and start with no recorded subscriptions.

    Parameters
    ----------
    resource: EventAffordance
        dataclass object representing the event
    logger: structlog.stdlib.BoundLogger
        logger used by this event object
    owner_inst: Any
        the parent object that owns this event
    """
    from . import ObjectProxy  # noqa: F401

    self.resource, self.logger = resource, logger
    self.owner_inst = owner_inst  # type: ObjectProxy
    self._subscribed = {}

subscribe

subscribe(callbacks: list[Callable] | Callable, asynch: bool = False, concurrent: bool = False, deserialize: bool = True) -> None

subscribe to the event

Parameters:

Name Type Description Default

callbacks

list[Callable] | Callable

callback or list of callbacks to add

required

asynch

bool

whether to start an async(-io task) event listener instead of a threaded listener

False

concurrent

bool
  • asyncio - if True, each callback is scheduled as a separate task, if False they are awaited sequentially.
  • threading - if True, each callback is called in a separate thread, if False they are called sequentially.
False

deserialize

bool

if False, event payload is passed to the callbacks as raw bytes, if True it is deserialized

True
Source code in hololinked/hololinked/client/abstractions.py
def subscribe(
    self,
    callbacks: list[Callable] | Callable,
    asynch: bool = False,
    concurrent: bool = False,
    deserialize: bool = True,
    # create_new_connection: bool = False,
) -> None:
    """
    Start a listener for this event and deliver every event to the given callbacks.

    Parameters
    ----------
    callbacks: list[Callable] | Callable
        a single callback, or a list/tuple of callbacks
    asynch: bool
        if `True` the listener is started as an asyncio task, otherwise in a daemon thread
    concurrent: bool
        - asyncio - if `True`, each callback is scheduled as a separate task, if `False` they are awaited sequentially.
        - threading - if `True`, each callback is called in a separate thread, if `False` they are called sequentially.
    deserialize: bool
        if `False`, event payload is passed to the callbacks as raw bytes, if `True` it is deserialized

    Raises
    ------
    ValueError
        if the affordance has no form for the required operation
    """
    # property affordances are observed, everything else is subscribed to as an event
    if isinstance(self.resource, PropertyAffordance):
        op = Operations.observeproperty
    else:
        op = Operations.subscribeevent
    form = self.resource.retrieve_form(op, None)
    if not isinstance(callbacks, (list, tuple)):
        callbacks = [callbacks]
    # if not create_new_connection:
    #   see tag v0.3.2 for logic
    if form is None:
        raise ValueError(f"No form found for {op} operation for {self.resource.name}")

    listener_args = (form, callbacks, concurrent, deserialize)
    if not asynch:
        threading.Thread(target=self.listen, args=listener_args, daemon=True).start()
        return

    def _spawn_listener():
        # runs inside the loop, on its next iteration
        return asyncio.create_task(self.async_listen(*listener_args))

    get_current_async_loop().call_soon(_spawn_listener)

unsubscribe

unsubscribe()

unsubscribe from the event

Source code in hololinked/hololinked/client/abstractions.py
def unsubscribe(self):
    """Drop every recorded subscription by emptying the `_subscribed` mapping in place."""
    active = self._subscribed
    active.clear()

listen

listen(form: Form, callbacks: list[Callable], concurrent: bool = True, deserialize: bool = True)

Listen to events and call the callbacks. This method needs to be invoked by the subscribe() method in threaded mode. Use async_listen() for asyncio mode.

Parameters:

Name Type Description Default

form

Form

form to use for event subscription

required

callbacks

list[Callable]

list of callbacks to call on event

required

concurrent

bool

whether to run each callback concurrently in a separate thread

True

deserialize

bool

whether to deserialize the event payload before passing to callbacks

True
Source code in hololinked/hololinked/client/abstractions.py
def listen(self, form: Form, callbacks: list[Callable], concurrent: bool = True, deserialize: bool = True):
    """
    Threaded-mode listener: receive events and hand them to the callbacks. `subscribe()` runs this
    method in a thread; the asyncio counterpart is `async_listen()`.

    Parameters
    ----------
    form: Form
        form describing how to subscribe to the event
    callbacks: list[Callable]
        callbacks to invoke for every event
    concurrent: bool
        run each callback in its own thread instead of one after another
    deserialize: bool
        deserialize the event payload before handing it to the callbacks

    Raises
    ------
    NotImplementedError
        always, in this base class; each protocol provides its own implementation
    """
    message = "implement listen per protocol"
    raise NotImplementedError(message)

async_listen async

async_listen(form: Form, callbacks: list[Callable], concurrent: bool = True, deserialize: bool = True)

Listen to events and call the callbacks. This method needs to be invoked by the subscribe() method in asyncio mode. Use listen() for threaded mode.

Parameters:

Name Type Description Default

form

Form

form to use for event subscription

required

callbacks

list[Callable]

list of callbacks to call on event

required

concurrent

bool

whether to run each callback concurrently as separate tasks

True

deserialize

bool

whether to deserialize the event payload before passing to callbacks

True
Source code in hololinked/hololinked/client/abstractions.py
async def async_listen(
    self,
    form: Form,
    callbacks: list[Callable],
    concurrent: bool = True,
    deserialize: bool = True,
):
    """
    Asyncio-mode listener: receive events and hand them to the callbacks. `subscribe()` schedules this
    coroutine as a task; the threaded counterpart is `listen()`.

    Parameters
    ----------
    form: Form
        form describing how to subscribe to the event
    callbacks: list[Callable]
        callbacks to invoke for every event
    concurrent: bool
        schedule each callback as its own task instead of one after another
    deserialize: bool
        deserialize the event payload before handing it to the callbacks

    Raises
    ------
    NotImplementedError
        always, in this base class; each protocol provides its own implementation
    """
    message = "implement async_listen per protocol"
    raise NotImplementedError(message)

schedule_callbacks

schedule_callbacks(callbacks: list[Callable], event_data: Any, concurrent: bool = False) -> None

schedule the callbacks to be called with the event data

Parameters:

Name Type Description Default

callbacks

list[Callable]

list of callbacks to call

required

event_data

Any

event data to pass to the callbacks

required

concurrent

bool

whether to run each callback in a separate thread

False
Source code in hololinked/hololinked/client/abstractions.py
def schedule_callbacks(self, callbacks: list[Callable], event_data: Any, concurrent: bool = False) -> None:
    """
    schedule the callbacks to be called with the event data

    A failing callback is logged and never prevents the remaining callbacks from running.

    Parameters
    ----------
    callbacks: list[Callable]
        list of callbacks to call
    event_data: Any
        event data to pass to the callbacks
    concurrent: bool
        whether to run each callback in a separate thread
    """

    def log_failure(cb, ex):
        # only called while `ex` is being handled, so `logger.exception` has a traceback
        self.logger.error(f"Error occurred in callback {cb}: {ex}")
        self.logger.exception(ex)

    def run_logged(cb):
        # call one callback; log instead of propagating whatever it raises
        try:
            cb(event_data)
        except Exception as ex:
            log_failure(cb, ex)

    for cb in callbacks:
        if not concurrent:
            run_logged(cb)
            continue
        try:
            # the guarded wrapper is the thread target, so that an exception raised inside
            # the worker thread is logged here instead of escaping to `threading.excepthook`
            threading.Thread(target=run_logged, args=(cb,)).start()
        except Exception as ex:
            # the thread itself could not be created or started
            log_failure(cb, ex)

async_schedule_callbacks async

async_schedule_callbacks(callbacks, event_data: Any, concurrent: bool = False) -> None

async schedule the callbacks to be called with the event data

Parameters:

Name Type Description Default

callbacks

list of callbacks to call

required

event_data

Any

event data to pass to the callbacks

required

concurrent

bool

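whether to run the callbacks concurrently: coroutine callbacks are scheduled as separate tasks and plain callables are run in the event loop's default executor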

False
Source code in hololinked/hololinked/client/abstractions.py
async def async_schedule_callbacks(self, callbacks: list[Callable], event_data: Any, concurrent: bool = False) -> None:
    """
    async schedule the callbacks to be called with the event data

    A failing callback is logged and never prevents the remaining callbacks from running.

    Parameters
    ----------
    callbacks: list[Callable]
        list of callbacks to call
    event_data: Any
        event data to pass to the callbacks
    concurrent: bool
        if `True`, coroutine callbacks are scheduled as separate tasks and plain callables are run
        in the loop's default executor; if `False`, callbacks are awaited/called one after another
    """

    def log_failure(cb, ex):
        # only called while `ex` is being handled, so `logger.exception` has a traceback
        self.logger.error(f"Error occurred in callback {cb}: {ex}")
        self.logger.exception(ex)

    def run_logged(cb):
        # executor target: log instead of leaving the exception on an unobserved future
        try:
            cb(event_data)
        except Exception as ex:
            log_failure(cb, ex)

    async def await_logged(cb):
        # task body: log instead of leaving the exception on an unobserved task
        try:
            await cb(event_data)
        except Exception as ex:
            log_failure(cb, ex)

    loop = get_current_async_loop()
    for cb in callbacks:
        is_coroutine = asyncio.iscoroutinefunction(cb)
        try:
            if concurrent and is_coroutine:
                task = loop.create_task(await_logged(cb))
                # the event loop only keeps weak references to tasks; hold a strong one until done
                pending = getattr(self, "_callback_tasks", None)
                if pending is None:
                    pending = self._callback_tasks = set()
                pending.add(task)
                task.add_done_callback(pending.discard)
            elif concurrent:
                loop.run_in_executor(None, run_logged, cb)
            elif is_coroutine:
                await cb(event_data)
            else:
                cb(event_data)
        except Exception as ex:
            log_failure(cb, ex)

add_callbacks

add_callbacks(callbacks: list[Callable] | Callable, asynch: bool = False) -> None

add callbacks to the event

Parameters:

Name Type Description Default

callbacks

list[Callable] | Callable

callback or list of callbacks to add

required
Source code in hololinked/hololinked/client/abstractions.py
def add_callbacks(self, callbacks: list[Callable] | Callable, asynch: bool = False) -> None:
    """
    Adding callbacks to an existing subscription is not supported; unsubscribe and resubscribe
    with the new callbacks instead.

    Parameters
    ----------
    callbacks: list[Callable] | Callable
        callback or list of callbacks to add
    asynch: bool
        unused, since the method always raises

    Raises
    ------
    NotImplementedError
        always
    """
    reason = "logic error - cannot add callbacks to reuse event subscription. Unsubscribe and resubscribe with new callbacks"
    raise NotImplementedError(reason)