Skip to content

hololinked.core.zmq.message.RequestMessage

A single unit of message from a ZMQ client to server. The message may be parsed and deserialized into header and body.

Message indices:

Index 0 1 2 3
Desc address header payload preserialized payload

The header is a JSON with the following (shortened) schema:

{
    "messageType": "string",
    "messageID": "string",
    "senderID": "string",
    "serverExecutionContext": {
        "invokationTimeout": "number",
        "executionTimeout": "number",
        "oneway": "boolean"
    },
    "thingID": "string",
    "objekt": "string",
    "operation": "string",
    "payloadContentType": "string",
    "preencodedPayloadContentType": "string",
    "thingExecutionContext": {
        "fetchExecutionLogs": "boolean"
    }
}

For the detailed schema, see https://hololinked.readthedocs.io/en/latest/protocols/zmq/message.json.

Source code in hololinked\core\zmq\message.py
class RequestMessage:
    """
    A single unit of message from a ZMQ client to server. The message may be parsed and deserialized into header and body.

    Message indices:

    | Index | 0       | 1      |   2     |          3            |
    |-------|---------|--------|---------|-----------------------|
    | Desc  | address | header | payload | preserialized payload |

    The header is a JSON with the following (shortened) schema:

    ```json

    {
        "messageType": "string",
        "messageID": "string",
        "senderID": "string",
        "serverExecutionContext": {
            "invokationTimeout": "number",
            "executionTimeout": "number",
            "oneway": "boolean"
        },
        "thingID": "string",
        "objekt": "string",
        "operation": "string",
        "payloadContentType": "string",
        "preencodedPayloadContentType": "string",
        "thingExecutionContext": {
            "fetchExecutionLogs": "boolean"
        }
    }
    ```

    For detailed schema, visit [here](https://hololinked.readthedocs.io/en/latest/protocols/zmq/message.json).
    """
    length = Integer(default=4, readonly=True, 
                    doc="length of the message") # type: int

    def __init__(self, msg : typing.List[bytes]) -> None:
        """
        Parameters
        ----------
        msg: List[bytes]
            the message frames, ordered as in the index table of the class docstring
        """
        self._bytes = msg  
        # header and body are parsed lazily, on first access of the respective property
        self._header = None # type: typing.Optional[RequestHeader]
        # a two item list, [payload, preserialized payload], see parse_body()
        self._body = None  # type: typing.Optional[typing.List[typing.Union[SerializableData, PreserializedData]]]
        self._sender_id = None

    @property
    def byte_array(self) -> typing.List[bytes]:
        """returns the message in bytes"""
        return self._bytes

    @property
    def header(self) -> "RequestHeader":
        """
        returns the header of the message, namely index 1 from the following:

        | Index | 0       | 1       | 2       | 3                      |
        |-------|---------|---------|---------|------------------------|
        | Desc  | address | header  | payload | preserialized payload  |

        deserialized into a `RequestHeader`. The header is parsed on first access and cached.
        """
        if self._header is None:
            self.parse_header()
        return self._header 

    @property
    def body(self) -> "typing.List[typing.Union[SerializableData, PreserializedData]]":
        """
        payload of the message, as a two item list: `[payload, preserialized payload]`.
        The body is parsed on first access and cached.
        """
        if self._body is None:
            self.parse_body()
        return self._body

    @property
    def id(self) -> str:
        """ID of the message"""
        return self.header['messageID']

    @property
    def receiver_id(self) -> str:
        """ID of the receiver"""
        return self.header['receiverID']

    @property
    def sender_id(self) -> str:
        """ID of the sender"""
        return self.header['senderID']

    @property
    def thing_id(self) -> str:
        """ID of the thing on which the operation is to be performed"""
        return self.header['thingID']

    @property
    def type(self) -> str:
        """type of the message"""
        return self.header['messageType']

    @property
    def server_execution_context(self) -> typing.Dict[str, typing.Any]:
        """server execution context"""
        return self.header['serverExecutionContext']

    @property
    def thing_execution_context(self) -> typing.Dict[str, typing.Any]:
        """thing execution context"""
        return self.header['thingExecutionContext']

    @property
    def qualified_operation(self) -> str:
        """qualified operation, in the form `<thingID>.<objekt>.<operation>`"""
        return f"{self.header['thingID']}.{self.header['objekt']}.{self.header['operation']}"

    def parse_header(self) -> None:
        """
        extract the header frame and store it as a `RequestHeader`. A frame that is already a 
        `RequestHeader` is used as it is, a frame in bytes is JSON-deserialized first.

        Raises
        ------
        ValueError
            if the header frame is neither a `RequestHeader` nor bytes
        """
        raw_header = self._bytes[INDEX_HEADER]
        if isinstance(raw_header, RequestHeader):
            self._header = raw_header
        elif isinstance(raw_header, byte_types):
            self._header = RequestHeader(**Serializers.json.loads(raw_header))
        else:
            raise ValueError(f"header must be of type RequestHeader or bytes, not {type(raw_header)}")

    def parse_body(self) -> None:
        """
        extract the body, i.e. wrap the payload and the preserialized payload frames together with 
        the content types announced in the header. The wrapped bytes are not decoded here.
        """
        self._body = [
            SerializableData(self._bytes[INDEX_BODY], content_type=self.header['payloadContentType']),
            PreserializedData(self._bytes[INDEX_PRESERIALIZED_BODY], content_type=self.header['preencodedPayloadContentType'])
        ]

    @classmethod
    def craft_from_arguments(cls, 
                            receiver_id: str, 
                            sender_id: str,
                            thing_id: str, 
                            objekt: str, 
                            operation: str, 
                            payload: SerializableData = SerializableNone,
                            preserialized_payload: PreserializedData = PreserializedEmptyByte,
                            server_execution_context: typing.Dict[str, typing.Any] = default_server_execution_context, 
                            thing_execution_context: typing.Dict[str, typing.Any] = default_thing_execution_context
                        ) -> "RequestMessage": 
        """
        create a request message from the given arguments

        Parameters
        ----------
        receiver_id: str
            id of the receiver, also used (utf-8 encoded) as the address frame of the message
        sender_id: str
            id of the sender
        thing_id: str
            id of the thing to which the operation is to be performed
        objekt: str
            objekt of the thing on which the operation is to be performed, i.e. a property, action or event
        operation: str
            operation to be performed
        payload: SerializableData
            payload for the operation
        preserialized_payload: PreserializedData
            already serialized payload, sent as it is in the last frame
        server_execution_context: Dict[str, Any]
            server-level execution context while performing the operation
        thing_execution_context: Dict[str, Any]
            thing-level execution context while performing the operation

        Returns
        -------
        message: RequestMessage
            the crafted message
        """
        # NOTE(review): the default execution contexts are module-level objects handed over by
        # reference - confirm that RequestHeader does not mutate them.
        # use cls, so that subclasses craft instances of their own type
        message = cls([])
        message._header = RequestHeader(
                                messageID=str(uuid4()),
                                # i.e. the message type is 'OPERATION', not 'HANDSHAKE', 'REPLY', 'TIMEOUT' etc.
                                messageType=OPERATION, 
                                senderID=sender_id,
                                receiverID=receiver_id,
                                serverExecutionContext=server_execution_context,
                                thingID=thing_id,
                                objekt=objekt,
                                operation=operation,
                                payloadContentType=payload.content_type,
                                preencodedPayloadContentType=preserialized_payload.content_type,
                                thingExecutionContext=thing_execution_context
                            )
        message._body = [payload, preserialized_payload]
        # frames in the order: address, header, payload, preserialized payload
        message._bytes = [
            bytes(receiver_id, encoding='utf-8'),
            Serializers.json.dumps(message._header.json()),
            payload.serialize(),
            preserialized_payload.value
        ]
        return message

    @classmethod
    def craft_with_message_type(cls, 
                                sender_id: str, 
                                receiver_id: str, 
                                message_type: bytes = HANDSHAKE
                            ) -> "RequestMessage":
        """
        create a plain message with a certain type, for example a handshake message.
        The message carries no payload (`SerializableNone` and `PreserializedEmptyByte` are used).

        Parameters
        ----------
        sender_id: str
            id of the sender
        receiver_id: str
            id of the server, also used (utf-8 encoded) as the address frame of the message
        message_type: bytes
            message type to be sent

        Returns
        -------
        message: RequestMessage
            the crafted message
        """
        # use cls, so that subclasses craft instances of their own type
        message = cls([])
        message._header = RequestHeader(
            messageID=str(uuid4()),
            messageType=message_type,
            senderID=sender_id,
            receiverID=receiver_id,
            serverExecutionContext=default_server_execution_context
        )
        payload = SerializableNone
        preserialized_payload = PreserializedEmptyByte
        message._body = [
            payload,
            preserialized_payload
        ]
        # frames in the order: address, header, payload, preserialized payload
        message._bytes = [
            bytes(receiver_id, encoding='utf-8'),
            Serializers.json.dumps(message._header.json()),
            payload.serialize(),
            preserialized_payload.value
        ]
        return message

