Skip to main content
Version: 2.0.0

Welcome to Python

Introduction

In order to allow for complex message processing and applying custom business logic to message handling, layline.io provides the Python Asset. This asset enables you to apply theoretically any kind of scriptable logic when handling messages.

Popular examples are:

  • Data analysis
  • Enrichment
  • Statistics gathering
  • Complex mapping
  • Filtering

and potentially anything you can imagine when handling data.

Using the Python Asset is not mandatory of course. Many challenges you may be facing can be accomplished using the out-of-the-box Assets which layline.io provides without resorting to Python. But in many instances, an intermittent scripting processor inserted into a Workflow can go a long way in enabling you to execute logic which would be otherwise hard to implement using pure UI elements.

On this page we will explain how to use Python with the Python Asset.

Scope of Python language

layline.io embeds a Python runtime to facilitate Python scripting.

Limitations: There might be limitations in using Python which are specific to the layline.io environment. Please refer to the layline.io Python documentation for details.

If you want to get familiar with Python outside of layline.io, there are websites for Python tutorials available, like for instance Python.org, to start getting familiar with Python.

How it works - Lifecycle Hooks

Scripts within layline.io can only be executed using the Python Asset. In fact, that's its sole purpose. You can use none, one or as many of these Assets within your Project and within the Project's configured Workflows.

Since layline.io is a reactive system, a Python Asset receiving a message automatically starts processing this message with the underlying script you provided. One of the key methods here is onMessage:

Just like onMessage is a hook, the Python Asset provides a number of additional hooks which are automatically invoked as part of a Python Asset's lifecycle. A complete list of these hooks can be found in the Python Processor Reference.

The following sequence graph shows a typical lifecycle of a Python Processor:

Let's explain:

When a Workflow is instantiated as part of a Deployment (running on a Reactive Cluster), an instantiated Python Processor runs through a number of stages:

1. Initialize

Anything defined on the global level (non-functions) get evaluated. This can be things like variable initialization, getting an output port etc. Use this to initialize global variables and constants for example:

# Example
OUTPUT_PORT = processor.getOutputPort('MyOutput')
my_var = None
connection = None
# etc ...

2. onInit()

layline.io then automatically invokes the onInit() method. This is a more contained area to perform initializations:

# Example
def onInit():
global connection
connection = services.MyDBService.openConnection()
# etc ...

3. onStreamStart()

When a Workflow starts processing a Stream, a Workflow-wide Stream-start event is issued. You can hook on to this event using the onStreamStart() Method.

filename = None

def onStreamStart():
global filename
filename = stream.getName()
# etc ...

4. onMessage()

Every time Python Processor is fed with an event by an upstream Processor, the onMessage() hook is invoked. It is therefore central to message processing:

# Get the output port
OUTPUT_PORT = processor.getOutputPort('MyOutput')

def onMessage():
if message.typeName === 'Header':
# do nothing
pass
elif message.typeName === 'Trailer':
# do something with the trailer
pass
elif message.typeName = 'Detail':
# invoke a self-defined function which handles the message.
handle_detail(message)

stream.emit(message, OUTPUT_PORT)

def handle_detail(detail):
# do something with the message
pass

5. onStreamEnd()

Finally, when a Stream comes to an end, the onStreamEnd() hook is automatically called. Write your code here for finalizing actions regarding the processing of a stream:

def onStreamEnd():
# Report in case some customer data could not be found during stream processing
if num_customer_data_not_found > 0:
stream.logInfo(f'{num_customer_data_not_found} customers could not be found in the database.')

Referencing and Reusing Scripts

Introduction

Sometimes, you need the same functionality across multiple Scripts. This can be achieved by creating generic scripts containing general functions that can be (re)used in many other scripts.

Creating a reusable script

Reusing Python (Python Introduction)

Here we have created a file util.py (1) which contains one function which we want to reuse on other scripts (2). You can write your script as you like. All you need to know is that it needs to be valid Python.

Import functions from one script into another

There are different approaches on how to import reusable functions from generic scripts into other scripts:

# import one dedicated function from "utils.py"
from utils import get_utc_time_offset

# or: import multiple entities from external script
from utils import get_utc_time_offset, my_function2, my_function3

# or: import function with new name
from utils import get_utc_time_offset as get_utc

# or: import all available functions via alias name to reference them by alias
import utils

from utils import get_utc_time_offset # in case the script to be loaded is in the same directory as this script

