Version: 2.0.0

Class: JavaScriptProcessor

This is not an actual class but a placeholder for the hooks that are available within a Javascript Asset. Hooks are invoked automatically as part of a Javascript Asset's normal lifecycle. Read more about the JS lifecycle in the introduction.

Methods

onCommit()

onCommit(): void

Invoked when a Stream is committed. Use this to perform any final tasks when a stream ends.

Returns

void

Example

export function onCommit() {
    if (connection) {
        connection.commitTransaction();
        connection.closeConnection();
        connection = null;
    }
}

onInit()

onInit(): void

onInit is invoked upon instantiation of the Javascript Asset. Use this method to perform any initialization actions, e.g. acquiring a database connection or initializing data structures that are used within the script. Note that this method is invoked only once, upon startup of the Project.

Returns

void

Example

// Declared at script level so that other hooks can use the port
let OUTPUT_PORT = null;

export function onInit() {
    OUTPUT_PORT = processor.getOutputPort('Output');
}


onMessage()

onMessage(): void

This is one of the most important methods, and you will use it in nearly every Javascript Asset. layline.io is a reactive messaging system, meaning that a script within a Javascript Asset is triggered by the delivery of a message to this Javascript Asset. You can consider the onMessage method the starting point for processing within a Javascript Asset.

Returns

void

Example


// Get the output port
const OUTPUT_PORT = processor.getOutputPort('MyOutput');

export function onMessage() {
    if (message.typeName === 'Header') {
        // do nothing
    } else if (message.typeName === 'Trailer') {
        // do something with the trailer
    } else if (message.typeName === 'Detail') {
        // invoke a self-defined function which handles the message.
        handleDetail(message);
    }

    stream.emit(message, OUTPUT_PORT);
}

function handleDetail(detail) {
    // do something with the message
}
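
The message-driven flow described above can be tried outside layline.io with a small simulation. This is only a sketch: the `processor`, `stream`, and `message` objects below are hypothetical stand-ins for the globals that the runtime provides, and the `export` keyword is omitted so that the snippet runs as a plain script.

```javascript
// Hypothetical stand-ins for the globals that layline.io provides at runtime.
const emitted = [];
const processor = { getOutputPort: (name) => ({ name }) };
const stream = { emit: (msg, port) => emitted.push({ msg, port: port.name }) };
let message = null;

const OUTPUT_PORT = processor.getOutputPort('MyOutput');
let detailCount = 0;

// In a real asset this hook would be exported: export function onMessage()
function onMessage() {
    if (message.typeName === 'Detail') {
        handleDetail(message);
    }
    // Every message, regardless of type, is passed on downstream.
    stream.emit(message, OUTPUT_PORT);
}

function handleDetail(detail) {
    detailCount += 1;
}

// Simulated delivery: set the current message, then invoke the hook.
for (const typeName of ['Header', 'Detail', 'Detail', 'Trailer']) {
    message = { typeName };
    onMessage();
}

console.log(detailCount, emitted.length); // prints "2 4"
```

Each simulated delivery triggers exactly one onMessage call, which mirrors the one-message-per-invocation model of the hook.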


onPrepareCommit()

onPrepareCommit(): void

Invoked before a Stream is finally committed. Use this method for any preparatory work that must be done before the commit.

Returns

void

Example

export function onPrepareCommit() {
    // Invoke custom function to write errors which we gathered during stream processing
    WriteAllRejectErrors();
}

function WriteAllRejectErrors() {
    // ...
}
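
One way to fill in the gathering and writing of reject errors is sketched below. The `rejectErrors` buffer, the `recordRejectError` helper, and the `written` array (which stands in for the real output target) are assumptions made for illustration; the text above does not specify where errors are collected or written. The `export` keyword is omitted so that the snippet runs as a plain script.

```javascript
// Hypothetical in-memory buffer for errors gathered during stream processing.
const rejectErrors = [];
// Hypothetical output target; a real script might write to a file or a table.
const written = [];

// Would be called from message handling whenever a message is rejected.
function recordRejectError(messageId, reason) {
    rejectErrors.push({ messageId, reason });
}

// Writes everything gathered so far and empties the buffer.
function WriteAllRejectErrors() {
    while (rejectErrors.length > 0) {
        const err = rejectErrors.shift();
        written.push(`${err.messageId}: ${err.reason}`);
    }
}

// In a real asset this hook would be exported: export function onPrepareCommit()
function onPrepareCommit() {
    WriteAllRejectErrors();
}

// Simulated run: two rejects during processing, then the prepare-commit hook.
recordRejectError(17, 'unknown customer');
recordRejectError(42, 'invalid amount');
onPrepareCommit();
console.log(written);
```

Emptying the buffer as part of the write means a second invocation finds nothing left to write.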

onPrepareRetry()

onPrepareRetry(): void

Invoked when a "prepare-retry" signal is emitted by layline.io.

Returns

void

Example

export function onPrepareRetry() {
    if (connection) {
        try {
            connection.rollbackTransaction();
            connection.closeConnection();
        } catch (err) {
            // Errors are deliberately ignored here; the connection is discarded either way
        } finally {
            connection = null;
        }
    }
}

onPullMessage()

onPullMessage(): void

layline.io is a reactive system and works according to the principle of “dynamic push / pull mode”. This means that in a network of Processors, each Processor can signal to connected Processors whether it wants to push or pull messages, which keeps the message flow smooth without the risk of clogging.

  • Push-mode: the downstream processor (“Consumer”) consumes messages at the same or even a faster rate than the source processor (“Producer”) produces the messages (= Slow Producer, fast Consumer)
  • Pull-mode: the source processor produces messages faster than a downstream processor can consume them (= Fast Producer, slow Consumer)

With layline.io you usually do not have to think about “push” or “pull” mode or how it is applied throughout Workflow processing; it is built in. Using the onMessage method ensures that your JavaScript processor receives available messages at an appropriate signaling rate.

Only if your JavaScript processor includes logic that makes it a “producer” of (additional) messages do you need to implement the onPullMessage method explicitly. Through it you receive the signals with which connected downstream Processors in your Workflow indicate that they are ready for the next message.

Practical example:

A Workflow is processing Streams. Each Stream has 100,000 messages, of which every two records need to be correlated to form a new message that is then sent downstream, resulting in a total of 50,000 downstream messages. You cannot correlate them, however, until all 100,000 messages have been received. This implies that no messages leave Processor A until the Stream has been completely received (e.g. marked by an ending message) and all messages have been correlated. (To avoid storing all of them in memory, Processor A may use a queue Service for storage. See example below.) During that phase the downstream Processor B is idling.

You might assume that once the last message has been received, you can instantly correlate all messages and send the resulting 50,000 messages downstream to Processor B in one go. Such a sudden spike of messages is not economical for a reactive system and may take a toll on memory consumption and performance. To avoid this, instead of simply emitting messages as soon as Processor A is ready, you can wait until Processor B is ready and then send one message at a time until all messages have been emitted.

This is what onPullMessage allows you to implement.

Returns

void

Example

// Invoked each time a downstream Processor is ready for the next message.
export function onPullMessage() {
    if (!streamComplete || !queue) {
        return; // Stream not fully received yet, or queue already drained
    }

    const message = queue.ReadMessage(); // Read one message
    if (message) {
        stream.emit(message, MY_OUTPUT_PORT); // Emit the message
    } else {
        // Queue is empty: release the connection
        queue.closeConnection();
        queue = null;
    }
}
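
The correlation scenario described above can also be simulated end to end on a small scale. The following is a sketch under stated assumptions: `stream` and `MY_OUTPUT_PORT` are hypothetical stand-ins for runtime objects, plain arrays replace the queue Service, a message of type `Trailer` is assumed to mark the end of the Stream, and adjacent records are assumed to form a pair. The `export` keyword is omitted so that the snippet runs as a plain script.

```javascript
// Hypothetical stand-ins for objects that layline.io provides at runtime.
const emitted = [];
const stream = { emit: (msg, port) => emitted.push(msg) };
const MY_OUTPUT_PORT = { name: 'Output' };

let message = null;         // current inbound message, set by the simulated runtime
const received = [];        // in-memory stand-in for the queue Service
const correlated = [];      // correlated messages waiting to be pulled
let streamComplete = false;

// Buffers records; nothing is emitted while the Stream is still arriving.
function onMessage() {
    if (message.typeName === 'Trailer') {
        // Ending message: correlate adjacent records pairwise.
        for (let i = 0; i + 1 < received.length; i += 2) {
            correlated.push({ left: received[i].id, right: received[i + 1].id });
        }
        streamComplete = true;
    } else {
        received.push(message);
    }
}

// Emits at most one message per readiness signal from downstream.
function onPullMessage() {
    if (!streamComplete) {
        return;
    }
    const next = correlated.shift();
    if (next) {
        stream.emit(next, MY_OUTPUT_PORT);
    }
}

// Simulation: six records, an early pull, the trailer, then five pulls.
for (let id = 1; id <= 6; id++) {
    message = { typeName: 'Detail', id };
    onMessage();
}
onPullMessage(); // Stream not complete yet: nothing is emitted
message = { typeName: 'Trailer' };
onMessage();
for (let i = 0; i < 5; i++) {
    onPullMessage();
}
console.log(emitted.length); // prints 3
```

Six records yield three correlated messages, each released by its own pull signal; the surplus pulls find nothing left and emit nothing.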

NOTE: If two or more downstream Processors are connected to the current Javascript processor, you cannot tell which of them is ready for the next message. This should be of no concern: simply send the next message out to the correct Processor, and the system will behave in a balanced manner following standard reactive rules.


onRollback()

onRollback(): void

Invoked when a rollback signal is issued by the system. Perform any "undo" and cleanup tasks here.

Returns

void

Example

export function onRollback() {
    if (connection) {
        try {
            connection.rollbackTransaction();
            connection.closeConnection();
        } catch (err) {
            // Errors are deliberately ignored here; the connection is discarded either way
        } finally {
            connection = null;
        }
    }
}
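
The interplay of the commit and rollback hooks can be checked with a stub connection that merely records which calls it receives. This is a sketch: `createFakeConnection` and the `log` array are hypothetical helpers for illustration, and only the three connection methods used in the examples on this page are modeled. The `export` keyword is omitted so that the snippet runs as a plain script.

```javascript
// Hypothetical stub that records calls instead of talking to a database.
function createFakeConnection(log) {
    return {
        commitTransaction: () => log.push('commit'),
        rollbackTransaction: () => log.push('rollback'),
        closeConnection: () => log.push('close'),
    };
}

const log = [];
let connection = createFakeConnection(log);

// In a real asset these hooks would be exported.
function onCommit() {
    if (connection) {
        connection.commitTransaction();
        connection.closeConnection();
        connection = null;
    }
}

function onRollback() {
    if (connection) {
        try {
            connection.rollbackTransaction();
            connection.closeConnection();
        } catch (err) {
            log.push('rollback failed: ' + err.message);
        } finally {
            connection = null; // discard the connection in every case
        }
    }
}

onRollback();
onRollback(); // no connection left: the guard makes this a no-op
connection = createFakeConnection(log);
onCommit();
console.log(log); // prints [ 'rollback', 'close', 'commit', 'close' ]
```

Resetting `connection` to null in both hooks is what makes a repeated signal harmless.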

onStreamEnd()

onStreamEnd(): void

Invoked when the current stream ends. Use this to run any cleanup tasks.

Returns

void

Example

export function onStreamEnd() {
    // Report in case some customer data could not be found during stream processing
    if (numCustomerDataNotFound > 0) {
        stream.logInfo(numCustomerDataNotFound + ' customers could not be found in the database.');
    }
}

onStreamStart()

onStreamStart(): void

Invoked when the current stream starts. Use this to run any initialization tasks that are needed for every new Stream.

Returns

void

Example

export function onStreamStart() {
    streamId = stream.getId();
    fileName = stream.getName();
}
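
Because onStreamStart runs for every new Stream, it is a natural place to reset per-stream state that onStreamEnd later reports on. The sketch below assumes this pairing; the `stream` object is a hypothetical stand-in that models only `getId`, `getName`, and `logInfo`, and the stream id and file names are made up for the simulation. The `export` keyword is omitted so that the snippet runs as a plain script.

```javascript
// Hypothetical stand-in for the runtime's stream object.
const logged = [];
let currentName = 'customers-1.csv';
const stream = {
    getId: () => 'stream-1',
    getName: () => currentName,
    logInfo: (text) => logged.push(text),
};

let streamId = null;
let fileName = null;
let numCustomerDataNotFound = 0;

// In a real asset these hooks would be exported.
function onStreamStart() {
    streamId = stream.getId();
    fileName = stream.getName();
    numCustomerDataNotFound = 0; // reset the per-stream counter
}

function onStreamEnd() {
    if (numCustomerDataNotFound > 0) {
        stream.logInfo(fileName + ': ' + numCustomerDataNotFound + ' customers could not be found in the database.');
    }
}

// First simulated Stream: two lookups fail.
onStreamStart();
numCustomerDataNotFound += 2;
onStreamEnd();

// Second simulated Stream: no failures, so nothing is logged.
currentName = 'customers-2.csv';
onStreamStart();
onStreamEnd();

console.log(logged.length); // prints 1
```

Without the reset in onStreamStart, the second Stream would inherit the count of the first and report misses that never occurred in it.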