Skip to main content
Version: 2.0.0

Class: Message

Events traversing layline.io Workflows are instantiated as a Message. This class exposes a number of properties and methods to extract and set data within messages.

To understand the anatomy of a message please read the respective chapter in the documentation.

Example Message Structure

Assume we have the following data dictionary structure

  • Header
    • IOT
      • RECORD_TYPE
      • DEVICE_NO
  • Detail
    • IOT
      • RECORD_TYPE
      • TIME
      • MEASUREMENT
  • Trailer
    • IOT
      • RECORD_TYPE
      • COUNT

Then in a Python processor we can do this:

def on_message():
if message.typeName == 'Header':
on_header(message)
elif message.typeName == 'Detail':
on_detail(message)
elif message.typeName == 'Trailer':
on_detail(message)

# send the message on through OUTPUT_PORT of Processor
stream.emit(message, OUTPUT_PORT)

And this:

# Handle a detail record type
def on_detail(message):
m = message.data.IOT.MEASUREMENT

VENDOR = Status.getVendorByName('MyVendorLongName')

if m < 0:
message.addStatus(Severity.ERROR, Status.create(VENDOR, 'ILLEGAL_MEASUREMENT', m))

Properties

data

data: object

Methods

addStatus()

addStatus(severity: Severity, status: Status, add_to_log: bool = True) -> None

Adds a Status to a message. The Status must have been created with Status.create or otherwise instantiated.

Parameters

  • severity: Severity

    Severity value.

  • status: Status

    The Status which should be added.

  • add_to_log: bool, optional

    Signals whether the Status shall also be added to the log, or not. Will be added by default if not specified. If True then the Status will be visible in the Stream Log of the Audit Trail.

Returns

None

Example

if error:
message.addStatus(Severity.ERROR, Status.create(VENDOR, 'ILLEGAL_VALUE', value_string))

clone()

clone() -> Message

Creates a full clone of a Message

cloned_message = message.clone()

Returns

Message - A copy of a Message

commit()

commit() -> Message

Commits the message, typically used with message queues or streaming platforms.

This method is used to acknowledge the successful processing of a message. The exact behavior depends on the underlying system:

  • For an SQS queue, it deletes the message from the queue.
  • For a Kafka topic, it commits the offset of the consumer.

Calling this method indicates that the message has been successfully processed and should not be redelivered.

Returns

Message - Returns the message instance for method chaining.

Example

def on_message():
try:
# Process the message
result = process_message(message)

if result['success']:
# If processing was successful, commit the message
message.commit()
print("Message processed and committed successfully")
else:
print("Message processing failed, not committing")
except Exception as error:
print(f"Error processing message: {error}")
# In case of an error, you might choose not to commit
# so that the message can be reprocessed

def process_message(msg):
# Implement your message processing logic here
# Return a dictionary indicating success or failure
return {'success': True}

In this example, we demonstrate a common pattern for message processing:

  1. We attempt to process the message.
  2. If processing is successful, we commit the message to acknowledge its completion.
  3. If processing fails or an error occurs, we don't commit the message, allowing it to be reprocessed.

This approach helps ensure message reliability and prevents data loss in distributed systems.

exists()

exists(entity_declaration: DataDictionaryEntity) -> bool

Checks if a known data structure is recognized within a given Message. Data structures are spawned into existence by the definition of data formats (Format Assets). You can test a particular Message on whether a specific structure is present within a message by using this method.

This is typically used to check whether a message is of a certain type, or not.

Parameters

  • entity_declaration: DataDictionaryEntity

    Path to data dictionary structure which you want to test for existence in the message (DataDictionaryEntity.)

Returns

bool - True, if it exists, else false.

Example

# Get the access path to a structure within the compiled data dictionary
MY_RECORD_TYPE = dataDictionary.type.MyFormat.Detail

# Test message against the existence of the data dictionary structure.
if message.exists(MY_RECORD_TYPE):
...

findStatus()

findStatus(value: Vendor | Severity | (status: Status) => boolean) -> List[Status]

Finds and returns a list of status entries attached to the message based on the provided filter.

This method allows you to search for status entries using three different approaches:

  1. By Vendor: Find all statuses from a specific vendor.
  2. By Severity: Find all statuses of a specific severity level.
  3. By Custom Function: Use a custom filter function to find statuses based on any criteria.

Parameters

  • value: Union[Vendor, Severity, Callable[[Status], bool]]
    • If a Vendor is provided, it returns all statuses from that vendor.
    • If a Severity is provided, it returns all statuses of that severity level.
    • If a function is provided, it should take a Status as input and return a boolean. The method will return all statuses for which this function returns True.

Returns

List[Status] - A list of Status objects that match the provided filter. Returns an empty list if no matching statuses are found.

Examples

  1. Finding statuses by Vendor:
# Assume we have a vendor defined
VENDOR = Status.getVendorByName('MyVendorName')

