Skip to main content

Message

The Message class is the central data structure in layline.io. Every event that flows through a workflow is encapsulated as a Message — a typed envelope carrying structured data, metadata, and a history of processing status.

In every JavaScript processor, the current message is available as the global variable message. No import or setup required.


At a Glance

export function onMessage() {
// Every message has an identity, a type, and data
stream.logInfo(`Processing message ${message.id} of type "${message.typeName}"`);

// Branch by message type
if (message.typeName === 'Header') {
onHeader(message);
} else if (message.typeName === 'Detail') {
onDetail(message);
}

// Forward to the next processor
stream.emit(message, OUTPUT_PORT);
}

Properties

PropertyTypeDescription
dataObjectThe message payload — a nested object reflecting your data dictionary structure
idstringUnique message identifier (e.g., "1", "1.1", "1.2" for clones)
typeNamestringThe data dictionary type this message represents

data

The message payload. Its structure mirrors the data dictionary definition for this message type.

// Read nested fields directly
const productName = message.data.PRODUCT.NAME;
const price = message.data.PRODUCT.PRICE;

// Create a new message and populate it
const detail = dataDictionary.createMessage(dataDictionary.type.Detail);
detail.data.PRODUCT = {
RECORD_TYPE : "D",
ID : message.data.Id,
NAME : message.data.Name,
PRICE : message.data.Price,
};

stream.emit(detail, OUTPUT_PORT);

id

A unique identifier assigned to each message. The first message in a stream is "1", the second "2", and so on. When you clone a message, the clone gets a suffix: "1.1", "1.2", etc.

const id = message.id; // "42" or "42.3"

typeName

The name of the data dictionary type this message was created from. Use this to branch your processing logic.

if (message.typeName === 'MyType') {
// Handle this specific type
}

Type Checking

is(type)

Check if a message matches a specific data dictionary type.

if (message.is(dataDictionary.type.Detail.CSV)) {
// Message is a Detail CSV record
}
ParameterTypeDescription
typeDataDictionaryEntityThe type to check against

Returns: boolean

exists(entity)

Check if a specific data structure is present within the message. Useful when a format defines optional structures.

const recordType = dataDictionary.type.MyFormat.Detail;

if (message.exists(recordType)) {
// This message contains a Detail structure
}
ParameterTypeDescription
entityDataDictionaryEntityPath to the structure to test

Returns: boolean


Reading Data

Message fields are accessed through typed getter methods. Each method takes a DataDictionaryEntity that describes the path to the field.

Text & Boolean

MethodReturnsDescription
getString(entity)StringRead a string value
getBoolean(entity)booleanRead a boolean value
const name = message.getString(dataDictionary.type.Detail.CSV.NAME);
const isActive = message.getBoolean(dataDictionary.type.Detail.CSV.IS_ACTIVE);

Numbers: Which Type to Use?

MethodReturnsJava TypeUse When...
getInt(entity)Integerjava.lang.IntegerWhole number, -2 billion to +2 billion
getLong(entity)Longjava.lang.LongWhole number, larger than 2 billion
getDouble(entity)Doublejava.lang.DoubleFloating point, precision not critical
getDecimal(entity)BigDecimaljava.math.BigDecimalDecimal with exact precision (money, rates)
getBigInteger(entity)BigIntegerjava.math.BigIntegerArbitrary-size whole numbers
const count = message.getInt(dataDictionary.type.Order.QUANTITY);
const timestamp = message.getLong(dataDictionary.type.Order.TS_EPOCH);
const price = message.getDecimal(dataDictionary.type.Order.UNIT_PRICE);
Java Native Types

getBigInteger(), getDecimal(), getLong(), and getDouble() return Java native types — not JavaScript numbers. Simple operators may trigger implicit conversion and lose precision.

Use .equals() for comparisons and type-specific methods for math:

const BigInteger = Java.type("java.math.BigInteger");
const x = new BigInteger("9007199254740993");

x == 9007199254740993; // ⚠️ true (but precision lost!)
x.equals(9007199254740993); // false — different types
x.equals(new BigInteger("9007199254740993")); // true ✓

Date & Time

MethodReturnsDescription
getDate(entity)LocalDateDate without timezone (e.g., 2024-03-15)
getDateTime(entity)DateTimeDate-time with UTC offset (e.g., 2024-03-15T10:30:00+01:00)
const birthDate = message.getDate(dataDictionary.type.Customer.BIRTH_DATE);
const createdAt = message.getDateTime(dataDictionary.type.Order.CREATED_AT);

Binary & Specialized

MethodReturnsDescription
getByte(entity)ByteSingle byte value
getByteString(entity)ByteStringByte array as string
getCharacter(entity)CharacterSingle character
getObject(entity)ObjectGeneric object (use sparingly)

Checksums

MethodReturnsDescription
getCrc64(entity)stringCRC-64 checksum of the specified node
getMessageDigest(algorithm?, toLowerCase?, accessors?)stringMD5 digest of full or partial message
// CRC-64 of a specific structure
const crc = message.getCrc64(message.data.CSV);

// MD5 of the entire message
const digest = message.getMessageDigest();

// MD5 of selected fields only
const fields = [
dataDictionary.type.Detail.CSV.RECORD_TYPE,
dataDictionary.type.Detail.CSV.LAST_NAME,
dataDictionary.type.Detail.CSV.FIRST_NAME
];
const partialDigest = message.getMessageDigest("MD5", true, fields);

Writing Data

All setters follow the same pattern: setX(entity, value). The value can often be a JavaScript primitive — layline.io handles the conversion.

Quick Reference

