# Stream

# stream

stream represents the stream that is currently being processed. It does not need to be instantiated and is available in any script-based Processor, e.g. the JavaScript Processor. It provides a number of functions which can, and sometimes must, be invoked to control stream processing.

Kind: global class

# emit

Emits a message to the specified output port. Once emitted, the context of the message is lost. To send the same message to another output port as well, first clone the original message and then emit the clone to that other output port.

Example:

const OUTPUT_PORT = processor.getOutputPort('Output');
stream.emit(message, OUTPUT_PORT);

Kind: instance method of stream

| Param |
| --- |
| message |
| outputPort |
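
The cloning rule described for emit can be sketched as follows. This is a minimal standalone illustration, not layline.io's own code: the `stream`, `processor`, and `message` objects are small stand-ins defined here so that the snippet runs under plain Node.js, and `message.clone()` as well as the port names `'Output'` and `'Audit'` are assumptions. Inside a real script Processor you would use the objects which the platform provides.

```javascript
// Stand-ins for the objects layline.io provides at runtime (assumptions for illustration only).
const emitted = [];
const processor = {
    getOutputPort: (name) => ({ name }),
};
const stream = {
    emit: (msg, port) => emitted.push({ port: port.name, msg }),
};
const message = {
    data: { id: 42 },
    // Hypothetical clone method: returns an independent copy of the message.
    clone() {
        return { ...this, data: { ...this.data } };
    },
};

const OUTPUT_PORT = processor.getOutputPort('Output');
const AUDIT_PORT = processor.getOutputPort('Audit');

// Clone first: once the original has been emitted, it must not be reused.
const auditCopy = message.clone();
stream.emit(message, OUTPUT_PORT);
stream.emit(auditCopy, AUDIT_PORT);
```

The clone is created before the first emit so that the script never touches the original message after handing it over.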

# getId

Each Stream in layline.io has an ID which uniquely identifies a stream. Use this method to retrieve this unique ID.

Kind: instance method of stream
Returns: string - Unique Stream ID

Example:

const STREAM_ID = stream.getId();

# getMetadata

Retrieves metadata information from a stream. Returns the information in the form of a Message. The content of the message depends on the type of stream. For example, a file-based stream returns different data than a Kafka stream.

Kind: instance method of stream
Returns: Message - A message which contains the stream-type specific data.

Stream-type specific message return content:

File Stream:

| Property | Type | Description |
| --- | --- | --- |
| Path | System.String | Directory path from which the file was read |
| Size | System.Long | File size in bytes |
| LastModified | System.DateTime | Last modified date and time |

FTP Stream:

| Property | Type | Description |
| --- | --- | --- |
| Path | System.String | Directory path from which the file was read |
| Size | System.Long | File size in bytes |
| LastModified | System.DateTime | Last modified date and time |

HTTP Stream:

| Property | Type | Description |
| --- | --- | --- |
| BindAddress | System.String | IP-Address |
| BindPort | System.Integer | IP-Port number |

Kafka (Standard Stream):

| Property | Type | Description |
| --- | --- | --- |
| GroupId | System.String | Consumer group id |
| Topics | System.String[] | Array of topics |

Kafka (Exclusive Partition Stream):

| Property | Type | Description |
| --- | --- | --- |
| GroupId | System.String | Consumer group id |
| Topic | System.String | Topic name of the exclusive partition |
| Partition | System.Integer | Partition number |

AWS S3 Service Source Stream:

| Property | Type | Description |
| --- | --- | --- |
| Path | System.String | S3 path |
| Size | System.Long | S3 object size |
| StorageClass | System.String | S3 storage class |
| LastModified | System.DateTime | Last modified date and time |

AWS SQS Source Stream:

| Property | Type | Description |
| --- | --- | --- |
| QueueUrl | System.String | |

Serial Source Stream:

| Property | Type | Description |
| --- | --- | --- |
| Port | System.String | Port name |
| BaudRate | System.Integer | Baud rate |
| DataBits | System.Integer | |
| StopBits | System.String | |
| Parity | System.String | |
| FlowControl | System.String | |

Secondary Stream:

| Property | Type | Description |
| --- | --- | --- |
| ParentStreamName | System.String | Name of the originating stream (parent) |
| ParentStreamId | System.String | Id of the originating stream (parent) |

Service Source Stream:

| Property | Type | Description |
| --- | --- | --- |
| Service | System.String | Service name |

TCP Source Stream:

| Property | Type | Description |
| --- | --- | --- |
| LocalAddress | System.String | |
| LocalPort | System.Integer | |
| RemoteAddress | System.String | |
| RemotePort | System.Integer | |

Example:

// Get the metadata for the stream (a Kafka stream in our example):
const msgMetadata = stream.getMetadata();
// Result:
// msgMetadata.data = {
//   GroupId: "MyConsumerGroup",
//   Topic: "mytopic",
//   Partition: 0
// }
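
Because the returned properties differ by stream type, a script usually has to check which properties are present before using them. The following is a minimal sketch of that idea under a few assumptions: the properties are read from the `data` object of the returned Message (as in the example result shown for getMetadata), the helper name `describeSource` is hypothetical, and `stream` is a stand-in so that the snippet runs under plain Node.js.

```javascript
// Build a short, human-readable label from stream metadata.
// `data` is assumed to be the `data` object of the Message returned by stream.getMetadata().
function describeSource(data) {
    if (data.Topic !== undefined && data.Partition !== undefined) {
        // Kafka stream bound to a single topic partition
        return `kafka:${data.GroupId}/${data.Topic}[${data.Partition}]`;
    }
    if (data.Path !== undefined && data.Size !== undefined) {
        // File, FTP, or S3 stream
        return `file:${data.Path} (${data.Size} bytes)`;
    }
    if (data.QueueUrl !== undefined) {
        // AWS SQS stream
        return `sqs:${data.QueueUrl}`;
    }
    return 'unknown source';
}

// Stand-in for the runtime-provided stream object (assumption for illustration only).
const stream = {
    getMetadata: () => ({
        data: { GroupId: 'MyConsumerGroup', Topic: 'mytopic', Partition: 0 },
    }),
};

const label = describeSource(stream.getMetadata().data);
```

Checking for the presence of properties keeps the helper independent of any explicit stream-type flag, which the metadata message is not documented to carry.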

# getName

Returns the name of a stream.

For file-based processing, the stream name is the name of the file being processed. For other sources, the stream name is usually set explicitly by you in the respective Source Asset.

For example, if your data stems from a Service Source Asset, then the stream name is defined in the configuration of that Asset, e.g. ${msg:IoT.StreamName}-${date:yyyy-MM-dd-HH-mm-ss}, which results in a name such as DeviceX-2022-10-10-21-45-33.

Use this method to retrieve the stream name.

Example:

const STREAM_NAME = stream.getName();

Kind: instance method of stream
Returns: string - Stream name
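
If a Source Asset builds stream names with a pattern like the one shown for the Service Source Asset, a script can split the name back into its parts. The sketch below assumes names of the form `<prefix>-yyyy-MM-dd-HH-mm-ss`; the helper name `parseStreamName` is hypothetical, and `stream` is a stand-in so that the snippet runs under plain Node.js.

```javascript
// Split a stream name of the form "<prefix>-yyyy-MM-dd-HH-mm-ss" into its parts.
// Returns null if the name does not follow that pattern.
function parseStreamName(streamName) {
    const match = /^(.+)-(\d{4})-(\d{2})-(\d{2})-(\d{2})-(\d{2})-(\d{2})$/.exec(streamName);
    if (match === null) {
        return null;
    }
    const [, prefix, year, month, day, hour, minute, second] = match;
    return {
        prefix,
        timestamp: `${year}-${month}-${day}T${hour}:${minute}:${second}`,
    };
}

// Stand-in for the runtime-provided stream object (assumption for illustration only).
const stream = { getName: () => 'DeviceX-2022-10-10-21-45-33' };

const parsed = parseStreamName(stream.getName());
// parsed.prefix is 'DeviceX', parsed.timestamp is '2022-10-10T21:45:33'
```

Returning null for non-matching names lets the caller handle file-based streams, whose names are plain file names, separately.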

# logError

Logs a message with Severity.ERROR to the stream log. The entry is visible both in the Audit Trail in the UI and in the process terminal output.

Example:

stream.logError('Ran into the following problem: ' + problem);

Kind: instance method of stream

| Param | Description |
| --- | --- |
| msg | Information you want to log. |

# logFatal

Logs a message with Severity.FATAL to the stream log. The entry is visible both in the Audit Trail in the UI and in the process terminal output.

Example:

stream.logFatal('Ran into the following problem: ' + problem);

Kind: instance method of stream

| Param | Description |
| --- | --- |
| msg | Information you want to log. |

# logInfo

Logs a message with Severity.INFO to the stream log. The entry is visible both in the Audit Trail in the UI and in the process terminal output.

Example:

stream.logInfo('Here is some interesting information: ' + info);

Kind: instance method of stream

| Param | Description |
| --- | --- |
| msg | Information you want to log. |

# logWarning

Logs a message with Severity.WARNING to the stream log. The entry is visible both in the Audit Trail in the UI and in the process terminal output.

Example:

stream.logWarning('Here is a warning: ' + warning);

Kind: instance method of stream

| Param | Description |
| --- | --- |
| msg | Information you want to log. |
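
The four log methods take the same single argument, so a script can route to them through one small helper. The following is a sketch only: the helper name `logBySeverity` and the idea of prefixing each entry with the stream ID are illustrative choices, and `stream` is a stand-in which records calls instead of writing to the stream log, so that the snippet runs under plain Node.js.

```javascript
// Stand-in for the runtime-provided stream object (assumption for illustration only).
const logged = [];
const stream = {
    getId: () => 'stream-0001',
    logInfo: (msg) => logged.push(['INFO', msg]),
    logWarning: (msg) => logged.push(['WARNING', msg]),
    logError: (msg) => logged.push(['ERROR', msg]),
    logFatal: (msg) => logged.push(['FATAL', msg]),
};

// Hypothetical helper: pick the log method by severity label and prefix the stream ID.
function logBySeverity(severity, text) {
    const line = `[${stream.getId()}] ${text}`;
    switch (severity) {
        case 'FATAL':
            stream.logFatal(line);
            break;
        case 'ERROR':
            stream.logError(line);
            break;
        case 'WARNING':
            stream.logWarning(line);
            break;
        default:
            // Unknown labels fall back to INFO.
            stream.logInfo(line);
    }
}

logBySeverity('WARNING', 'Record count is lower than expected');
logBySeverity('ERROR', 'Could not parse record 17');
```

Calling the methods directly on `stream` inside the switch, rather than storing them in a lookup table, avoids detaching them from the object they belong to.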

# requestRetry

If you have caught an error while processing a stream, you can request that the stream be retried after a defined number of milliseconds.

Example:

// Request retry after 30 seconds
stream.requestRetry(30000);

Kind: instance method of stream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [ms] | number | 60000 | Retry timeout in milliseconds. Default is 60 seconds. |
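
A typical place for requestRetry is the catch block of the operation that failed. The sketch below shows that pattern under stated assumptions: `lookupCustomer` is a hypothetical function which simulates a temporarily unavailable backend, and `stream` is a stand-in which records calls, so that the snippet runs under plain Node.js.

```javascript
// Stand-in for the runtime-provided stream object (assumption for illustration only).
const calls = [];
const stream = {
    logWarning: (msg) => calls.push(['logWarning', msg]),
    requestRetry: (ms) => calls.push(['requestRetry', ms]),
};

// Hypothetical lookup which fails, e.g. because a backend is temporarily unavailable.
function lookupCustomer(id) {
    throw new Error(`backend unavailable for customer ${id}`);
}

const RETRY_DELAY_MS = 30000; // 30 seconds

try {
    lookupCustomer('C-100');
} catch (err) {
    // The error is caught here, so the script decides what happens next.
    stream.logWarning('Lookup failed, requesting retry: ' + err.message);
    stream.requestRetry(RETRY_DELAY_MS);
}
```

Logging before requesting the retry leaves a trace of why the stream was retried.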

# requestRollback

Requests a rollback of the stream which is currently being processed.

Call requestRollback to signal to layline.io that you want this stream rolled back, and pass a Status which describes the reason for the rollback.

Example:

// Request a rollback with a Status of your own type 'SEQ_NUMBER_UNKNOWN', passing the sequence number as a parameter
// ...
const fileInfo = {
    sequence: message.data.IOT.SEQ_NO
};
// ...
stream.requestRollback(Status.create(VENDOR, 'SEQ_NUMBER_UNKNOWN', fileInfo.sequence));

Kind: instance method of stream

| Param | Type | Description |
| --- | --- | --- |
| status | Status | Status which should be attached to the rollback signal. The Status should describe the reason for the rollback. |
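
A rollback is usually requested from a validation step. The following sketch rolls the stream back when a sequence number is not the expected one. It rests on several assumptions: the helper name `checkSequence` and the value of `VENDOR` are hypothetical, and `Status` and `stream` are stand-ins defined here so that the snippet runs under plain Node.js. In a real script these objects come from the platform and your own project configuration.

```javascript
// Stand-ins for runtime-provided objects (assumptions for illustration only).
const rollbacks = [];
const VENDOR = 'MyVendor'; // hypothetical vendor identifier
const Status = {
    create: (vendor, code, ...params) => ({ vendor, code, params }),
};
const stream = {
    requestRollback: (status) => rollbacks.push(status),
};

// Hypothetical check: request a rollback when the sequence number is not the expected one.
function checkSequence(expected, message) {
    const actual = message.data.IOT.SEQ_NO;
    if (actual !== expected) {
        stream.requestRollback(Status.create(VENDOR, 'SEQ_NUMBER_UNKNOWN', actual));
        return false;
    }
    return true;
}

const ok = checkSequence(7, { data: { IOT: { SEQ_NO: 9 } } });
```

The helper returns a boolean so that the caller can stop processing the current message once a rollback has been requested.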

# setOutputName

Sets the output name of a stream. If your output is a file, this is the file's name.

Example:

// Set stream output name:
stream.setOutputName('New-Output-Stream-Name');

Kind: instance method of stream

| Param | Type | Description |
| --- | --- | --- |
| name | string | New stream name |
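
Output names are often derived from the input name. The sketch below swaps the file extension of the incoming stream name and uses the result as the output name. The helper name `toOutputName` and the sample file name are hypothetical, and `stream` is a stand-in so that the snippet runs under plain Node.js.

```javascript
// Hypothetical helper: replace the extension of the input name, or append one if there is none.
function toOutputName(inputName, newExtension) {
    const dot = inputName.lastIndexOf('.');
    // A dot at position 0 marks a hidden file, not an extension.
    const base = dot > 0 ? inputName.slice(0, dot) : inputName;
    return `${base}.${newExtension}`;
}

// Stand-in for the runtime-provided stream object (assumption for illustration only).
let outputName = null;
const stream = {
    getName: () => 'usage-2023-02-09.csv',
    setOutputName: (name) => {
        outputName = name;
    },
};

stream.setOutputName(toOutputName(stream.getName(), 'json'));
// outputName is now 'usage-2023-02-09.json'
```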

# setProcessorOutputName

Sets the name of the output stream for a specific Processor. Use this, for example, if you have a Processor with multiple output ports, need to send messages to each of these ports, and also want to determine the name of the stream on each of them.

The name set here becomes the name of the output stream. E.g. if a file is written, this will be the name of the output file.

Example:

// Option 1: pass the name of the Output Processor directly
stream.setProcessorOutputName('NameOfOutputProcessor', 'MyNewStreamName');

// Option 2: look up the name of the Processor connected to an output port
const OUTPUT_PORT = processor.getOutputPort('NameOfOutputPort');
stream.setProcessorOutputName(OUTPUT_PORT.getPeerProcessorName(), 'MyNewStreamName');
// Option 2 only works if the processor connected to OUTPUT_PORT is the actual Output Processor.

Kind: instance method of stream

| Param | Type | Description |
| --- | --- | --- |
| outputProcessorName | string | Name of the output processor at the end of the workflow. |
| streamName | string | New name the stream should have when data is sent out to the receiving processor. |
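
When a Workflow ends in more than one Output Processor, each can be given its own stream name. The following is a minimal sketch of that case: the Output Processor names `'Valid-File-Output'` and `'Reject-File-Output'` and the sample file name are hypothetical, and `stream` is a stand-in which records the names, so that the snippet runs under plain Node.js.

```javascript
// Stand-in for the runtime-provided stream object (assumption for illustration only).
const outputNames = {};
const stream = {
    getName: () => 'orders-2023-02-09.csv',
    setProcessorOutputName: (processorName, streamName) => {
        outputNames[processorName] = streamName;
    },
};

// Hypothetical Output Processor names; replace them with the names used in your Workflow.
const VALID_OUTPUT = 'Valid-File-Output';
const REJECT_OUTPUT = 'Reject-File-Output';

// Give each Output Processor its own stream name, derived from the input name.
const inputName = stream.getName();
stream.setProcessorOutputName(VALID_OUTPUT, 'valid-' + inputName);
stream.setProcessorOutputName(REJECT_OUTPUT, 'rejected-' + inputName);
```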

Last Updated: 2/9/2023, 11:21:31 AM