from src.main.python.utils import get_utc_time_offset # absolute path configuration

# ...

offset = get_utc_time_offset(date_a, date_b)

second_offset = get_utc(date_a, date_b)

third_offset = utils.get_utc_time_offset(date_a, date_b)

Invalid import script path

layline.io will check for the existence of referenced scripts upon deployment. If the script cannot be found, layline.io will show an error, and you have to correct the problem.

Deployment error due to wrong script reference (Python Introduction)

Error handling

Unforced errors

Python is an interpreted language. You can therefore encounter unforced errors at runtime. Most of them show up when trying to start a Workflow containing a Python Processor. In this case the Workflow will not start and the cause of error is displayed:

Workflow activation failure due to Python runtime error (Python Introduction)

Forced errors

Forced errors occur when you deliberately raise an error at runtime:

try:
insert_result = connection.MyInsert(
{
"DeviceID": message.data.IOT.DEVICE_ID,
"Measurement": message.data.IOT.MEASUREMENT,
"Timestamp": message.data.IOT.TIMESTAMP
}
)
except Exception as error:
close_connection()
raise error
finally:
connection = None

# OR
# raise ValueError("My individual error message: " + my_message)

Raising an error which you do not catch yourself with a try ... except clause, will be bubbled up. For this purpose a Python Asset provides a number configuration options (which you will also find in other Asset configurations):

Processing within a Flow Processor like this one can fail for various reasons. In this section you can define how the system should behave in case of such problems.

Failure Types

Four types of failures are observable:

#Failure observables / ReactionIgnoreRetry Event/MessageRetry StreamRollback Stream
1Stream start failure handling
A problem occurred in this Asset when starting a new stream.
2Stream end failure handling
A problem occurred in this Asset when ending a stream.
3Message failure handling
A problem occurred when handling a specific message in this Asset.
4Rollback commit failure handling
A problem occurred during system issued rollback or commit procedure.

Failure Type Reactions

Each of these failure types can be responded to with four different reactions:

Ignore

Don't do anything.

Rollback Stream

Rollback the complete stream. In the case of batch/file processing for example the complete file (which represents the stream) will be rolled back and put into error.

warning

A rollback signal will be issued to all participating Workflow Processors. Each Processor needs to ensure itself how to deal with a rollback. A Javascript Flow Processor, for example, which directly interacts with a database will have to react to a rollback signal:

Rollback example in Javascript
   export function onRollback() {
if (connection) {
try {
connection.rollbackTransaction();
connection.closeConnection();
} catch (err) {
} finally {
connection = null;
}
}
}
Retry Stream

Don't simply give up. Try to process the whole stream again. This option allows you to define how often and in what intervals the retries should be performed.

Failure Handling Retry Stream

Stream Retry Settings

  • Max. Retries: The number of retries which should be performed. For example "5".
  • Min. Backoff [ms]: Wait at least x milliseconds between each retry. For example "12000" (12 seconds).
  • Max. Backoff [ms]: Wait at max x milliseconds between each retry. For example "150000" (150 seconds).

Based on these parameters, the system will try to balance the defined number of retries within the time boundaries of min. backoff and max. backoff. Taken the example numbers from above, the five retries would happen in this timespan:

Failure Retry Stream Handling

Retry Event/Message

Pick this reaction if you want to retry processing the current message. As is the case with the Retry Stream reaction you can define how often and in what intervals the retries should be performed.

Failure Retry Event/Message Handling

The settings are the same as with the Retry Stream reaction. So please refer to this. There is one additional setting, however, which is When final retry failed.

You here have the option to decide what to do if the message cannot be processed, even after the final retry:

  • Ignore: Don't do anything.

  • Rollback Stream: Fallback to rolling back the whole stream.

  • Retry Stream: Retry the whole stream once again. If you pick this option, then you can again define all relevant Retry Stream parameters.

    Failure Retry Event/Message -> Retry Stream Handling

Only works for specific Source Types within a Workflow

A Workflow has one Input Processor which is responsible for reading data from a Source. Sources are for example files, databases, or message queues.

The settings for Retry Event/Message and Retry Stream only work for specific Source Types which a Workflow uses. These are:

Summary

Using these principles, the usage of the Python Asset is straightforward.

You may wonder how big a Python script should get, or how small. Quick answer: It's really up to you. It makes sense to split scripts into logical chunks and then potentially chain a number of scripts together in a Workflow.