Stream


The Stream class represents the current data stream being processed. It is available globally as stream in every Python processor.

Use it to emit messages to output ports, log stream events, access stream metadata, and control stream lifecycle (retries, rollbacks).


At a Glance

def on_message():
    # Log stream context
    stream.log_info(f"Processing {stream.name} (id: {stream.id})")

    # Emit message to an output port
    stream.emit(message, OUTPUT_PORT)

Properties

| Property | Type | Description |
| --- | --- | --- |
| id | str | Unique stream identifier (UUID v4) |
| name | str | Stream name (filename for file sources, or configured name) |
| originalName | str | Original name including prefix and suffix |
| path | str | Path associated with the stream |
| prefix | str | Detected filename prefix |
| suffix | str | Detected filename suffix |

id

A UUID v4 that uniquely identifies this stream across the system. Used in audit trails and logs.

stream_id = stream.id # "550e8400-e29b-41d4-a716-446655440000"
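
When a stream id is copied out of a log or audit trail, it can be useful to confirm that the string really is a version 4 UUID before searching for it. The sketch below uses only Python's standard `uuid` module; `is_uuid4` is a hypothetical helper name and is not part of the Stream API.

```python
import uuid


def is_uuid4(value):
    """Return True if value parses as a version 4 UUID string."""
    try:
        return uuid.UUID(value).version == 4
    except (ValueError, AttributeError, TypeError):
        # Malformed strings and non-string inputs are treated as "not a UUID".
        return False


print(is_uuid4("550e8400-e29b-41d4-a716-446655440000"))  # True
print(is_uuid4("not-a-uuid"))  # False
```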

name

The stream name. For file-based sources, this is the filename without prefix and suffix. For other sources, it's the name configured in the Source Asset.

# File source: "my_data.csv" (prefix/suffix stripped)
# Service source: "DeviceX-2022-10-10-21-45-33"
name = stream.name

originalName

The full original name including any configured prefix and suffix.

# If prefix="IN_" and suffix=".csv", originalName is "IN_my_data.csv"
original = stream.originalName

path

The path associated with the stream (directory for file sources, URL path for HTTP, etc.).

path = stream.path # e.g., "/incoming/orders"

prefix

The detected prefix from the original filename.

prefix = stream.prefix # e.g., "IN_"

suffix

The detected suffix (extension) from the original filename.

suffix = stream.suffix # e.g., ".csv"

Message Flow

emit(message, outputPort)

Sends a message to the specified output port. After a message is emitted, its context is lost. To send the same message to multiple ports, clone it first.

| Parameter | Type | Description |
| --- | --- | --- |
| message | Message | The message to emit |
| outputPort | OutputPort | The target output port |

def on_init():
    global OUTPUT_PORT
    OUTPUT_PORT = processor.getOutputPort('Output')

def on_message():
    stream.emit(message, OUTPUT_PORT)
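
The clone-before-emit rule can be sketched as a small fan-out helper. This page does not name the platform's clone call, so the sketch below makes several assumptions: `RecordingStream` is a stand-in for the global `stream`, the message is a plain dict, and `copy.deepcopy` stands in for whatever cloning mechanism the Message class actually offers. `emit_to_all` is a hypothetical helper, not part of the API.

```python
import copy


class RecordingStream:
    """Stand-in for the global `stream`; it only records what is emitted."""

    def __init__(self):
        self.emitted = []

    def emit(self, message, output_port):
        self.emitted.append((output_port, message))


def emit_to_all(stream, message, ports, clone=copy.deepcopy):
    """Emit an independent copy to every port except the last,
    then emit the original message to the last port."""
    if not ports:
        return
    for port in ports[:-1]:
        # Clone first: the original must stay usable for the final emit.
        stream.emit(clone(message), port)
    stream.emit(message, ports[-1])


fake_stream = RecordingStream()
fake_message = {"id": 1, "payload": ["a", "b"]}  # assumption: dict as message
emit_to_all(fake_stream, fake_message, ["Output", "Audit"])
print(len(fake_stream.emitted))  # 2
```

Emitting the original last means that no code touches a message after it has been handed to the stream.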

Logging

Log messages at different severity levels. These appear in the Audit Trail and processor terminal output.

| Method | Severity | Use For |
| --- | --- | --- |
| logInfo(msg) | INFO | General information, progress |
| logWarning(msg) | WARNING | Non-critical issues |
| logError(msg) | ERROR | Processing errors |
| logFatal(msg) | FATAL | Critical failures |

stream.log_info(f"Processing record {message.id}")
stream.log_warning(f"Unexpected format: {message.typeName}")
stream.log_error(f"Failed to parse: {error}")

Stream Metadata

getMetadata()

Returns stream-type-specific metadata as a Message. The structure varies by source type.

Returns: Message

meta = stream.getMetadata()

# File source
stream.log_info(f"File size: {meta.data.Size} bytes")
stream.log_info(f"Last modified: {meta.data.LastModified}")

# Kafka source
stream.log_info(f"Topic: {meta.data.Topic}, Partition: {meta.data.Partition}")

Metadata by Source Type

| Source Type | Properties | Description |
| --- | --- | --- |
| File / FTP / S3 / OneDrive / SharePoint | Path, Size, LastModified, FolderSetup | File path, size in bytes, modification timestamp, folder config name |
| HTTP | BindAddress, BindPort | Server IP and port |
| Kafka (Exclusive Partition) | GroupId, Topics | Consumer group and topic array |
| Kafka (Standard) | GroupId, Topic, Partition | Consumer group, single topic, partition number |
| SQS | QueueUrl | Queue URL |
| Serial | Port, BaudRate, DataBits, StopBits, Parity, FlowControl | Serial port configuration |
| Secondary Stream | ParentStreamName, ParentStreamId | Originating stream reference |
| Service Source | Service | Service name |
| TCP / WebSocket | LocalAddress, LocalPort, RemoteAddress, RemotePort | Connection endpoints |
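
Because the metadata structure varies by source type, code that may run behind different sources should check which fields are present before reading them. The following is a minimal sketch under two assumptions: the fields are readable as attributes (as in `meta.data.Size` above), and `hasattr` works on the platform's data object. `describe_source` is a hypothetical helper, and `SimpleNamespace` stands in for `meta.data`.

```python
from types import SimpleNamespace


def describe_source(data):
    """Build a short description from whichever metadata fields exist."""

    def has(*names):
        return all(hasattr(data, name) for name in names)

    if has("Path", "Size"):
        return f"file {data.Path} ({data.Size} bytes)"
    if has("Topic", "Partition"):
        return f"kafka topic {data.Topic}, partition {data.Partition}"
    if has("Topics"):
        return f"kafka topics {', '.join(data.Topics)}"
    if has("QueueUrl"):
        return f"sqs queue {data.QueueUrl}"
    return "unknown source type"


# Stand-ins for meta.data from a file source and a Kafka source.
file_meta = SimpleNamespace(Path="/incoming/orders/a.csv", Size=2048)
kafka_meta = SimpleNamespace(GroupId="g1", Topic="orders", Partition=3)
print(describe_source(file_meta))   # file /incoming/orders/a.csv (2048 bytes)
print(describe_source(kafka_meta))  # kafka topic orders, partition 3
```

In a processor, the result could be passed to the stream's info-level log call at stream start.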

Stream Lifecycle Control

requestRetry(reason, ms?)

Requests that the stream be retried after a delay. Use when encountering transient errors (e.g., database unavailable).

| Parameter | Type | Description |
| --- | --- | --- |
| reason | Status | Status describing why the retry is needed |
| ms | int (optional) | Delay in milliseconds (default: 60000) |

if not database_available:
    stream.requestRetry(
        Status.create(VENDOR, 'DB_UNAVAILABLE'),
        30000  # Retry after 30 seconds
    )
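
For transient errors that may last a while, a growing delay is often kinder to the failing dependency than a fixed one. The sketch below computes a capped exponential backoff that could be passed as the `ms` argument. `retry_delay_ms`, its default values, and the idea of an attempt counter are assumptions made for illustration; this page does not describe how to track retry attempts across stream restarts.

```python
def retry_delay_ms(attempt, base_ms=5_000, cap_ms=300_000):
    """Return a capped exponential backoff delay in milliseconds.

    attempt is 1 for the first retry, 2 for the second, and so on.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(cap_ms, base_ms * 2 ** (attempt - 1))


for attempt in (1, 2, 3, 7):
    print(attempt, retry_delay_ms(attempt))
# 1 5000
# 2 10000
# 3 20000
# 7 300000
```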

requestRollback(status)

Requests that the current stream be rolled back. Use for unrecoverable errors where reprocessing from the start is needed.

| Parameter | Type | Description |
| --- | --- | --- |
| status | Status | Status describing the rollback reason |

if sequence_gap_detected:
    stream.requestRollback(
        Status.create(VENDOR, 'SEQ_NUMBER_UNKNOWN', expected_seq)
    )
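
The example above presupposes some way to detect the gap. One possible check, assuming records carry a sequence number that increases by exactly one, is sketched below. `find_sequence_gap` is a hypothetical helper; how the sequence number is read from a message is not covered here.

```python
def find_sequence_gap(previous_seq, current_seq):
    """Return the expected sequence number if there is a gap, else None."""
    expected = previous_seq + 1
    return None if current_seq == expected else expected


print(find_sequence_gap(41, 42))  # None
print(find_sequence_gap(41, 44))  # 42

# In a processor, a non-None result would trigger the rollback, e.g.:
#   expected_seq = find_sequence_gap(last_seq, seq)
#   if expected_seq is not None:
#       stream.requestRollback(
#           Status.create(VENDOR, 'SEQ_NUMBER_UNKNOWN', expected_seq))
```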

Output Naming

Control the name and path of output streams. These are particularly useful for file-based sinks where you want to dynamically set filenames and directories.

All setter methods return Stream for chaining.

| Method | Description | Example |
| --- | --- | --- |
| setOutputName(name) | Name for normal output | stream.setOutputName('orders.csv') |
| setOutputPath(path) | Path for normal output | stream.setOutputPath('/processed') |
| setDoneName(name) | Name for done path | stream.setDoneName('orders.done') |
| setDonePath(path) | Path for done path | stream.setDonePath('/archive') |
| setErrorName(name) | Name for error path | stream.setErrorName('orders.err') |
| setErrorPath(path) | Path for error path | stream.setErrorPath('/failed') |

# Chain multiple setters
stream \
    .setOutputName(f"processed-{stream.name}") \
    .setOutputPath('/outgoing/success') \
    .setErrorName(f"failed-{stream.name}") \
    .setErrorPath('/outgoing/errors')
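
Deriving all output names from the input name in one place keeps the naming scheme consistent. The sketch below uses `os.path.splitext` from the standard library. `build_output_names` and the naming pattern it produces are illustrative assumptions, not a platform convention.

```python
import os


def build_output_names(stream_name):
    """Derive output, done, and error names from an input stream name."""
    base, ext = os.path.splitext(stream_name)
    return {
        "output": f"{base}-processed{ext}",
        "done": f"{base}.done",
        "error": f"{base}-errors{ext}",
    }


names = build_output_names("orders.csv")
print(names["output"])  # orders-processed.csv
print(names["done"])    # orders.done
print(names["error"])   # orders-errors.csv
```

The resulting values could then be passed to `setOutputName`, `setDoneName`, and `setErrorName`.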

setProcessorOutputName(processorName, streamName)

Sets the output stream name for a specific downstream processor. Use when a processor has multiple output ports and you need different names per port.

| Parameter | Type | Description |
| --- | --- | --- |
| processorName | str | Name of the target output processor |
| streamName | str | Name to assign to the stream |

# Option 1: By processor name
stream.setProcessorOutputName('File-Writer-A', 'valid-orders.csv')
stream.setProcessorOutputName('File-Writer-B', 'invalid-orders.csv')

# Option 2: Via output port (must connect to actual output processor)
port_a = processor.getOutputPort('Valid')
stream.setProcessorOutputName(port_a.getPeerProcessorName(), 'valid.csv')
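
With more than a couple of downstream processors, the per-processor names can be kept in a single mapping and applied in a loop. This is a sketch only: `RecordingStream` is a stand-in for the global `stream` that records the calls it receives, and `apply_output_names` is a hypothetical helper.

```python
class RecordingStream:
    """Stand-in for the global `stream`; it only records the assigned names."""

    def __init__(self):
        self.names = {}

    def setProcessorOutputName(self, processor_name, stream_name):
        self.names[processor_name] = stream_name
        return self


def apply_output_names(stream, names_by_processor):
    """Call setProcessorOutputName once per (processor, name) pair."""
    for processor_name, stream_name in names_by_processor.items():
        stream.setProcessorOutputName(processor_name, stream_name)


fake_stream = RecordingStream()
apply_output_names(fake_stream, {
    "File-Writer-A": "valid-orders.csv",
    "File-Writer-B": "invalid-orders.csv",
})
print(fake_stream.names["File-Writer-B"])  # invalid-orders.csv
```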

setProcessorOutputPath(processorName, path)

Sets the output path for a specific downstream processor.

| Parameter | Type | Description |
| --- | --- | --- |
| processorName | str | Name of the target output processor |
| path | str | Directory path |

stream.setProcessorOutputPath('File-Writer', '/data/processed')

Complete Example

A file-processing stream that routes valid records to one output and errors to another, with dynamic naming:

import re

OUTPUT_PORT = None
ERROR_PORT = None
VALID_COUNT = 0
ERROR_COUNT = 0

def on_init():
    global OUTPUT_PORT, ERROR_PORT
    OUTPUT_PORT = processor.getOutputPort('Output')
    ERROR_PORT = processor.getOutputPort('Error')

def on_stream_start():
    global VALID_COUNT, ERROR_COUNT
    VALID_COUNT = 0
    ERROR_COUNT = 0

    # Set dynamic output filenames based on input
    base_name = re.sub(r'\.csv$', '', stream.name, flags=re.IGNORECASE)
    stream \
        .setOutputName(f"{base_name}-processed.csv") \
        .setErrorName(f"{base_name}-errors.csv")

    # Log stream metadata
    meta = stream.getMetadata()
    stream.log_info(f"Processing {stream.originalName} ({meta.data.Size} bytes)")

def on_message():
    global VALID_COUNT, ERROR_COUNT
    if is_valid(message):
        VALID_COUNT += 1
        stream.emit(message, OUTPUT_PORT)
    else:
        ERROR_COUNT += 1
        message.addStatus(Severity.ERROR, Status.create(VENDOR, 'VALIDATION_FAILED'))
        stream.emit(message, ERROR_PORT)

def on_stream_end():
    stream.log_info(f"Complete: {VALID_COUNT} valid, {ERROR_COUNT} errors")

def is_valid(msg):
    # A record is valid when its ID field exists and is non-empty
    return msg.exists(dataDictionary.type.Record.ID) and \
        len(msg.getString(dataDictionary.type.Record.ID)) > 0

See Also

  • Message — The messages you emit through the stream
  • OutputPort — The ports you emit to
  • Processor — Get output ports and access configuration
  • Status — Create status objects for retries and rollbacks