Skip to main content
Version: 1.2.x

Class: Stream

stream represents the stream which is being processed at the time. It does not require instantiation and is available for access in any script-based Processor, e.g. the Javascript Processor. It provides a number of functions which can - and sometimes have to - be invoked to control stream processing.

Methods

emit()

emit(message, outputPort): void

Emit a message to the specified output port. Once emitted, the context of the message is lost. If you need to emit a message to another output port, you need to clone the original message, first, and then emit the clone to that other output port.

Parameters

message: Message

outputPort: OutputPort

Returns

void

void

Example

OUTPUT_PORT = processor.getOutputPort('Output');
stream.emit(message, OUTPUT_PORT);

getId()

getId(): string

Each Stream in layline.io has an ID which uniquely identifies a stream. Use this method to retrieve this unique ID.

Returns

string

Unique Stream ID

Example

const STREAM_ID = stream.getId();

getMetadata()

getMetadata(): Message

Retrieves metadata information from a stream. Returns the information in form of a Message. Depending on the type of stream, the message contains differing information. For example a file-based stream will return other data than a Kafka stream.

Returns

Message

  • Returns a message which contains the stream-type specific data.

Stream-type specific message return content:

File Stream:

PropertyTypeDescription
PathSystem.StringDirectory path from which the file was read
SizeSystem.LongFile size in bytes
LastModifiedSystem.DateTimeLast modified date and time

FTP Stream:

PropertyTypeDescription
PathSystem.StringDirectory path from which the file was read
SizeSystem.LongFile size in bytes
LastModifiedSystem.DateTimeLast modified date and time

HTTP Stream:

PropertyTypeDescription
BindAddressSystem.StringIP-Address
BindPortSystem.IntegerIP-Port number

Kafka (Exclusive Partition Stream):

PropertyTypeDescription
GroupIdSystem.StringConsumer group id
TopicsSystem.String[]Array of topics

Kafka (Standard Stream):

PropertyTypeDescription
GroupIdSystem.StringConsumer group id
TopicSystem.StringTopic name of the exclusive partition
PartitionSystem.IntegerPartition number

AWS S3 Service Source Stream:

PropertyTypeDescription
PathSystem.StringS3 path
SizeSystem.LongS3 object size
StorageClassSystem.StringS3 storage class
LastModifiedSystem.DateTimeLast modified date and time

AWS SQS Source Stream:

PropertyTypeDescription
QueueUrlSystem.String

Serial Source Stream:

PropertyTypeDescription
PortSystem.StringPort name
BaudRateSystem.IntegerBaud rate
DataBitsSystem.Integer
StopBitsSystem.String
ParitySystem.String
FlowControlSystem.String

Secondary Stream:

PropertyTypeDescription
ParentStreamNameSystem.StringName of the originating stream (parent)
ParentStreamIdSystem.StringId of the originating stream (parent)

Service Source Stream:

PropertyTypeDescription
ServiceSystem.StringService name

TCP Source Stream:

PropertyTypeDescription
LocalAddressSystem.String
LocalPortSystem.Integer
RemoteAddressSystem.String
RemotePortSystem.Integer

Example

// Get the metadata for the stream (a Kafka stream in our example):
const msgMetadata = stream.getMetadata();
// Result:
// msgMetadata.data = {
// GroupId: "MyConsumerGroup",
// Topic: "mytopic",
// Partition: "0"
// }
}

getName()

getName(): string

Returns the name of a stream.

For file bases processing the stream name is the name of the file being processed. For other sources, the stream name is usually explicitly set by you in the respective Source Asset.

For example if your data stems from a Service Source Asset, then the stream name is defined in the configuration of that Asset like so: ${msg:IoT.StreamName}-${date:yyyy-MM-dd-HH-mm-ss} which will result in DeviceX-2022-10-10-21-45-33.

Use this method to retrieve this unique ID.

Returns

string

Stream name

Example

const STREAM_NAME = stream.getName();

logError()

logError(msg): void

Logs a message with Severity.ERROR to the stream log. You can view this both via the Audit Trail in the UI and output in the process terminal output.

Parameters

msg: string

Information you want to log.

Returns

void

Example

stream.logError('Ran into the following problem: ' + problem);

logFatal()

logFatal(msg): void

Logs a message with Severity.FATAL to the stream log. You can view this both via the Audit Trail in the UI and output in the process terminal output.

Parameters

msg: string

Information you want to log.

Returns

void

Example

stream.logFatal('Ran into the following problem: ' + problem);

logInfo()

logInfo(msg): void

Logs a message with Severity.INFO to the stream log. You can view this both via the Audit Trail in the UI and output in the process terminal output.

Parameters

msg: string

Information you want to log.

Returns

void

Example

stream.logInfo('Here is some interesting information: ' + info);

logWarning()

logWarning(msg): void

Logs a message with Severity.WARNING to the stream log. You can view this both via the Audit Trail in the UI and output in the process terminal output.

Parameters

msg: string

Information you want to log.

Returns

void

Example

stream.logWarning('Here is a warning: ' + warning);

requestRetry()

requestRetry(reason, ms?): void

In case you encounter an error in a stream and have caught it, then you can request for the stream to be retried after a defined number of milliseconds.

Parameters

reason: Status

Status which should be attached to the retry signal. The Status should describe the reason for the retry.

ms?: number

Retry timeout in milliseconds. Default is 60 seconds.

Returns

void

Example

// Request retry after 30 seconds
stream.requestRetry(Status.create(VENDOR, 'DB_NOT_AVAILABLE'), 30000);