# Find all statuses from this vendor
vendor_statuses = message.findStatus(VENDOR)
for status in vendor_statuses:
print(f"Found status: {status.code} - {status.message}")
  1. Finding statuses by Severity:
# Find all ERROR statuses
error_statuses = message.findStatus(Severity.ERROR)
if error_statuses:
print(f"Message has {len(error_statuses)} error statuses")
  1. Finding statuses using a custom filter function:
# Find all statuses with a specific code
def has_specific_code(status):
return status.code == "SPECIFIC_CODE"

specific_statuses = message.findStatus(has_specific_code)
for status in specific_statuses:
print(f"Found status with specific code: {status.message}")
  1. Using a lambda function for filtering:
# Find all WARNING or ERROR statuses
high_priority_statuses = message.findStatus(lambda s: s.severity in [Severity.WARNING, Severity.ERROR])
for status in high_priority_statuses:
print(f"High priority status: {s.severity} - {s.message}")

Notes

  • The method returns an empty list if no statuses match the provided filter.
  • When using a custom filter function, you have full flexibility to implement complex filtering logic based on any properties of the Status object.
  • This method is particularly useful for error handling, logging, and conditional processing based on the statuses attached to a message.

getBigInteger()

getBigInteger(entity: DataDictionaryEntity) -> int

Return a BigInteger typed value from a message field. Important!: Please note that this method returns a Python int object, which can handle arbitrarily large integers.

Parameters

Returns

int - Python integer (which can handle arbitrarily large values).

Example

n = message.getBigInteger(dataDictionary.type.Detail.CSV.A_REALLY_BIG_NUMBER_FIELD)

# Compare BigInteger to another BigInteger
x = 123 # x is now a Python int, which can handle arbitrarily large values

x == 123 # -> True

getBoolean()

getBoolean(entity: DataDictionaryEntity, default_value: bool) -> bool

Retrieves a Boolean value from a specific field in the message's data dictionary.

This method accesses a boolean value from the message using the provided data dictionary entity.

Parameters

  • entity: DataDictionaryEntity

    The data dictionary entity that specifies the path to the boolean value in the message.

Returns

bool - The boolean value from the specified field in the message.

Examples

Basic usage:

# Assuming we have a data dictionary entity for an "is_active" field
is_active_entity = dataDictionary.type.MyFormat.Detail.IS_ACTIVE

# Get the boolean value, defaulting to False if not found
is_active = message.getBoolean(is_active_entity)

if is_active:
print("The item is active")
else:
print("The item is not active")

getByte()

getByte(entity: DataDictionaryEntity) -> int

Return the Byte typed value from a message field.

Parameters

Returns

int - Python integer representing a byte value (0-255).

Example

b = message.getByte(dataDictionary.type.Detail.CSV.A_BYTE_FIELD)

getByteString()

getByteString(entity: DataDictionaryEntity) -> bytes

Return the ByteString typed value from a message field.

Parameters

Returns

bytes - Python bytes object.

Example

b = message.getByteString(dataDictionary.type.Detail.CSV.FIELD)

getCharacter()

getCharacter(entity: DataDictionaryEntity) -> str

Return a Character typed value from a message field.

Parameters

Returns

str - Python string of length 1.

Example

c = message.getCharacter(dataDictionary.type.Detail.CSV.FIELD)

getCrc64()

getCrc64(message: MessageNode) -> str

Creates a CRC 64 checksum from specified node within a Message.

Parameters

  • message: MessageNode

    MessageNode for which to create the CRC64 checksum.

Returns

str - CRC 64 checksum

Example

crc64 = message.getCrc64(message.data.CSV)

getDate()

getDate(entity: DataDictionaryEntity) -> LocalDate

Return a LocalDate typed value from a message field.

Parameters

Returns

LocalDate - A date-time with an offset from UTC/Greenwich.

Example

dt = message.getDateTime(dataDictionary.type.Detail.CSV.A_DATE_FIELD)

getDateTime()

getDateTime(entity: DataDictionaryEntity) -> DateTime

Return a DateTime typed value from a message field.

Parameters

Returns

DateTime - A date-time with an offset from UTC/Greenwich.

Example

dt = message.getDateTime(dataDictionary.type.Detail.CSV.FIELD)

getDecimal()

getDecimal(entity: DataDictionaryEntity) -> Decimal

Return a Decimal typed value from a message field.

Parameters

Returns

Decimal - Python Decimal type.

Example

dec = message.getDecimal(dataDictionary.type.Detail.CSV.FIELD)

getDouble()

getDouble(entity: DataDictionaryEntity) -> float

Return a Double typed value from a message field.

Parameters

Returns

float - Python float type.

Example

dbl = message.getDouble(dataDictionary.type.Detail.CSV.FIELD)

getInt()

getInt(entity: DataDictionaryEntity) -> int

Return an Int typed value from a message field.

Parameters

Returns

int - Python int type.

Example

integer = message.getInt(dataDictionary.type.Detail.CSV.FIELD)

getLong()

getLong(entity: DataDictionaryEntity) -> int

Return a Long typed value from a message field.

