Skip to main content

Stream

The Stream class represents the current data stream being processed. It is available globally as stream in every JavaScript processor.

Use it to emit messages to output ports, log stream events, access stream metadata, and control stream lifecycle (retries, rollbacks).


At a Glance

export function onMessage() {
// Log stream context
stream.logInfo(`Processing ${stream.name} (id: ${stream.id})`);

// Emit message to an output port
stream.emit(message, OUTPUT_PORT);
}

Properties

PropertyTypeDescription
idstringUnique stream identifier (UUID v4)
namestringStream name (filename for file sources, or configured name)
originalNamestringOriginal name including prefix and suffix
pathstringPath associated with the stream
prefixstringDetected filename prefix
suffixstringDetected filename suffix

id

A UUID v4 that uniquely identifies this stream across the system. Used in audit trails and logs.

const streamId = stream.id; // "550e8400-e29b-41d4-a716-446655440000"

name

The stream name. For file-based sources, this is the filename without prefix and suffix. For other sources, it's the name configured in the Source Asset.

// File source: "my_data.csv" (prefix/suffix stripped)
// Service source: "DeviceX-2022-10-10-21-45-33"
const name = stream.name;

originalName

The full original name including any configured prefix and suffix.

// If prefix="IN_" and suffix=".csv", originalName is "IN_my_data.csv"
const original = stream.originalName;

path

The path associated with the stream (directory for file sources, URL path for HTTP, etc.).

const path = stream.path; // e.g., "/incoming/orders"

prefix

The detected prefix from the original filename.

const prefix = stream.prefix; // e.g., "IN_"

suffix

The detected suffix (extension) from the original filename.

const suffix = stream.suffix; // e.g., ".csv"

Message Flow

emit(message, outputPort)

Sends a message to the specified output port. Once emitted, the message context is lost — if you need to send the same message to multiple ports, clone it first.

ParameterTypeDescription
messageMessageThe message to emit
outputPortOutputPortThe target output port
export function onInit() {
OUTPUT_PORT = processor.getOutputPort('Output');
}

export function onMessage() {
stream.emit(message, OUTPUT_PORT);
}

Logging

Log messages at different severity levels. These appear in the Audit Trail and processor terminal output.

MethodSeverityUse For
logInfo(msg)INFOGeneral information, progress
logWarning(msg)WARNINGNon-critical issues
logError(msg)ERRORProcessing errors
logFatal(msg)FATALCritical failures
stream.logInfo(`Processing record ${message.id}`);
stream.logWarning(`Unexpected format: ${message.typeName}`);
stream.logError(`Failed to parse: ${error}`);

Stream Metadata

getMetadata()

Returns stream-type-specific metadata as a Message. The structure varies by source type.

Returns: Message

const meta = stream.getMetadata();

// File source
stream.logInfo(`File size: ${meta.data.Size} bytes`);
stream.logInfo(`Last modified: ${meta.data.LastModified}`);

// Kafka source
stream.logInfo(`Topic: ${meta.data.Topic}, Partition: ${meta.data.Partition}`);

Metadata by Source Type

Source TypePropertiesDescription
File / FTP / S3 / OneDrive / SharePointPath, Size, LastModified, FolderSetupFile path, size in bytes, modification timestamp, folder config name
HTTPBindAddress, BindPortServer IP and port
Kafka (Exclusive Partition)GroupId, TopicsConsumer group and topic array
Kafka (Standard)GroupId, Topic, PartitionConsumer group, single topic, partition number
SQSQueueUrlQueue URL
SerialPort, BaudRate, DataBits, StopBits, Parity, FlowControlSerial port configuration
Secondary StreamParentStreamName, ParentStreamIdOriginating stream reference
Service SourceServiceService name
TCP / WebSocketLocalAddress, LocalPort, RemoteAddress, RemotePortConnection endpoints

Stream Lifecycle Control

requestRetry(reason, ms?)

Requests that the stream be retried after a delay. Use when encountering transient errors (e.g., database unavailable).