Functions

__init__

__init__(msg: typing.List[bytes]) -> None
Source code in hololinked\core\zmq\message.py
def __init__(self, msg : typing.List[bytes]) -> None:
    """
    Store the message frames; nothing is deserialized at construction.

    Parameters
    ----------
    msg: List[bytes]
        the frames of the message
    """
    # caches for the deserialized header, the body and the sender id start out empty
    self._header = self._body = self._sender_id = None
    self._bytes = msg

header

header() -> typing.Tuple[bytes, bytes, bytes, bytes, bytes, typing.Dict[str, typing.Any]]

returns the header of the message, namely index 1 from the following:

Index 0 1 2 3
Desc address header payload preserialized payload

deserialized to a dictionary.

Source code in hololinked\core\zmq\message.py
@property
def header(self) -> "RequestHeader":
    """
    returns the header of the message, namely index 1 from the following:

    | Index | 0       | 1       | 2       | 3                      |
    |-------|---------|---------|---------|------------------------|
    | Desc  | address | header  | payload | preserialized payload  |

    deserialized into a `RequestHeader`. The header is parsed on first access and cached.
    """
    # parse_header() fills self._header, later accesses reuse the cached value
    if self._header is None:
        self.parse_header()
    return self._header

body

body() -> typing.Tuple[bytes, bytes, bytes, typing.Any, typing.Dict[str, typing.Any]]

