Skip to content

hololinked.core.zmq.message.RequestMessage

A single unit of message from a ZMQ client to server. The message may be parsed and deserialized into header and body.

Message indices:

Index 0 1 2 3 4
Desc address empty byte header payload preserialized payload

For header's JSON schema, visit here.

Source code in hololinked/hololinked/core/zmq/message.py
class RequestMessage:
    """
    A single unit of message from a ZMQ client to server. The message may be parsed and deserialized into header and body.

    Message indices:

    | Index | 0       | 1          | 2      |   3     |          4            |
    |-------|---------|------------|--------|---------|-----------------------|
    | Desc  | address | empty byte | header | payload | preserialized payload |

    For header's JSON schema, visit [here](https://github.com/hololinked-dev/hololinked/blob/main/hololinked/core/zmq/request_message_header_schema.json).
    """

    length = Integer(default=5, readonly=True, class_member=True, doc="length of the message")  # type: int

    def __init__(self, msg: list[bytes]) -> None:
        self._bytes = msg
        self._header = None  # deserialized header
        self._body = None  # type: Optional[tuple[SerializableData, PreserializedData]]
        self._sender_id = None

    @property
    def byte_array(self) -> list[bytes]:
        """
        message byte array, either after being composed or as received from the socket.

        Message indices:

        | Index | 0       | 1          | 2      |   3     |          4            |
        |-------|---------|------------|--------|---------|-----------------------|
        | Desc  | address | empty byte | header | payload | preserialized payload |
        """
        return self._bytes

    @property
    def header(self) -> RequestHeader:
        """header of the message, namely index 1 of the byte array, deserizalized to a dictionary"""
        if self._header is None:
            self.parse_header()
        return self._header

    @property
    def body(self) -> tuple[SerializableData, PreserializedData]:
        """body of the message"""
        if self._body is None:
            self.parse_body()
        return self._body

    @property
    def id(self) -> str:
        """ID of the message"""
        return self.header["messageID"]

    @property
    def receiver_id(self) -> str:
        """ID of the sender"""
        return self.header["receiverID"]

    @property
    def sender_id(self) -> str:
        """ID of the receiver"""
        return self.header["senderID"]

    @property
    def thing_id(self) -> str:
        """ID of the thing on which the operation is to be performed"""
        return self.header["thingID"]

    @property
    def type(self) -> str:
        """type of the message"""
        return self.header["messageType"]

    @property
    def server_execution_context(self) -> dict[str, Any]:
        """server execution context"""
        return self.header["serverExecutionContext"]

    @property
    def thing_execution_context(self) -> dict[str, Any]:
        """thing execution context"""
        return self.header["thingExecutionContext"]

    @property
    def qualified_operation(self) -> str:
        """qualified objekt - a possibly unique string for the operation"""
        return f"{self.header['thingID']}.{self.header['objekt']}.{self.header['operation']}"

    def parse_header(self) -> None:
        """extract the header and deserialize it"""
        if isinstance(self._bytes[INDEX_HEADER], RequestHeader):
            self._header = self._bytes[INDEX_HEADER]
        elif isinstance(self._bytes[INDEX_HEADER], byte_types):
            self._header = RequestHeader(**Serializers.json.loads(self._bytes[INDEX_HEADER]))
        else:
            raise ValueError(f"header must be of type RequestHeader or bytes, not {type(self._bytes[INDEX_HEADER])}")

    def parse_body(self) -> None:
        """extract the body and deserialize payload"""
        self._body = [
            SerializableData(self._bytes[INDEX_BODY], content_type=self.header["payloadContentType"]),
            PreserializedData(
                self._bytes[INDEX_PRESERIALIZED_BODY], content_type=self.header["preencodedPayloadContentType"]
            ),
        ]

    @classmethod
    def craft_from_arguments(
        cls,
        receiver_id: str,
        sender_id: str,
        thing_id: str,
        objekt: str,
        operation: str,
        payload: SerializableData = SerializableNone,
        preserialized_payload: PreserializedData = PreserializedEmptyByte,
        server_execution_context: dict[str, Any] = default_server_execution_context,
        thing_execution_context: dict[str, Any] = default_thing_execution_context,
    ) -> "RequestMessage":
        """
        create a request message from the given arguments

        Parameters
        ----------
        receiver_id: str
            id of the server (ZMQ socket identity)
        sender_id: str
            id of the client (ZMQ socket identity)
        thing_id: str
            id of the thing to which the operation is to be performed
        objekt: str
            objekt of the thing on which the operation is to be performed, i.e. a property, action or event name
        operation: str
            operation to be performed (`invokeaction`, `readproperty`, `writeproperty` etc.)
        payload: SerializableData
            payload for the operation
        preserialized_payload: PreserializedData
            pre-encoded payload for the operation
        server_execution_context: Dict[str, Any]
            server-level execution context while performing the operation
        thing_execution_context: Dict[str, Any]
            thing-level execution context while performing the operation

        Returns
        -------
        message: RequestMessage
            the crafted message
        """
        message = RequestMessage([])
        message._header = RequestHeader(
            messageID=str(uuid4()),
            messageType=OPERATION,
            senderID=sender_id,
            receiverID=receiver_id,
            # i.e. the message type is 'OPERATION', not 'HANDSHAKE', 'REPLY', 'TIMEOUT' etc.
            serverExecutionContext=server_execution_context,
            thingID=thing_id,
            objekt=objekt,
            operation=operation,
            payloadContentType=payload.content_type,
            preencodedPayloadContentType=preserialized_payload.content_type,
            thingExecutionContext=thing_execution_context,
        )
        message._body = [payload, preserialized_payload]
        message._bytes = [
            bytes(receiver_id, encoding="utf-8"),
            bytes(),
            Serializers.json.dumps(message._header.json()),
            payload.serialize(),
            preserialized_payload.value,
        ]
        return message

    @classmethod
    def craft_with_message_type(
        cls, sender_id: str, receiver_id: str, message_type: bytes = HANDSHAKE
    ) -> "RequestMessage":
        """
        create a plain message with a certain type, for example a handshake message.

        Parameters
        ----------
        sender_id: str
            id of the client (ZMQ socket identity)
        receiver_id: str
            id of the server (ZMQ socket identity)
        message_type: bytes
            message type to be sent (i.e. 'HANDSHAKE', 'EXIT' etc.)

        Returns
        -------
        message: RequestMessage
            the crafted message
        """

        message = RequestMessage([])
        message._header = RequestHeader(
            messageID=str(uuid4()),
            messageType=message_type,
            senderID=sender_id,
            receiverID=receiver_id,
            serverExecutionContext=default_server_execution_context,
        )
        payload = SerializableNone
        preserialized_payload = PreserializedEmptyByte
        message._body = [payload, preserialized_payload]
        message._bytes = [
            bytes(receiver_id, encoding="utf-8"),
            bytes(),
            Serializers.json.dumps(message._header.json()),
            payload.serialize(),
            preserialized_payload.value,
        ]
        return message

    def __str__(self) -> str:
        return f"RequestMessage(id={self.id}, type={self.type}, header={self.header})"