ParameterTypeDescription
reasonStatusStatus describing why the retry is needed
msnumber (optional)Delay in milliseconds (default: 60000)
if (!databaseAvailable) {
stream.requestRetry(
Status.create(VENDOR, 'DB_UNAVAILABLE'),
30000 // Retry after 30 seconds
);
}

requestRollback(status)

Requests that the current stream be rolled back. Use for unrecoverable errors where reprocessing from the start is needed.

ParameterTypeDescription
statusStatusStatus describing the rollback reason
if (sequenceGapDetected) {
stream.requestRollback(
Status.create(VENDOR, 'SEQ_NUMBER_UNKNOWN', expectedSeq)
);
}

Output Naming

Control the name and path of output streams. These are particularly useful for file-based sinks where you want to dynamically set filenames and directories.

All setter methods return Stream for chaining.

MethodDescriptionExample
setOutputName(name)Name for normal outputstream.setOutputName('orders.csv')
setOutputPath(path)Path for normal outputstream.setOutputPath('/processed')
setDoneName(name)Name for done pathstream.setDoneName('orders.done')
setDonePath(path)Path for done pathstream.setDonePath('/archive')
setErrorName(name)Name for error pathstream.setErrorName('orders.err')
setErrorPath(path)Path for error pathstream.setErrorPath('/failed')
// Chain multiple setters
stream
.setOutputName(`processed-${stream.name}`)
.setOutputPath('/outgoing/success')
.setErrorName(`failed-${stream.name}`)
.setErrorPath('/outgoing/errors');

setProcessorOutputName(processorName, streamName)

Sets the output stream name for a specific downstream processor. Use when a processor has multiple output ports and you need different names per port.

ParameterTypeDescription
processorNamestringName of the target output processor
streamNamestringName to assign to the stream
// Option 1: By processor name
stream.setProcessorOutputName('File-Writer-A', 'valid-orders.csv');
stream.setProcessorOutputName('File-Writer-B', 'invalid-orders.csv');

// Option 2: Via output port (must connect to actual output processor)
const portA = processor.getOutputPort('Valid');
stream.setProcessorOutputName(portA.getPeerProcessorName(), 'valid.csv');

setProcessorOutputPath(processorName, path)

Sets the output path for a specific downstream processor.

ParameterTypeDescription
processorNamestringName of the target output processor
pathstringDirectory path
stream.setProcessorOutputPath('File-Writer', '/data/processed');

Complete Example

A file-processing stream that routes valid records to one output and errors to another, with dynamic naming:

let OUTPUT_PORT;
let ERROR_PORT;
let VALID_COUNT = 0;
let ERROR_COUNT = 0;

export function onInit() {
OUTPUT_PORT = processor.getOutputPort('Output');
ERROR_PORT = processor.getOutputPort('Error');
}

export function onStreamStart() {
VALID_COUNT = 0;
ERROR_COUNT = 0;

// Set dynamic output filenames based on input
const baseName = stream.name.replace(/\.csv$/i, '');
stream
.setOutputName(`${baseName}-processed.csv`)
.setErrorName(`${baseName}-errors.csv`);

// Log stream metadata
const meta = stream.getMetadata();
stream.logInfo(`Processing ${stream.originalName} (${meta.data.Size} bytes)`);
}

export function onMessage() {
if (isValid(message)) {
VALID_COUNT++;
stream.emit(message, OUTPUT_PORT);
} else {
ERROR_COUNT++;
message.addStatus(Severity.ERROR, Status.create(VENDOR, 'VALIDATION_FAILED'));
stream.emit(message, ERROR_PORT);
}
}

export function onStreamEnd() {
stream.logInfo(`Complete: ${VALID_COUNT} valid, ${ERROR_COUNT} errors`);
}

function isValid(msg) {
return msg.exists(dataDictionary.type.Record.ID) &&
msg.getString(dataDictionary.type.Record.ID).length > 0;
}

See Also

  • Message — The messages you emit through the stream
  • OutputPort — The ports you emit to
  • Processor — Get output ports and access configuration
  • Status — Create status objects for retries and rollbacks