Parameters

Returns

int - Python int type (which can handle arbitrarily large values).

Example

l = message.getLong(dataDictionary.type.Detail.CSV.FIELD)

getMessageDigest()

getMessageDigest(algorithm: str = "MD5", to_lower_case: bool = False, accessor_array: List[DataDictionaryEntity] = None) -> str

Returns a calculated digest for a given message

Parameters

  • algorithm: str, optional

    Algorithm with which to calculate the digest. Currently only supports "MD5".

  • to_lower_case: bool, optional

    Set to true if digest should be lower-case only.

  • accessor_array: List[DataDictionaryEntity], optional

    List of DataDictionaryEntity on which to calculate the digest.

Returns

str

Example

# Option: 1. Return digest considering complete message content.
# Digest calculation defaults to MD5 hash and no lower case.
md5_digest_full = message.getMessageDigest()

# Option: 2. Return digest for full message content based on MD5 hash.
# Returned digest will be lower case only.
md5_digest_full_lower = message.getMessageDigest("MD5", True)

# Option: 3. Calculate digest considering specific data structures within message only.
record_accessor_for_md5 = [
dataDictionary.type.Detail.CSV.RECORD_TYPE,
dataDictionary.type.Detail.CSV.LAST_NAME,
dataDictionary.type.Detail.CSV.FIRST_NAME
]

md5_digest = message.getMessageDigest("MD5", True, record_accessor_for_md5)

getNumStatusAttached()

getNumStatusAttached() -> int

Gets the number of States Status attached.

result = message.getNumStatusAttached()

Returns

int - Number of States attached to the message.

getObject()

getObject(entity: DataDictionaryEntity) -> Any

Return an Object value from a message field.

Parameters

Returns

Any - Python object.

Example

o = message.getObject(dataDictionary.type.Detail.CSV.FIELD)

getStatus()

getStatus(index: int) -> Status

Retrieves a Status by index from the list of States attached to a message. A message keeps track of related States in a Status array attached to it. This list may be empty or filled with one or more States.

Parameters

  • index: int

    Index of Status to retrieve.

Returns

Status - Status or None if no Status found with that index.

Example

# Retrieve the first Status from the list of States attached to the message.
status = message.getStatus(0)

getString()

getString(entity: DataDictionaryEntity) -> str

Return a String typed value from a message field.

Parameters

Returns

str - The value as string.

Example

s = message.getString(dataDictionary.type.Detail.CSV.FIELD)

hasStatusAttached()

hasStatusAttached(severity: Severity) -> bool

Checks if a message has a Status attached which matches a particular Severity.

result = message.hasStatusAttached(Severity.ERROR)

Parameters

  • severity: Severity

    Severity to check against.

Returns

bool - True, if match found, else False.

pack()

pack() -> PackedMessage

Packs the message into a memory efficient format.

# Pack message
packed_msg = message.pack()

# Unpack message
unpacked_msg = packed_msg.unpack()

Returns

PackedMessage - Packed message.

setBigInteger()

setBigInteger(entity: DataDictionaryEntity, value: int) -> None

Sets a BigInteger value in a message target field.

big_int = 123  # Python int can handle arbitrarily large integers

message.setBigInteger(dataDictionary.type.Detail.CSV.FIELD, big_int)

Parameters

  • entity: DataDictionaryEntity

    DataDictionaryEntity describing the access path to the field value.

  • value: int

    A Python int value (which can handle arbitrarily large integers).

Returns

None

setBoolean()

setBoolean(entity: DataDictionaryEntity, value: bool) -> None

Sets a Boolean value in a message target field.

message.setBoolean(dataDictionary.type.Detail.CSV.FIELD, True)

Parameters

  • entity: DataDictionaryEntity

    DataDictionaryEntity describing the access path to the field value.

  • value: bool

    A Python bool value.

Returns

None

setByte()

setByte(entity: DataDictionaryEntity, value: Union[int, str]) -> None

Sets a Byte value in a message target field.

message.setByte(dataDictionary.type.Detail.CSV.FIELD, 7)
# or
message.setByte(dataDictionary.type.Detail.CSV.FIELD, 'X')

Parameters

  • entity: DataDictionaryEntity

    DataDictionaryEntity describing the access path to the field value.

  • value: Union[int, str]

    A value which can be represented as a Byte (0-255 or a single character).

Returns

None

setByteString()

setByteString(entity: DataDictionaryEntity, value: bytes) -> None

Sets a ByteString value in a message target field.

b = b"XYZ"  # b is now a Python bytes object

message.setByteString(dataDictionary.type.Detail.CSV.FIELD, b)

Parameters

  • entity: DataDictionaryEntity

    DataDictionaryEntity describing the access path to the field value.

  • value: bytes

    A Python bytes object.

Returns

None

setCharacter()

setCharacter(entity: DataDictionaryEntity, value: str) -> None

Sets a Character value in a message target field.

message.setCharacter(dataDictionary.type.Detail.CSV.FIELD, 'c