Skip to content

Events

API Reference

Events are pushed to the client through a publish-subscribe mechanism through all the protocols. Call push() method on the event to publish data to clients:

Definition
from hololinked.core import Thing, Event, Property


class GentecMaestroEnergyMeter(Thing):
    """
    Simple example to implement acquisition loops and events
    to push the captured data. Customize it for your application or
    implement your own.
    """

    data_point_event = Event(
        doc="Event raised when a new data point is available",
        label="Data Point Event",
    )

    def loop(self):
        self._run = True
        while self._run:
            self._last_measurement = self.read_current_value()
            timestamp = datetime.datetime.now().strftime("%H:%M:%S")
            self.data_point_event.push(
                dict(timestamp=timestamp, energy=self._last_measurement)
            )

Subscription

API Reference

One can subscribe to the event using the attribute name:

In the default synchronous mode, the subscribe_event method spawns a thread that listens to the event in backgound:

Subscription
1
2
3
4
5
6
7
8
9
from hololinked.client import ClientFactory

energy_meter = ClientFactory.http(url="http://localhost:8000/energy-meter")
# energy_meter = ClientFactory.zmq(id="energy_meter", access_point="IPC")

def event_cb(event_data):
    print(event_data)

energy_meter.subscribe_event(name="data_point_event", callbacks=event_cb)

In the asynchronous mode, the subscribe_event method creates an event listening task in the running async loop. This requires the client to be running in an async loop, otherwise no events will be received although the server will be publishing it:

Subscription
from hololinked.client import ClientFactory

energy_meter = ClientFactory.http(url="http://localhost:8000/energy-meter")
# energy_meter = ClientFactory.zmq(id="energy_meter", access_point="IPC")

def event_cb(event):
    print(event)

energy_meter.subscribe_event(
    name="data_point_event",
    callbacks=event_cb,
    asynch=True
)

The callback function(s) must accept a single argument - an instance of SSE object. The payload can be accessed as using the data attribute:

Event Data
def event_cb(event):
    print(event.data)

The SSE object also contains metadata like id, event name and retry interval, but these are currently not well supported. Improvements in the future are expected.


Each subscription creates a new event stream. One can also supply multiple callbacks which may be called in series or concurrently:

The background thread that listens to the event executes the callbacks in series in its own thread:

Sequential Callbacks
def event_cb1(event):
    print("First Callback", event.data)

def event_cb2(event):
    print("Second callback", event.data)

energy_meter.subscribe_event(
    name="statistics_event",
    callbacks=[event_cb1, event_cb2]
    # This also works for async where all callbacks are awaited in series
)

The background thread that listens to the event executes the callbacks by spawning new threads:

Thread Callbacks
def event_cb1(event):
    print("First Callback", event.data)

def event_cb2(event):
    print("Second callback", event.data)

energy_meter.subscribe_event(
    name="statistics_event",
    callbacks=[event_cb1, event_cb2],
    concurrent=True
)

The event listening task creates newer tasks in the running event loop:

Thread Callbacks
async def event_cb1(event):
    print("First Callback", event.data)
    await some_async_function1(event.data)

async def event_cb2(event):
    print("Second callback", event.data)
    await some_async_function2(event.data)

energy_meter.subscribe_event(
    name="statistics_event",
    callbacks=[event_cb1, event_cb2],
    asynch=True,
    concurrent=True
)

In GUI frameworks like PyQt, you cannot paint the GUI from the event thread. You would need to use signals and slots or other mechanisms to update the GUI to hand over the data.


To unsubscribe:

Unsubscription
energy_meter.unsubscribe_event(name="data_point_event")

All subscriptions to the same event are removed.

Payload Schema

Schema for the payload may be supplied using pydantic or JSON schema:

Payload Schema
class GentecMaestroEnergyMeter(Thing):

    data_point_event_schema = {
        "type": "object",
        "properties": {
            "timestamp": {"type": "string", "format": "date-time"},
            "energy": {"type": "number"}
        },
        "required": ["timestamp", "energy"],
    }

    data_point_event = Event(
        doc="Event raised when a new data point is available",
        label="Data Point Event",
        schema=data_point_event_schema,
    )

There is no validation on the client side currently implemented in hololinked.client. This will be added in future releases.

Schema as seen in Thing Description
GentecMaestro.data_point_event.to_affordance().json()
{
    "description": "Event raised when a new data point is available",
    "data": {
        "type": "object",
        "properties": {
            "timestamp": {
                "type": "string",
                "format": "date-time"
            },
            "energy": {
                "type": "number"
            }
        },
        "required": ["timestamp", "energy"]
    }
}

Thing Description Metadata

Key Supported Comment
subscription
data payload schema for the event
dataResponse will be supported in a future release
cancellation - Server sent events can be cancelled by the client directly