
JavaScriptProcessor

The JavaScriptProcessor defines the lifecycle hooks available in a JavaScript Asset. These hooks are automatically invoked by layline.io at key moments — startup, message arrival, stream boundaries, and shutdown.

Implement only the hooks you need. The most commonly used are onInit() (one-time setup) and onMessage() (per-message processing).


Lifecycle Overview

Project Startup
└── onInit() ← Once, when project starts

Stream Start
└── onStreamStart() ← Per stream

Message Arrives
└── onMessage() ← Per message (the heart of your logic)

Downstream Ready
└── onPullMessage() ← When downstream can receive more

Stream Ends
└── onStreamEnd() ← Per stream

Prepare Commit
└── onPrepareCommit() ← Before finalizing

Commit
└── onCommit() ← Finalize resources

Rollback
└── onRollback() ← On failure, undo changes

Prepare Retry
└── onPrepareRetry() ← Before retry attempt
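
To make the per-stream ordering concrete, here is a minimal sketch of a driver that calls hooks in the sequence shown above and skips any hook that is not implemented. The `runStream` function and the `hooks` object are hypothetical stand-ins written for this illustration; they are not layline.io's runtime. Passing the message as an argument and treating any thrown error as a rollback trigger are simplifying assumptions. `onInit()`, `onPullMessage()`, and `onPrepareRetry()` are left out for brevity.

```javascript
// Hypothetical driver, for illustration only. It is not layline.io's runtime.
function runStream(hooks, messages) {
    // Invoke a hook only if the asset implements it.
    const call = (name, ...args) => {
        if (typeof hooks[name] === 'function') hooks[name](...args);
    };
    try {
        call('onStreamStart');
        for (const msg of messages) call('onMessage', msg);
        call('onStreamEnd');
        call('onPrepareCommit');
        call('onCommit');
    } catch (err) {
        // Assumption: any failure during the stream leads to a rollback.
        call('onRollback');
    }
}

const calls = [];
const hooks = {
    // Only three hooks are implemented; the others are simply skipped.
    onStreamStart: () => calls.push('onStreamStart'),
    onMessage: (msg) => calls.push(`onMessage(${msg})`),
    onCommit: () => calls.push('onCommit'),
};

runStream(hooks, ['a', 'b']);
console.log(calls.join(' -> '));
```

The point of the sketch is that an asset with only some hooks still moves through the full sequence; missing hooks are simply not called.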

Hooks

onInit()

Called once when the project starts. Use for one-time initialization: resolving output ports, opening connections, loading configuration.

let OUTPUT_PORT;
let DB_CONNECTION;

export function onInit() {
    OUTPUT_PORT = processor.getOutputPort('Output');

    const dbUrl = processor.expandString('${lay:DB_URL}');
    DB_CONNECTION = openDatabaseConnection(dbUrl);
}

onMessage()

Called for every message that arrives at this processor. This is where your main processing logic lives.

export function onMessage() {
    // Branch by message type
    if (message.typeName === 'Header') {
        processHeader(message);
    } else if (message.typeName === 'Detail') {
        processDetail(message);
    } else if (message.typeName === 'Trailer') {
        processTrailer(message);
    }

    // Always emit to keep the flow moving
    stream.emit(message, OUTPUT_PORT);
}

function processDetail(msg) {
    const qty = msg.getInt(dataDictionary.type.Order.Detail.QUANTITY);
    if (qty <= 0) {
        msg.addStatus(Severity.ERROR, Status.create(VENDOR, 'INVALID_QTY', qty));
    }
}
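
When the number of message types grows, the branching above can also be expressed as a lookup table. The following is a sketch only: it uses plain objects with a `typeName` property in place of real layline.io messages, and the handler bodies and the `dispatch` function are hypothetical names introduced for the example.

```javascript
const seen = [];

// Hypothetical handlers; each receives a message-like object.
const handlers = new Map([
    ['Header', (msg) => seen.push(`header:${msg.id}`)],
    ['Detail', (msg) => seen.push(`detail:${msg.id}`)],
    ['Trailer', (msg) => seen.push(`trailer:${msg.id}`)],
]);

// Returns true if a handler ran, false for an unknown type.
function dispatch(msg) {
    const handler = handlers.get(msg.typeName);
    if (!handler) return false;
    handler(msg);
    return true;
}

dispatch({ typeName: 'Header', id: 1 });
dispatch({ typeName: 'Detail', id: 2 });
dispatch({ typeName: 'Unknown', id: 3 }); // no handler, nothing recorded
console.log(seen);
```

A `Map` is used rather than a plain object so that type names such as `toString` cannot accidentally match inherited properties.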

onStreamStart()

Called when a new stream begins. Use to reset per-stream counters or capture stream metadata.

let streamId;
let fileName;
let recordCount = 0;

export function onStreamStart() {
    streamId = stream.id;
    fileName = stream.name;
    recordCount = 0;

    stream.logInfo(`Starting stream ${streamId}: ${fileName}`);
}

onStreamEnd()

Called when the current stream ends. Use for cleanup, summary logging, or final batch operations.

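// recordCount and errorCount are module-level counters,
// reset in onStreamStart() and updated in onMessage().
export function onStreamEnd() {
    stream.logInfo(`Stream complete. Processed ${recordCount} records.`);

    if (errorCount > 0) {
        stream.logWarning(`${errorCount} records had errors.`);
    }
}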
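
The interplay between onStreamStart(), onMessage(), and onStreamEnd() for per-stream counters can be sketched without the layline.io runtime. In this illustration the `log` array stands in for `stream.logInfo`, the `run` helper stands in for the runtime, and messages are plain objects with a hypothetical `valid` flag; none of these are part of the layline.io API.

```javascript
const log = [];          // stand-in for stream.logInfo
let recordCount = 0;
let errorCount = 0;

function onStreamStart() {
    // Reset so counts from a previous stream do not leak into this one.
    recordCount = 0;
    errorCount = 0;
}

function onMessage(msg) {
    recordCount++;
    if (!msg.valid) errorCount++;
}

function onStreamEnd() {
    log.push(`${recordCount} records, ${errorCount} errors`);
}

// Hypothetical runner: one call simulates one complete stream.
function run(messages) {
    onStreamStart();
    messages.forEach((msg) => onMessage(msg));
    onStreamEnd();
}

run([{ valid: true }, { valid: false }, { valid: true }]);
run([{ valid: true }]);
console.log(log);
```

Without the reset in onStreamStart(), the second stream would report the totals of both streams combined.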

onPrepareCommit()

Called before the stream is committed. Use it for work that must finish before the commit, such as flushing buffered writes.

export function onPrepareCommit() {
    // Flush any buffered writes
    flushPendingRecords();
}
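
The document does not show `flushPendingRecords()`. One possible shape for such a helper, assuming records are buffered in an in-memory array and written out in one batch, is sketched below. The `pending` buffer, the `written` array (a stand-in for the real sink), and `bufferRecord` are hypothetical names for this example.

```javascript
const pending = [];   // records buffered during onMessage()
const written = [];   // stand-in for a database or file sink

function bufferRecord(record) {
    pending.push(record);
}

// Moves all buffered records to the sink and returns how many were flushed.
function flushPendingRecords() {
    const batch = pending.splice(0, pending.length); // empties the buffer
    written.push(...batch);
    return batch.length;
}

bufferRecord({ id: 1 });
bufferRecord({ id: 2 });
const flushed = flushPendingRecords();
console.log(flushed, written.length, pending.length);
```

Emptying the buffer as part of the flush makes a second call harmless, which matters if the hook can run again after a retry.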

onCommit()

Called when the stream is successfully committed. Use to release resources.

export function onCommit() {
    if (DB_CONNECTION) {
        DB_CONNECTION.commitTransaction();
        DB_CONNECTION.closeConnection();
        DB_CONNECTION = null;
    }
}

onRollback()

Called when a rollback is requested. Use to undo changes and clean up.

export function onRollback() {
    if (DB_CONNECTION) {
        try {
            DB_CONNECTION.rollbackTransaction();
            DB_CONNECTION.closeConnection();
        } catch (err) {
            stream.logError(`Rollback failed: ${err}`);
        } finally {
            DB_CONNECTION = null;
        }
    }
}

onPrepareRetry()

Called before a retry attempt. Use to reset state for the next attempt.

export function onPrepareRetry() {
    if (DB_CONNECTION) {
        try {
            DB_CONNECTION.rollbackTransaction();
            DB_CONNECTION.closeConnection();
        } catch (err) {
            // Ignore cleanup errors
        } finally {
            DB_CONNECTION = null;
        }
    }
}
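
The onRollback() and onPrepareRetry() examples share the same cleanup steps and differ only in how they treat errors. One way to avoid the duplication is a small helper, sketched here under the assumption that the connection object exposes the `rollbackTransaction()` and `closeConnection()` methods used in the examples above. `releaseConnection` and the mock connections are hypothetical.

```javascript
// Rolls back and closes a connection, then returns null so the caller
// can write: DB_CONNECTION = releaseConnection(DB_CONNECTION, handler);
function releaseConnection(conn, onError) {
    if (!conn) return null;
    try {
        conn.rollbackTransaction();
        conn.closeConnection();
    } catch (err) {
        // Report the error if a handler was given, otherwise ignore it.
        if (onError) onError(err);
    }
    return null;
}

// Mock connections for demonstration.
const steps = [];
const healthy = {
    rollbackTransaction: () => steps.push('rollback'),
    closeConnection: () => steps.push('close'),
};
const broken = {
    rollbackTransaction: () => { throw new Error('connection lost'); },
    closeConnection: () => steps.push('close-broken'),
};

const errors = [];
const afterHealthy = releaseConnection(healthy);
const afterBroken = releaseConnection(broken, (err) => errors.push(err.message));
console.log(steps, errors);
```

With this helper, onRollback() would pass a logging handler and onPrepareRetry() would pass none, which preserves the behavior of the two original snippets.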

onPullMessage()

Called when a downstream processor signals readiness for more messages. Use this when your processor produces messages (e.g., from a queue or buffer) rather than just transforming incoming ones.

When to use: If your processor accumulates messages and emits them later (aggregation, correlation, queue reading), implement onPullMessage to emit one message at a time in response to downstream demand. This prevents memory spikes and backpressure issues.

let messageQueue = [];
let streamComplete = false;

export function onMessage() {
    // Accumulate messages instead of emitting immediately
    messageQueue.push(message.clone());
}

export function onStreamEnd() {
    streamComplete = true;
}

export function onPullMessage() {
    // Emit one message when downstream is ready
    // (OUTPUT_PORT is resolved in onInit(), as in the onInit() example)
    if (messageQueue.length > 0) {
        const next = messageQueue.shift();
        stream.emit(next, OUTPUT_PORT);
    } else if (streamComplete) {
        // Queue empty and stream done: clean up
        closeQueueConnection();
    }
}

Push vs Pull

In most cases, layline.io handles flow control automatically. You only need onPullMessage when your processor acts as a producer that generates messages independently of direct input (e.g., reading from a queue, correlating batches).
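
The demand-driven behavior can be simulated outside layline.io to see that each pull releases at most one message. This is a sketch with stand-ins: the `emitted` array replaces `stream.emit`, the `closed` flag replaces the queue cleanup, and the hooks are called by hand rather than by the runtime.

```javascript
const queue = [];
const emitted = [];      // stand-in for stream.emit(next, OUTPUT_PORT)
let streamComplete = false;
let closed = false;      // stand-in for closing a queue connection

function onMessage(msg) {
    queue.push(msg);     // buffer instead of emitting
}

function onStreamEnd() {
    streamComplete = true;
}

function onPullMessage() {
    if (queue.length > 0) {
        emitted.push(queue.shift()); // exactly one message per pull
    } else if (streamComplete) {
        closed = true;               // nothing left and stream is done
    }
}

['m1', 'm2', 'm3'].forEach(onMessage);
onStreamEnd();

onPullMessage();
onPullMessage();
const afterTwoPulls = emitted.length;

onPullMessage();
onPullMessage(); // queue is empty now, so this pull triggers cleanup
console.log(afterTwoPulls, emitted, closed);
```

Because messages leave the buffer only on demand, downstream never receives more than it asked for, regardless of how quickly input arrived.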


Complete Example

A processor that validates orders, accumulates details, and writes a summary at stream end:

let OUTPUT_PORT;
let ERROR_PORT;

let orderTotal = 0;
let itemCount = 0;
let errorCount = 0;

export function onInit() {
    OUTPUT_PORT = processor.getOutputPort('Output');
    ERROR_PORT = processor.getOutputPort('Error');
}

export function onStreamStart() {
    orderTotal = 0;
    itemCount = 0;
    errorCount = 0;
}

export function onMessage() {
    if (message.typeName === 'Detail') {
        const qty = message.getInt(dataDictionary.type.Order.Detail.QUANTITY);
        const price = message.getDecimal(dataDictionary.type.Order.Detail.PRICE);

        if (qty > 0 && price.compareTo(new BigDecimal("0")) > 0) {
            orderTotal += price.multiply(new BigDecimal(qty)).doubleValue();
            itemCount++;
            stream.emit(message, OUTPUT_PORT);
        } else {
            errorCount++;
            message.addStatus(Severity.ERROR, Status.create(VENDOR, 'INVALID_ITEM'));
            stream.emit(message, ERROR_PORT);
        }
    } else {
        // Header and Trailer pass through
        stream.emit(message, OUTPUT_PORT);
    }
}

export function onStreamEnd() {
    stream.logInfo(`Stream summary: ${itemCount} items, total ${orderTotal}, ${errorCount} errors`);
}

export function onRollback() {
    stream.logWarning('Stream rolled back; discarding accumulated state');
    // Actually discard the per-stream totals
    orderTotal = 0;
    itemCount = 0;
    errorCount = 0;
}