Functions

__init__

__init__(msg: list[bytes]) -> None
Source code in hololinked/hololinked/core/zmq/message.py
def __init__(self, msg: list[bytes]) -> None:
    self._bytes = msg
    self._header = None  # deserialized header
    self._body = None  # type: Optional[tuple[SerializableData, PreserializedData]]
    self._sender_id = None

header

header() -> RequestHeader

header of the message, namely index 1 of the byte array, deserizalized to a dictionary

Source code in hololinked/hololinked/core/zmq/message.py
@property
def header(self) -> RequestHeader:
    """header of the message, namely index 1 of the byte array, deserizalized to a dictionary"""
    if self._header is None:
        self.parse_header()
    return self._header

body

body() -> tuple[SerializableData, PreserializedData]

body of the message

Source code in hololinked/hololinked/core/zmq/message.py
@property
def body(self) -> tuple[SerializableData, PreserializedData]:
    """body of the message"""
    if self._body is None:
        self.parse_body()
    return self._body

thing_id

thing_id() -> str

ID of the thing on which the operation is to be performed

Source code in hololinked/hololinked/core/zmq/message.py
@property
def thing_id(self) -> str:
    """ID of the thing on which the operation is to be performed"""
    return self.header["thingID"]

byte_array

byte_array() -> list[bytes]

message byte array, either after being composed or as received from the socket.

Message indices:

Index 0 1 2 3 4
Desc address empty byte header payload preserialized payload
Source code in hololinked/hololinked/core/zmq/message.py
@property
def byte_array(self) -> list[bytes]:
    """
    message byte array, either after being composed or as received from the socket.

    Message indices:

    | Index | 0       | 1          | 2      |   3     |          4            |
    |-------|---------|------------|--------|---------|-----------------------|
    | Desc  | address | empty byte | header | payload | preserialized payload |
    """
    return self._bytes

craft_from_arguments classmethod

craft_from_arguments(receiver_id: str, sender_id: str, thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: dict[str, Any] = default_server_execution_context, thing_execution_context: dict[str, Any] = default_thing_execution_context) -> RequestMessage

create a request message from the given arguments

Parameters:

Name Type Description Default

receiver_id

str

id of the server (ZMQ socket identity)

required

sender_id

str

id of the client (ZMQ socket identity)

required

thing_id

str

id of the thing to which the operation is to be performed

required

objekt

str

objekt of the thing on which the operation is to be performed, i.e. a property, action or event name

required

operation

str

operation to be performed (invokeaction, readproperty, writeproperty etc.)

required

payload

SerializableData

payload for the operation

SerializableNone

preserialized_payload

PreserializedData

pre-encoded payload for the operation

PreserializedEmptyByte

server_execution_context

dict[str, Any]

server-level execution context while performing the operation

default_server_execution_context

thing_execution_context

dict[str, Any]

thing-level execution context while performing the operation

default_thing_execution_context

Returns:

Name Type Description
message RequestMessage

the crafted message

