Skip to content

Headers

hololinked.core.zmq.message.RequestHeader

Bases: Struct

Header of a request message

Source code in hololinked/hololinked/core/zmq/message.py
class RequestHeader(msgspec.Struct):
    """Header of a request message"""

    messageType: str
    messageID: str
    senderID: str
    receiverID: str
    serverExecutionContext: ServerExecutionContext = msgspec.field(
        default_factory=lambda: default_server_execution_context
    )
    thingExecutionContext: ThingExecutionContext = msgspec.field(
        default_factory=lambda: default_thing_execution_context
    )
    thingID: Optional[str] = ""
    objekt: Optional[str] = ""
    operation: Optional[str] = ""
    payloadContentType: Optional[str] = "application/json"
    preencodedPayloadContentType: Optional[str] = "text/plain"

    def __getitem__(self, key: str) -> Any:
        try:
            return getattr(self, key)
        except AttributeError:
            raise KeyError(f"key {key} not found in {self.__class__.__name__}") from None

    def json(self):
        return {f: getattr(self, f) for f in self.__struct_fields__}

hololinked.core.zmq.message.ResponseHeader

Bases: Struct

Header of a response message

Source code in hololinked/hololinked/core/zmq/message.py
class ResponseHeader(msgspec.Struct):
    """Header of a response message"""

    messageType: str
    messageID: str
    receiverID: str
    senderID: str
    payloadContentType: Optional[str] = "application/json"
    preencodedPayloadContentType: Optional[str] = ""

    def __getitem__(self, key: str) -> Any:
        try:
            return getattr(self, key)
        except AttributeError:
            raise KeyError(f"key {key} not found in {self.__class__.__name__}") from None

    def json(self):
        return {f: getattr(self, f) for f in self.__struct_fields__}

hololinked.core.zmq.message.EventHeader

Bases: Struct

Header of an event message

Source code in hololinked/hololinked/core/zmq/message.py
class EventHeader(msgspec.Struct):
    """Header of an event message"""

    messageType: str
    messageID: str
    senderID: str
    eventID: str
    payloadContentType: Optional[str] = "application/json"
    preencodedPayloadContentType: Optional[str] = ""

    def __getitem__(self, key: str) -> Any:
        try:
            return getattr(self, key)
        except AttributeError:
            raise KeyError(f"key {key} not found in {self.__class__.__name__}") from None

    def json(self):
        return {f: getattr(self, f) for f in self.__struct_fields__}