payload of the message

Source code in hololinked\core\zmq\message.py
@property
def body(self) -> "typing.List[typing.Union[SerializableData, PreserializedData]]":
    """
    payload of the message, as a two item list: `[payload, preserialized payload]`.
    The body is parsed on first access and cached.
    """
    # parse_body() fills self._body, later accesses reuse the cached value
    if self._body is None:
        self.parse_body()
    return self._body

thing_id

thing_id() -> str

ID of the thing on which the operation is to be performed

Source code in hololinked\core\zmq\message.py
@property
def thing_id(self) -> str:
    """the `thingID` entry of the header, i.e. the thing on which the operation is to be performed"""
    message_header = self.header
    return message_header['thingID']

craft_from_arguments classmethod

craft_from_arguments(receiver_id: str, sender_id: str, thing_id: str, objekt: str, operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: typing.Dict[str, typing.Any] = default_server_execution_context, thing_execution_context: typing.Dict[str, typing.Any] = default_thing_execution_context) -> RequestMessage

create a request message from the given arguments

Parameters:

Name Type Description Default

thing_id

str

id of the thing to which the operation is to be performed

required

objekt

str

objekt of the thing on which the operation is to be performed, i.e. a property, action or event

required

operation

str

operation to be performed

required

payload

SerializableData

payload for the operation

SerializableNone

server_execution_context

Dict[str, Any]

server-level execution context while performing the operation

default_server_execution_context

thing_execution_context

Dict[str, Any]

thing-level execution context while performing the operation

default_thing_execution_context

Returns:

Name Type Description
message RequestMessage

the crafted message