Source code in hololinked/hololinked/core/zmq/message.py
@classmethod
def craft_from_arguments(
    cls,
    receiver_id: str,
    sender_id: str,
    thing_id: str,
    objekt: str,
    operation: str,
    payload: SerializableData = SerializableNone,
    preserialized_payload: PreserializedData = PreserializedEmptyByte,
    server_execution_context: dict[str, Any] = default_server_execution_context,
    thing_execution_context: dict[str, Any] = default_thing_execution_context,
) -> "RequestMessage":
    """
    create a request message from the given arguments

    Parameters
    ----------
    receiver_id: str
        id of the server (ZMQ socket identity)
    sender_id: str
        id of the client (ZMQ socket identity)
    thing_id: str
        id of the thing to which the operation is to be performed
    objekt: str
        objekt of the thing on which the operation is to be performed, i.e. a property, action or event name
    operation: str
        operation to be performed (`invokeaction`, `readproperty`, `writeproperty` etc.)
    payload: SerializableData
        payload for the operation
    preserialized_payload: PreserializedData
        pre-encoded payload for the operation
    server_execution_context: Dict[str, Any]
        server-level execution context while performing the operation
    thing_execution_context: Dict[str, Any]
        thing-level execution context while performing the operation

    Returns
    -------
    message: RequestMessage
        the crafted message
    """
    message = RequestMessage([])
    message._header = RequestHeader(
        messageID=str(uuid4()),
        messageType=OPERATION,
        senderID=sender_id,
        receiverID=receiver_id,
        # i.e. the message type is 'OPERATION', not 'HANDSHAKE', 'REPLY', 'TIMEOUT' etc.
        serverExecutionContext=server_execution_context,
        thingID=thing_id,
        objekt=objekt,
        operation=operation,
        payloadContentType=payload.content_type,
        preencodedPayloadContentType=preserialized_payload.content_type,
        thingExecutionContext=thing_execution_context,
    )
    message._body = [payload, preserialized_payload]
    message._bytes = [
        bytes(receiver_id, encoding="utf-8"),
        bytes(),
        Serializers.json.dumps(message._header.json()),
        payload.serialize(),
        preserialized_payload.value,
    ]
    return message

craft_with_message_type classmethod

craft_with_message_type(sender_id: str, receiver_id: str, message_type: bytes = HANDSHAKE) -> RequestMessage

create a plain message with a certain type, for example a handshake message.

Parameters:

Name Type Description Default

sender_id

str

id of the client (ZMQ socket identity)

required

receiver_id

str

id of the server (ZMQ socket identity)

required

message_type

bytes

message type to be sent (i.e. 'HANDSHAKE', 'EXIT' etc.)

HANDSHAKE

Returns:

Name Type Description
message RequestMessage

the crafted message

Source code in hololinked/hololinked/core/zmq/message.py
@classmethod
def craft_with_message_type(
    cls, sender_id: str, receiver_id: str, message_type: bytes = HANDSHAKE
) -> "RequestMessage":
    """
    create a plain message with a certain type, for example a handshake message.

    Parameters
    ----------
    sender_id: str
        id of the client (ZMQ socket identity)
    receiver_id: str
        id of the server (ZMQ socket identity)
    message_type: bytes
        message type to be sent (i.e. 'HANDSHAKE', 'EXIT' etc.)

    Returns
    -------
    message: RequestMessage
        the crafted message
    """

    message = RequestMessage([])
    message._header = RequestHeader(
        messageID=str(uuid4()),
        messageType=message_type,
        senderID=sender_id,
        receiverID=receiver_id,
        serverExecutionContext=default_server_execution_context,
    )
    payload = SerializableNone
    preserialized_payload = PreserializedEmptyByte
    message._body = [payload, preserialized_payload]
    message._bytes = [
        bytes(receiver_id, encoding="utf-8"),
        bytes(),
        Serializers.json.dumps(message._header.json()),
        payload.serialize(),
        preserialized_payload.value,
    ]
    return message

parse_header

parse_header() -> None

extract the header and deserialize it

Source code in hololinked/hololinked/core/zmq/message.py
def parse_header(self) -> None:
    """extract the header and deserialize it"""
    if isinstance(self._bytes[INDEX_HEADER], RequestHeader):
        self._header = self._bytes[INDEX_HEADER]
    elif isinstance(self._bytes[INDEX_HEADER], byte_types):
        self._header = RequestHeader(**Serializers.json.loads(self._bytes[INDEX_HEADER]))
    else:
        raise ValueError(f"header must be of type RequestHeader or bytes, not {type(self._bytes[INDEX_HEADER])}")

parse_body

parse_body() -> None

extract the body and deserialize payload

Source code in hololinked/hololinked/core/zmq/message.py
def parse_body(self) -> None:
    """extract the body and deserialize payload"""
    self._body = [
        SerializableData(self._bytes[INDEX_BODY], content_type=self.header["payloadContentType"]),
        PreserializedData(
            self._bytes[INDEX_PRESERIALIZED_BODY], content_type=self.header["preencodedPayloadContentType"]
        ),
    ]