Version: 2.0.0

Class: Stream

The Stream class is the underlying class of the stream object. The stream instance represents the stream currently being processed and is automatically available in any script-based Processor, e.g. the Python Processor. It provides a number of functions which can, and sometimes must, be invoked to control stream processing. Use it to emit messages to output ports, request retries and rollbacks, write log messages, access the stream's metadata, and more.

Properties

id

readonly id: str

The ID of the stream is represented as a v4 UUID string. It is unique for each stream and automatically generated by layline.io. The ID is used to identify the stream in the system. It is used in the Audit Trail and in the logs.

Example

STREAM_ID = stream.id
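Since the ID is described as a v4 UUID string, a script can check that format before using the ID as a key in an external system. The following is a minimal sketch using only the Python standard library; `is_uuid4` is a hypothetical helper name, and `sample_id` stands in for `stream.id` so that the snippet runs outside of layline.io.

```python
import uuid


def is_uuid4(value: str) -> bool:
    """Return True if value parses as a version 4 UUID string."""
    try:
        return uuid.UUID(value).version == 4
    except ValueError:
        # Raised by uuid.UUID for strings that are not valid UUIDs
        return False


# Stand-in value; inside a Processor you would pass stream.id instead.
sample_id = str(uuid.uuid4())
print(is_uuid4(sample_id))     # True
print(is_uuid4("not-a-uuid"))  # False
```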

name

readonly name: str

Returns the name of a stream.

For file-based processing, the stream name is the name of the file being processed. For other sources, you usually explicitly set the stream name in the respective Source Asset.

For example, if your data stems from a Service Source Asset, the stream name is defined in the configuration of that Asset, e.g. ${msg:IoT.StreamName}-${date:yyyy-MM-dd-HH-mm-ss}, which results in a name such as DeviceX-2022-10-10-21-45-33. If your data source is a file, the stream name is the file name, e.g. my_file_name.csv.

Same as getName.

Example

STREAM_NAME = stream.name
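To make the naming pattern from the Service Source example more concrete, the sketch below reproduces its result in plain Python. It only imitates the outcome of ${msg:IoT.StreamName}-${date:yyyy-MM-dd-HH-mm-ss}; it is not how layline.io evaluates the expression, and `build_stream_name` is a hypothetical helper.

```python
from datetime import datetime


def build_stream_name(device_name: str, when: datetime) -> str:
    """Imitate the pattern <StreamName>-<yyyy-MM-dd-HH-mm-ss>."""
    return f"{device_name}-{when.strftime('%Y-%m-%d-%H-%M-%S')}"


print(build_stream_name("DeviceX", datetime(2022, 10, 10, 21, 45, 33)))
# DeviceX-2022-10-10-21-45-33
```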

Methods

emit()

emit(message: Message, output_port: OutputPort) -> None

Emits a message to the specified output port. Once emitted, the context of the message is lost. To send the same message to another output port as well, clone the original message first, then emit the clone to that other port.

Parameters

  • message: Message - The message to emit.
  • output_port: OutputPort - The output port to which the message is emitted.

Returns

None

Example

OUTPUT_PORT = processor.getOutputPort('Output')  # Get the output port by the name given in the UI
stream.emit(message, OUTPUT_PORT) # Emits the message to the output port
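The clone-before-emit rule described above can be sketched as follows. `FakeMessage` and `FakeStream` are hypothetical stand-ins so that the snippet runs outside of layline.io, and the `clone()` method name is an assumption; check the Message reference for the actual cloning call.

```python
import copy


class FakeMessage:
    """Hypothetical stand-in for a layline.io Message."""

    def __init__(self, data):
        self.data = data

    def clone(self):
        # Assumed method name; the real Message API may differ.
        return FakeMessage(copy.deepcopy(self.data))


class FakeStream:
    """Hypothetical stand-in that records what was emitted to which port."""

    def __init__(self):
        self.emitted = []

    def emit(self, message, output_port):
        self.emitted.append((output_port, message))


stream = FakeStream()
message = FakeMessage({"value": 1})

# Clone first, because the original's context is lost once it is emitted.
copy_for_audit = message.clone()
stream.emit(message, "Output")
stream.emit(copy_for_audit, "Audit")

print([port for port, _ in stream.emitted])  # ['Output', 'Audit']
```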

getId()

getId() -> str

Each Stream in layline.io has an ID which uniquely identifies a stream. Use this method to retrieve this unique ID.

Returns

str - Unique Stream ID

Example

STREAM_ID = stream.getId()

getMetadata()

getMetadata() -> Message

Retrieves metadata information from a stream and returns it in the form of a Message. The content of the message depends on the type of stream. For example, a file-based stream returns different data than a Kafka stream.

Returns

Message - Returns a message which contains the stream-type specific data.

Stream-type specific message return content:

[The content remains the same as in the original documentation]

Example

# Get the metadata for the stream (a Kafka stream in our example):
msg_metadata = stream.getMetadata()
# Result:
# msg_metadata.data = {
# "GroupId": "MyConsumerGroup",
# "Topic": "mytopic",
# "Partition": "0"
# }
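As a sketch of how the Kafka metadata shown above might be consumed, the snippet below builds a short description from the three fields. It assumes that `data` behaves like a dictionary, as the example output suggests; `SimpleNamespace` merely stands in for the Message returned by `stream.getMetadata()`, and `describe_kafka_source` is a hypothetical helper.

```python
from types import SimpleNamespace

# Stand-in for the result of stream.getMetadata() on a Kafka stream
msg_metadata = SimpleNamespace(
    data={"GroupId": "MyConsumerGroup", "Topic": "mytopic", "Partition": "0"}
)


def describe_kafka_source(metadata) -> str:
    """Build a short, log-friendly description of the Kafka source."""
    data = metadata.data
    partition = int(data["Partition"])  # The example shows the partition as a string
    return f"{data['Topic']}[{partition}] via {data['GroupId']}"


print(describe_kafka_source(msg_metadata))  # mytopic[0] via MyConsumerGroup
```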

getName()

getName() -> str

Returns the name of a stream.

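For file-based processing, the stream name is the name of the file being processed. For other sources, you usually set the stream name explicitly in the respective Source Asset.

Same as the name property.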

Returns

str - Stream name, e.g. my_file_name.csv

Example

STREAM_NAME = stream.getName()

logError()

logError(msg: str) -> None

Logs a message with Severity.ERROR to the stream log. You can view it both in the Audit Trail in the UI and in the terminal output of the process.

Parameters

  • msg: str - Information you want to log.

Returns

None

Example

stream.logError(f'Ran into the following problem: {problem}')

logFatal()

logFatal(msg: str) -> None

Logs a message with Severity.FATAL to the stream log. You can view it both in the Audit Trail in the UI and in the terminal output of the process.

Parameters

  • msg: str - Information you want to log.

Returns

None

Example

stream.logFatal(f'Ran into the following problem: {problem}')

logInfo()

logInfo(msg: str) -> None

Logs a message with Severity.INFO to the stream log. You can view it both in the Audit Trail in the UI and in the terminal output of the process.

Parameters

  • msg: str - Information you want to log.

Returns

None

Example

stream.logInfo(f'Here is some interesting information: {info}')

logWarning()

logWarning(msg: str) -> None

Logs a message with Severity.WARNING to the stream log. You can view it both in the Audit Trail in the UI and in the terminal output of the process.

Parameters

  • msg: str - Information you want to log.

Returns

None

Example

stream.logWarning(f'Here is a warning: {warning}')

requestRetry()

requestRetry(reason: Status, ms: int = 60000) -> None

If you encounter an error in a stream and have caught it, you can request that the stream be retried after a defined number of milliseconds.

Parameters

  • reason: Status - Status which should be attached to the retry signal. The Status should describe the reason for the retry.
  • ms: int, optional - Retry timeout in milliseconds. Default is 60000 (60 seconds).

Returns

None

Example

# Request retry after 30 seconds
stream.requestRetry(Status.create(VENDOR, 'DB_NOT_AVAILABLE'), 30000)
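The "caught error" case typically looks like a try/except around an external call. The sketch below illustrates that flow under stated assumptions: `FakeStream` is a hypothetical stand-in that only records the request, `query_database` is a hypothetical function that always fails, and a plain string replaces the Status object that the real method expects.

```python
class FakeStream:
    """Hypothetical stand-in that records retry requests."""

    def __init__(self):
        self.retry_requests = []

    def requestRetry(self, reason, ms=60000):
        self.retry_requests.append((reason, ms))


def query_database():
    """Hypothetical external call that fails."""
    raise ConnectionError("database not reachable")


stream = FakeStream()

try:
    query_database()
except ConnectionError as err:
    # In a real script the reason would be a Status,
    # e.g. Status.create(VENDOR, 'DB_NOT_AVAILABLE').
    stream.requestRetry(f"DB_NOT_AVAILABLE: {err}", 30000)

print(stream.retry_requests)
# [('DB_NOT_AVAILABLE: database not reachable', 30000)]
```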

requestRollback()

requestRollback(status: Status) -> None

Requests to roll back the currently processed stream.

To roll back a stream, call requestRollback to signal to layline.io that you want this stream rolled back, and provide a Status which describes the reason for the rollback.

Parameters

  • status: Status - Status which should be attached to the rollback signal. The Status should describe the reason for the rollback.

Returns

None

Example

# Request a rollback with a Status of your own type 'SEQ_NUMBER_UNKNOWN' and pass the sequence number as a parameter
file_info = {
    'sequence': message.data.IOT.SEQ_NO
}
stream.requestRollback(Status.create(VENDOR, 'SEQ_NUMBER_UNKNOWN', file_info['sequence']))

setDoneName()

setDoneName(name: str) -> Stream

Set the name of a stream that is sent to the done path. In case your output is a file, this would then be the file's name.

Parameters

  • name: str - New stream name

Returns

Stream - The manipulated stream.

Example

# Set stream done name:
stream.setDoneName('New-Done-Stream-Name')

# Functions which return a Stream can be chained like this:
stream.setDoneName('New-Stream-Name').setDonePath('/my/specific/path')
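Chaining works because each setter returns a Stream on which the next call can be made. The sketch below shows the general fluent pattern with a hypothetical `FakeStream` whose setters return `self`; whether layline.io returns the very same instance is an assumption, as the documentation only states that the manipulated stream is returned.

```python
class FakeStream:
    """Hypothetical stand-in illustrating the fluent setter pattern."""

    def __init__(self):
        self.done_name = None
        self.done_path = None

    def setDoneName(self, name):
        self.done_name = name
        return self  # Returning the stream enables chaining

    def setDonePath(self, path):
        self.done_path = path
        return self


stream = FakeStream()
result = stream.setDoneName('New-Stream-Name').setDonePath('/my/specific/path')

print(result is stream)                     # True
print(stream.done_name, stream.done_path)   # New-Stream-Name /my/specific/path
```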

setDonePath()

setDonePath(path: str) -> Stream

Sets the done path of a stream, i.e. the path used when the stream is sent to done. If your data is written to a file, this is the path to which the file is written.

Parameters

  • path: str - New path name

Returns

Stream - The manipulated stream.

Example

# Set stream done path:
stream.setDonePath('New-Done-Stream-Path')

# Functions which return a Stream can be chained like this:
stream.setDoneName('New-Stream-Name').setDonePath('/my/specific/path')

setErrorName()

setErrorName(name: str) -> Stream

Set the name of a stream that is sent to the error path. In case your output is a file, this would then be the file's name.

Parameters

  • name: str - New stream name

Returns

Stream - The manipulated stream.

Example

# Set stream error name:
stream.setErrorName('New-Error-Stream-Name')

# Functions which return a Stream can be chained like this:
stream.setErrorName('New-Stream-Name').setErrorPath('/my/specific/path')

setErrorPath()

setErrorPath(path: str) -> Stream

Sets the error path of a stream, i.e. the path used when the stream is sent to the error path. If your data is written to a file, this is the path to which the file is written.

Parameters

  • path: str - New path name

Returns

Stream - The manipulated stream.

Example

# Set stream error path:
stream.setErrorPath('New-Error-Stream-Path')

# Functions which return a Stream can be chained like this:
stream.setErrorName('New-Stream-Name').setErrorPath('/my/specific/path')

setOutputName()

setOutputName(name: str) -> Stream

Set the name of a stream that is sent to the output path. In case your output is a file, this would then be the file's name.

Parameters

  • name: str - New stream name

Returns

Stream - The manipulated stream.

Example

# Set stream output name:
stream.setOutputName('New-Output-Stream-Name')

# Functions which return a Stream can be chained like this:
stream.setOutputName('New-Stream-Name').setOutputPath('/my/specific/path')

setOutputPath()

setOutputPath(path: str) -> Stream

Set the output path of a stream that is sent to the normal output channel. In case your data is written to a file, this would then be the path to which the file is written.

Parameters

  • path: str - New path name

Returns

Stream - The manipulated stream.

Example

# Set stream output path:
stream.setOutputPath('New-Output-Stream-Path')

# Functions which return a Stream can be chained like this:
stream.setOutputName('New-Stream-Name').setOutputPath('/my/specific/path')

setProcessorOutputName()

setProcessorOutputName(output_processor_name: str, stream_name: str) -> Stream

Sets the name of the output stream for a specific Processor. Use this, for example, if you have a processor with multiple output ports and need to send messages to either of these ports, but also want to determine the name of the stream on each of these ports.

This is used to change the name of the output stream. E.g. if a file is written, this will be the filename in output.

Parameters

  • output_processor_name: str - Name of the output processor at the end of the workflow.
  • stream_name: str - New name the stream should have when data is sent out to the receiving processor.

Returns

Stream - The manipulated stream.

Example

# Option 1:
# Set stream output names:
stream.setProcessorOutputName('NameOfOutputProcessor', 'MyNewStreamName')

# or Option 2:
OUTPUT_PORT = processor.getOutputPort('NameOfOutputPort')
stream.setProcessorOutputName(OUTPUT_PORT.getPeerProcessorName(), 'MyNewStreamName')
# Option 2 only works if the processor connected to OUTPUT_PORT is the actual Output-Processor.

# Functions which return a Stream can be chained like this:
stream.setProcessorOutputName('NameOfOutputProcessor', 'New-Stream-Name').setProcessorOutputPath('NameOfOutputProcessor', '/my/specific/path')
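For the multi-output case described above, a script might assign a different stream name per output processor. The following sketch assumes two hypothetical output processors named 'ValidOutput' and 'RejectOutput'; `FakeStream` is a stand-in that only records the names and says nothing about how layline.io stores them internally.

```python
class FakeStream:
    """Hypothetical stand-in that records one stream name per output processor."""

    def __init__(self):
        self.output_names = {}

    def setProcessorOutputName(self, output_processor_name, stream_name):
        self.output_names[output_processor_name] = stream_name
        return self


stream = FakeStream()
base_name = "my_file_name.csv"  # In a Processor this could come from stream.getName()

# Give each output processor its own stream name
(stream
    .setProcessorOutputName("ValidOutput", f"valid-{base_name}")
    .setProcessorOutputName("RejectOutput", f"rejected-{base_name}"))

print(stream.output_names)
# {'ValidOutput': 'valid-my_file_name.csv', 'RejectOutput': 'rejected-my_file_name.csv'}
```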

setProcessorOutputPath()

setProcessorOutputPath(output_processor_name: str, path: str) -> Stream

Sets the path of the output stream for a specific Processor. Use this, for example, if you have a processor with multiple output ports and need to send messages to either of these ports, but also want to determine the path of the stream on each of these ports.

This is used to change the path of the output stream. E.g., if a file is written, this will be written to the specified path.

Parameters

  • output_processor_name: str - Name of the output processor at the end of the workflow.
  • path: str - New path the stream should have when data is sent out to the receiving processor.

Returns

Stream - The manipulated stream.

Example

# Option 1:
# Set stream output path:
stream.setProcessorOutputPath('NameOfOutputProcessor', '/my/specific/output/path')

# or Option 2:
OUTPUT_PORT = processor.getOutputPort('NameOfOutputPort')
stream.setProcessorOutputPath(OUTPUT_PORT.getPeerProcessorName(), '/my/specific/output/path')
# Option 2 only works if the processor connected to OUTPUT_PORT is the actual Output-Processor.

# Functions which return a Stream can be chained like this:
stream.setProcessorOutputName('NameOfOutputProcessor', 'New-Stream-Name').setProcessorOutputPath('NameOfOutputProcessor', '/my/specific/path')