CategorySetterAcceptsExample
TextsetString(entity, value)stringmessage.setString(path, "Hello")
setCharacter(entity, value)string (single char)message.setCharacter(path, 'A')
setByteString(entity, value)stringmessage.setByteString(path, "XYZ")
BooleansetBoolean(entity, value)booleanmessage.setBoolean(path, true)
NumberssetInt(entity, value)number, String, variousmessage.setInt(path, 42)
setLong(entity, value)number, String, variousmessage.setLong(path, 9999999999)
setDouble(entity, value)number, String, variousmessage.setDouble(path, 3.14159)
setDecimal(entity, value)number, String, variousmessage.setDecimal(path, 123.45)
setBigInteger(entity, value)BigInteger, convertiblemessage.setBigInteger(path, bigInt)
BinarysetByte(entity, value)number, stringmessage.setByte(path, 7)
Date/TimesetDate(entity, value)LocalDatemessage.setDate(path, localDate)
setDateTime(entity, value)DateTimemessage.setDateTime(path, dt)
GenericsetObject(entity, value)Objectmessage.setObject(path, [1, 2, 3])

Direct Assignment Shortcut

For simple cases, assign directly via data:

// These are equivalent:
message.setString(dataDictionary.type.Detail.CSV.NAME, "Acme Corp");
message.data.CSV.NAME = "Acme Corp";

Status Management

Messages carry a Status array that tracks processing events — errors, warnings, or custom business states.

Adding Status

const VENDOR = Status.getVendorByName('MyVendor');

if (measurement < 0) {
message.addStatus(Severity.ERROR, Status.create(VENDOR, 'ILLEGAL_MEASUREMENT', measurement));
}
ParameterTypeDescription
severitySeverityINFO, WARNING, ERROR, etc.
statusStatusThe status object to attach
addToLogboolean (default: true)Also log to the audit trail?

Querying Status

MethodReturnsDescription
numStatusAttached()numberCount of attached statuses
hasStatusAttached(severity?)booleanCheck for any (or specific severity) status
getStatus(index)Status | undefinedGet status by index
findStatus(filter)Status[]Find statuses matching a filter
// Check for errors
if (message.hasStatusAttached(Severity.ERROR)) {
stream.logError("Message has errors — routing to failure port");
stream.emit(message, ERROR_PORT);
return;
}

// Find all warnings
const warnings = message.findStatus(Severity.WARNING);
warnings.forEach(s => stream.logWarn(`${s.code}: ${s.message}`));

// Find statuses from a specific vendor
const vendorStatuses = message.findStatus(VENDOR);

// Custom filter
const critical = message.findStatus(s => s.code === 'CRITICAL');

Message Lifecycle

clone()

Creates a deep copy of the message with a new ID suffix.

const original = message; // id: "5"
const copy = message.clone(); // id: "5.1"
const another = message.clone(); // id: "5.2"

Returns: Message — the cloned instance

pack()

Compresses the message into a memory-efficient PackedMessage for storage or transmission.

const packed = message.pack(); // Compact representation
const restored = packed.unpack(); // Back to full Message

Returns: PackedMessage

commit()

Acknowledges successful processing. Behavior depends on the source:

  • SQS: Deletes the message from the queue
  • Kafka: Commits the consumer offset
  • File: Marks the file as processed
try {
processMessage(message);
message.commit(); // Acknowledge success
} catch (err) {
// Don't commit — message will be redelivered
message.addStatus(Severity.ERROR, Status.create(VENDOR, 'PROCESSING_FAILED', err));
}

Returns: Message (supports chaining)


Serialization

MethodReturnsDescription
toJson()stringJSON representation of the message
toString()stringHuman-readable string representation
stream.logInfo("Received: " + message.toJson());
stream.logDebug("Message dump: " + message.toString());

Complete Example

A realistic order processor demonstrating type checking, field access, validation, cloning, and status:

export function onMessage() {
// Only process Detail records
if (!message.is(dataDictionary.type.Order.Detail)) {
stream.emit(message, OUTPUT_PORT);
return;
}

// Extract fields
const orderId = message.getString(dataDictionary.type.Order.Detail.ORDER_ID);
const quantity = message.getInt(dataDictionary.type.Order.Detail.QUANTITY);
const unitPrice = message.getDecimal(dataDictionary.type.Order.Detail.UNIT_PRICE);

stream.logInfo(`Processing order ${orderId} (message ${message.id})`);

// Business validation
const VENDOR = Status.getVendorByName('OrderValidation');

if (quantity <= 0) {
message.addStatus(Severity.ERROR, Status.create(VENDOR, 'INVALID_QUANTITY', quantity));
}

if (unitPrice.compareTo(new BigDecimal("0")) <= 0) {
message.addStatus(Severity.ERROR, Status.create(VENDOR, 'INVALID_PRICE', unitPrice));
}

// If valid, enrich and clone for fulfillment
if (!message.hasStatusAttached(Severity.ERROR)) {
const total = unitPrice.multiply(new BigDecimal(quantity));
message.setDecimal(dataDictionary.type.Order.Detail.TOTAL, total);

// Clone for parallel fulfillment pipeline
const fulfillment = message.clone();
stream.emit(fulfillment, FULFILLMENT_PORT);
}

// Always emit original for audit trail
stream.emit(message, OUTPUT_PORT);
}

Common Pitfalls

ProblemCauseSolution
getLong() returns a Java object, not a JS numberLong exceeds JS safe integer rangeUse .longValue() or treat as Java Long
getDecimal() comparison with === failsReturns BigDecimal, not primitiveUse .compareTo() or .equals()
Field appears missingOptional structure not present in this messageUse exists() before accessing
message.data.X = Y doesn't persistDirect assignment bypasses type validationUse setX() methods for type safety
Status not visible in audit trailaddToLog parameter set to falseOmit the parameter (defaults to true)

See Also