requestRollback()

requestRollback(status): void

Requests to roll back the currently processed stream.

To roll back a stream, you may issue a requestRollback to signal to layline.io that you want this stream rolled back and provide a Status which describes the reason for the rollback.

Parameters

status: Status

Status which should be attached to the rollback signal. The Status should describe the reason for the rollback.

Returns

void

Example

// Request a rollback and add a status which (example) adds a Status of your type 'SEQ_NUMBER_UNKNOWN' and adds the file name
...
let fileInfo = {
sequence: message.data.IOT.SEQ_NO;
}
...
stream.requestRollback(Status.create(VENDOR, 'SEQ_NUMBER_UNKNOWN', fileInfo.sequence));

setDoneName()

setDoneName(name): internal

Set the name of a stream that is sent to the done path. In case your output is a file, this would then be the file's name.

// Set stream done name:
stream.setDoneName('New-Done-Stream-Name');

// Functions which return a Stream can be chained like this:
stream.setDoneName('New-Stream-Name').setDonePath('/my/specific/path')

Parameters

name: string

New stream name

Returns

internal

Stream: The manipulated stream.


setDonePath()

setDonePath(path): internal

Set the done path of a stream that is sent to the done path. In case your data is written to a file, this would then be the path to which the file is written.

// Set stream done path:
stream.setDonePath('New-Done-Stream-Path');

// Functions which return a Stream can be chained like this:
stream.setDoneName('New-Stream-Name').setDonePath('/my/specific/path')

Parameters

path: string

New path name

Returns

internal

Stream: The manipulated stream.


setErrorName()

setErrorName(name): internal

Set the name of a stream that is sent to the error path. In case your output is a file, this would then be the file's name.

// Set stream error name:
stream.setErrorName('New-Error-Stream-Name');

// Functions which return a Stream can be chained like this:
stream.setErrorName('New-Stream-Name').setErrorPath('/my/specific/path')

Parameters

name: string

New stream name

Returns

internal

Stream: The manipulated stream.


setErrorPath()

setErrorPath(path): internal

Set the error path of a stream that is sent to the done path. In case your data is written to a file, this would then be the path to which the file is written.

// Set stream error path:
stream.setErrorPath('New-Error-Stream-Path');

// Functions which return a Stream can be chained like this:
stream.setErrorName('New-Stream-Name').setErrorPath('/my/specific/path')

Parameters

path: string

New path name

Returns

internal

Stream: The manipulated stream.


setOutputName()

setOutputName(name): internal

Set the name of a stream that is sent to the output path. In case your output is a file, this would then be the file's name.

// Set stream output name:
stream.setOutputName('New-Output-Stream-Name');

// Functions which return a Stream can be chained like this:
stream.setOutputName('New-Stream-Name').setOutputPath('/my/specific/path')

Parameters

name: string

New stream name

Returns

internal

Stream: The manipulated stream.


setOutputPath()

setOutputPath(path): internal

Set the output path of a stream that is sent to the normal output channel. In case your data is written to a file, this would then be the path to which the file is written.

// Set stream output path:
stream.setOutputPath('New-Output-Stream-Path');

// Functions which return a Stream can be chained like this:
stream.setOutputName('New-Stream-Name').setOutputPath('/my/specific/path')

Parameters

path: string

New path name

Returns

internal

Stream: The manipulated stream.


setProcessorOutputName()

setProcessorOutputName(outputProcessorName, streamName): internal

Set the name of the output stream for a specific Processor. Use this for example, if you have a processor with multiple output ports, and need to send messages to either of these ports, but also determine what the name of the streams should be on either of these ports.

This is used to change the name of the output stream. E.g. if a file is written, this will be the filename in output.

Option 1:
// Set stream output names:
stream.setProcessorOutputName('NameOfOutputProcessor', 'MyNewStreamName');

// or Option 2:
const OUTPUT_PORT = getOutputPort('NameOfOutputPort');
stream.setProcessorOutputName(OUTPUT_PORT.getPeerProcessorName(), 'MyNewStreamName');
// Options 2 only works if the processor connected to OUTPUT_PORT is the actual Output-Processor.

// Functions which return a Stream can be chained like this:
stream.setProcessorOutputName('New-Stream-Name').setProcessorOutputPath('/my/specific/path')

Parameters

outputProcessorName: string

Name of the output processor at the end of the workflow.

streamName: string

New name the stream should have when data is sent out to the receiving processor.

Returns

internal


setProcessorOutputPath()

setProcessorOutputPath(outputProcessorName, path): internal

Set the path of the output stream for a specific Processor. Use this for example, if you have a processor with multiple output ports, and need to send messages to either of these ports, but also determine what the path of the streams should be on either of these ports.

This is used to change the path of the output stream. E.g. if a file is written, this will be written to the specified path.

Option 1:
// Set stream path name:
stream.setProcessorPathName('NameOfOutputProcessor', '/my/specific/output/path');

// or Option 2:
const OUTPUT_PORT = getOutputPort('NameOfOutputPort');
stream.setProcessorOutputName(OUTPUT_PORT.getPeerProcessorName(), '/my/specific/output/path');
// Options 2 only works if the processor connected to OUTPUT_PORT is the actual Output-Processor.

// Functions which return a Stream can be chained like this:
stream.setProcessorOutputName('New-Stream-Name').setProcessorOutputPath('/my/specific/path')

Parameters

outputProcessorName: string

Name of the output processor at the end of the workflow.

path: string

New name the stream should have when data is sent out to the receiving processor.

Returns

internal