Version: 2.0.0

Python Flow Processor

Purpose

Asset Dependency Graph (Python Flow Processor)

The Python Asset allows you to define detailed business logic which you may want to apply to a flow of messages. Here are some examples:

  • Convert message data from one format to another
  • Filter information based on specific rules
  • Enrich individual data using specific rules and/or external data sources (e.g. reference data)
  • Route messages based on your own criteria
  • Gather metrics and statistics, and store and forward them to other targets

and basically anything else you can imagine here.
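As a rough illustration of the kinds of logic listed above, the following plain-Python sketch converts, filters, enriches and routes a single message. It does not use the layline.io API: the message is modeled as a dictionary, and `REFERENCE_DATA`, `handle_message` and the port names are hypothetical. Please consult the Python Language Reference for the actual message and stream objects.

```python
# Hypothetical reference data used for enrichment (an assumption for this sketch).
REFERENCE_DATA = {"DE": "Germany", "FR": "France"}


def handle_message(message):
    """Return (port_name, message), or None if the message is filtered out."""
    # Filter: drop messages which carry no amount.
    if message.get("amount") is None:
        return None

    # Convert: copy the message and normalize the amount to a float.
    converted = dict(message, amount=float(message["amount"]))

    # Enrich: add the country name from the reference data.
    converted["country_name"] = REFERENCE_DATA.get(converted.get("country"), "Unknown")

    # Route: choose a (hypothetical) output port name based on a custom rule.
    port = "HighValue" if converted["amount"] >= 1000 else "Standard"
    return port, converted
```

In an actual Source Script, the same decisions would be made inside the message handling function described in the Python Language Reference.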

Prerequisites

You need:

  • A Source Script which should be executed within this asset.
  • Knowledge of how to work with Python in layline.io. Please check the Python Language Reference to learn about this.

Configuration

Name & Description

Name & Description (Python)

  • Name : Name of the Asset. Spaces are not allowed in the name.

  • Description : Enter a description.

The Asset Usage box shows how many times this Asset is used and which parts are referencing it. Click to expand the list, then click an entry to follow the reference, if there is one.

Asset Dependencies

Use this section to add Formats which you plan to use as part of your filtering and routing rules.

Why do I have to add formats here?
Doesn't the system know which Formats I am using? layline.io automatically detects Formats which are used by your input and output Processors and mounts them at runtime. However, when a Format is not used by an input or output Processor directly, but is only referenced in a script (for example in a Python or Javascript Flow Processor, or in a Quickscript), the system may not be aware that you are using it. This would result in a runtime error.

To avoid this, you can explicitly list the Formats you are referencing in your scripts. This ensures that these Formats are always mounted at runtime. It is therefore best practice to add all Formats which are referenced in this Asset here.

Asset Dependency

To add a Format, click on Add Dependency and select the Format you wish to add as a dependency. Repeat for each additional Format dependency.

Input Ports

Input Ports

This Processor can have one or more input ports from which it receives data to process. It must have at least one input port, however.

A port has a name and an optional description. The name is mandatory and must be unique within the Processor.

You can add an input port by clicking on Add Port, or remove an input port by clicking on Delete. You cannot delete the port if it is the last one within the processor.

Output Ports

This Processor can have one or more Output Ports through which it sends messages on within the Workflow.

Output Ports

A port has a name and an optional description. The name is mandatory and must be unique within the Processor.

Root Script

The Python Asset obviously needs a Script to be executed. Prior to version 1.0 of layline.io the Script was configured as part of this Asset. Starting with v1.0 all Scripts are defined in the Sources tab of the project (2):

Assigned Source Script (Python)

The root script to be executed within this Asset is then selected here:

Root Script (Python)

Python Language Reference

To understand how a Source must be structured to work in a Python Asset, please consult the Python Language Reference.

Service Mappings

Python scripts may make use of Services which you have configured here. Such Services can provide database operations, HTTP requests, and whatever other functions Services offer.

Let's say your Python script invokes an HTTP-Service which provides a method to retrieve the current Bitcoin price via a REST-Api. Let's also assume that the name of the Service to be linked is BTCService.

  1. Add a Service Mapping by clicking on Add Service Mapping (1).
  2. Select the Service which you want to map (2).
  3. Provide a Logical Service Name. This is the name by which the Service is used in the underlying Python script! If the name you enter here differs from the name used in your script, the script will not recognize the Service.
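The following sketch illustrates why the Logical Service Name must match the name used in the script exactly. It is a plain-Python model, not the layline.io API: `FakeBTCService`, `get_price`, `SERVICE_MAPPINGS` and `resolve_service` are hypothetical names, and no real HTTP request is made.

```python
class FakeBTCService:
    """Hypothetical stand-in for the mapped HTTP Service; performs no request."""

    def get_price(self):
        return 42000.0  # made-up value, for illustration only


# Models the Service Mappings of the Asset: logical name -> Service.
SERVICE_MAPPINGS = {"BTCService": FakeBTCService()}


def resolve_service(logical_name):
    """Look up a Service by the logical name used in the script."""
    try:
        return SERVICE_MAPPINGS[logical_name]
    except KeyError:
        raise LookupError(
            f"No Service is mapped to the logical name '{logical_name}'"
        ) from None
```

Looking up "BTCService" succeeds, while a differently spelled name such as "btcservice" fails, which corresponds to the script not recognizing the Service.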

Service Mappings (Python)

Arguments

You can pass arguments to the assigned script. This is useful when the same script is reused in several Python Assets and Workflows, but should behave slightly differently in each of them. Passing arguments from a Python Asset provides this functionality. Please refer to the documentation of the getArguments() method to learn how to retrieve arguments in the script.

Arguments (Python)

When you enter arguments (1), the editor checks whether they are valid JSON and flags the input if they are not. You can format the JSON entries with a click on Format JSON (2).

Invalid JSON

Entering invalid JSON will cause problems when using the Arguments in the underlying script.
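To check arguments before pasting them into the editor, a short script using Python's standard `json` module can help. This is a minimal sketch; `parse_arguments` and the example keys `threshold` and `targets` are hypothetical.

```python
import json


def parse_arguments(text):
    """Return the parsed arguments, or raise ValueError describing the problem."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as err:
        raise ValueError(
            f"Invalid JSON at line {err.lineno}, column {err.colno}: {err.msg}"
        ) from err


# Valid JSON: double-quoted keys and strings, no trailing commas.
valid = parse_arguments('{"threshold": 1000, "targets": ["A", "B"]}')
```

Typical mistakes which make the input invalid are single-quoted strings, such as `{'threshold': 1000}`, and trailing commas.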

Failure Handling

Processing within a Flow Processor like this one can fail for various reasons. In this section you can define how the system should behave in case of such problems.

Failure Types

Four types of failures are observable:

Failure observables, each of which can be assigned a reaction (Ignore, Retry Event/Message, Retry Stream, Rollback Stream):

  1. Stream start failure handling: A problem occurred in this Asset when starting a new stream.
  2. Stream end failure handling: A problem occurred in this Asset when ending a stream.
  3. Message failure handling: A problem occurred when handling a specific message in this Asset.
  4. Rollback commit failure handling: A problem occurred during a system-issued rollback or commit procedure.

Failure Type Reactions

Each of these failure types can be responded to with four different reactions:

Ignore

Don't do anything.

Rollback Stream

Rollback the complete stream. In the case of batch/file processing, for example, the complete file (which represents the stream) will be rolled back and put into error.

warning

A rollback signal will be issued to all participating Workflow Processors. Each Processor is itself responsible for handling a rollback correctly. A Javascript Flow Processor which directly interacts with a database, for example, has to react to a rollback signal:

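Rollback example in Javascript

    export function onRollback() {
        if (connection) {
            try {
                // Undo pending changes, then release the connection
                connection.rollbackTransaction();
                connection.closeConnection();
            } catch (err) {
                // Errors raised during cleanup are deliberately ignored
            } finally {
                connection = null;
            }
        }
    }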
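For a Python script, the same pattern could look roughly like the sketch below. The hook name `onRollback` and the connection methods are assumed to mirror the Javascript example; please verify them against the Python Language Reference. `FakeConnection` is a hypothetical stand-in which only makes the sketch runnable on its own.

```python
class FakeConnection:
    """Hypothetical stand-in for a database connection (not a layline.io class)."""

    def __init__(self):
        self.rolled_back = False
        self.closed = False

    def rollbackTransaction(self):
        self.rolled_back = True

    def closeConnection(self):
        self.closed = True


connection = FakeConnection()


def onRollback():
    """Roll back and release the connection when a rollback signal arrives."""
    global connection
    if connection is not None:
        try:
            connection.rollbackTransaction()
            connection.closeConnection()
        except Exception:
            # As in the Javascript example, cleanup errors are ignored.
            pass
        finally:
            connection = None
```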

Retry Stream

Don't simply give up. Try to process the whole stream again. This option allows you to define how often and in what intervals the retries should be performed.

Failure Handling Retry Stream

Stream Retry Settings

  • Max. Retries: The number of retries which should be performed. For example "5".
  • Min. Backoff [ms]: Wait at least x milliseconds between retries. For example "12000" (12 seconds).
  • Max. Backoff [ms]: Wait at most x milliseconds between retries. For example "150000" (150 seconds).

Based on these parameters, the system will try to balance the defined number of retries within the time boundaries of min. backoff and max. backoff. Taking the example numbers from above, the five retries would happen in this timespan:

Failure Retry Stream Handling
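To get a feeling for how retries may be spread between the two boundaries, the sketch below computes one possible schedule. The doubling strategy is an assumption made for illustration only; this page does not specify the exact algorithm layline.io uses, and `backoff_schedule` is a hypothetical name.

```python
def backoff_schedule(max_retries, min_backoff_ms, max_backoff_ms):
    """Waiting time before each retry: starts at the minimum, doubles each
    time, and never exceeds the maximum (assumed strategy, see above)."""
    return [
        min(min_backoff_ms * 2 ** attempt, max_backoff_ms)
        for attempt in range(max_retries)
    ]


# Example values from the settings above: 5 retries, 12 s min, 150 s max.
schedule = backoff_schedule(5, 12000, 150000)
# schedule == [12000, 24000, 48000, 96000, 150000]
```

Under this assumption, the waiting time grows with every failed attempt until it reaches the Max. Backoff value.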

Retry Event/Message

Pick this reaction if you want to retry processing the current message. As with the Retry Stream reaction, you can define how often and in what intervals the retries should be performed.

Failure Retry Event/Message Handling

The settings are the same as for the Retry Stream reaction, so please refer to that section. There is one additional setting, however: When final retry failed.

Here you can decide what to do if the message cannot be processed even after the final retry:

  • Ignore: Don't do anything.

  • Rollback Stream: Fall back to rolling back the whole stream.

  • Retry Stream: Retry the whole stream once again. If you pick this option, then you can again define all relevant Retry Stream parameters.

    Failure Retry Event/Message -> Retry Stream Handling
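The interplay between message retries and the When final retry failed setting can be modeled as follows. This is a simplified sketch of the behavior described above, not layline.io code: `process_with_retry` is a hypothetical function, and the waiting time between retries is omitted.

```python
def process_with_retry(handler, message, max_retries,
                       when_final_retry_failed="Rollback Stream"):
    """Try the handler once, then retry up to max_retries times.

    Returns ("OK", result) on success. If the final retry fails as well,
    returns (when_final_retry_failed, None) so the caller can apply the
    configured reaction.
    """
    for _attempt in range(max_retries + 1):  # first attempt plus retries
        try:
            return "OK", handler(message)
        except Exception:
            # A real system would wait for the backoff interval here.
            continue
    return when_final_retry_failed, None
```

A handler which fails temporarily still yields "OK" as long as it succeeds within the allowed number of retries. Otherwise, the configured reaction (Ignore, Rollback Stream or Retry Stream) is returned.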

Only works for specific Source Types within a Workflow

A Workflow has one Input Processor which is responsible for reading data from a Source. Sources are for example files, databases, or message queues.

The settings for Retry Event/Message and Retry Stream only work for specific Source Types which a Workflow uses. These are:

Please see section Forced Errors to understand how to use these settings.


Can't find what you are looking for?

Please note that the online documentation is a work in progress and is constantly being updated. Should you have questions or suggestions, please don't hesitate to contact us at support@layline.io.