Source code in hololinked\core\zmq\message.py
@classmethod
def craft_from_arguments(cls, 
                        receiver_id: str, 
                        sender_id: str,
                        thing_id: str, 
                        objekt: str, 
                        operation: str, 
                        payload: SerializableData = SerializableNone,
                        preserialized_payload: PreserializedData = PreserializedEmptyByte,
                        server_execution_context: typing.Dict[str, typing.Any] = default_server_execution_context, 
                        thing_execution_context: typing.Dict[str, typing.Any] = default_thing_execution_context
                    ) -> "RequestMessage": 
    """
    create a request message from the given arguments

    Parameters
    ----------
    receiver_id: str
        id of the receiver, also used (utf-8 encoded) as the address frame of the message
    sender_id: str
        id of the sender
    thing_id: str
        id of the thing to which the operation is to be performed
    objekt: str
        objekt of the thing on which the operation is to be performed, i.e. a property, action or event
    operation: str
        operation to be performed
    payload: SerializableData
        payload for the operation
    preserialized_payload: PreserializedData
        already serialized payload, sent as it is in the last frame
    server_execution_context: Dict[str, Any]
        server-level execution context while performing the operation
    thing_execution_context: Dict[str, Any]
        thing-level execution context while performing the operation

    Returns
    -------
    message: RequestMessage
        the crafted message
    """
    # NOTE(review): the default execution contexts are module-level objects handed over by
    # reference - confirm that RequestHeader does not mutate them.
    # use cls, so that subclasses craft instances of their own type
    message = cls([])
    message._header = RequestHeader(
                            messageID=str(uuid4()),
                            # i.e. the message type is 'OPERATION', not 'HANDSHAKE', 'REPLY', 'TIMEOUT' etc.
                            messageType=OPERATION, 
                            senderID=sender_id,
                            receiverID=receiver_id,
                            serverExecutionContext=server_execution_context,
                            thingID=thing_id,
                            objekt=objekt,
                            operation=operation,
                            payloadContentType=payload.content_type,
                            preencodedPayloadContentType=preserialized_payload.content_type,
                            thingExecutionContext=thing_execution_context
                        )
    message._body = [payload, preserialized_payload]
    # frames in the order: address, header, payload, preserialized payload
    message._bytes = [
        bytes(receiver_id, encoding='utf-8'),
        Serializers.json.dumps(message._header.json()),
        payload.serialize(),
        preserialized_payload.value
    ]
    return message

craft_with_message_type classmethod

craft_with_message_type(sender_id: str, receiver_id: str, message_type: bytes = HANDSHAKE) -> RequestMessage

create a plain message with a certain type, for example a handshake message.

Parameters:

Name Type Description Default

receiver_id

str

id of the server

required

message_type

bytes

message type to be sent

HANDSHAKE

Returns:

Name Type Description
message RequestMessage

the crafted message

Source code in hololinked\core\zmq\message.py
@classmethod
def craft_with_message_type(cls, 
                            sender_id: str, 
                            receiver_id: str, 
                            message_type: bytes = HANDSHAKE
                        ) -> "RequestMessage":
    """
    create a plain message with a certain type, for example a handshake message.
    The message carries no payload (`SerializableNone` and `PreserializedEmptyByte` are used).

    Parameters
    ----------
    sender_id: str
        id of the sender
    receiver_id: str
        id of the server, also used (utf-8 encoded) as the address frame of the message
    message_type: bytes
        message type to be sent

    Returns
    -------
    message: RequestMessage
        the crafted message
    """
    # use cls, so that subclasses craft instances of their own type
    message = cls([])
    message._header = RequestHeader(
        messageID=str(uuid4()),
        messageType=message_type,
        senderID=sender_id,
        receiverID=receiver_id,
        serverExecutionContext=default_server_execution_context
    )
    payload = SerializableNone
    preserialized_payload = PreserializedEmptyByte
    message._body = [
        payload,
        preserialized_payload
    ]
    # frames in the order: address, header, payload, preserialized payload
    message._bytes = [
        bytes(receiver_id, encoding='utf-8'),
        Serializers.json.dumps(message._header.json()),
        payload.serialize(),
        preserialized_payload.value
    ]
    return message