DATA - Pipeline

Pipelines

Pipelines are small transformations applied to incoming data before writing Instances to the Database.

Pipelines contain tasks to perform atomic operations on each field of each incoming data instance.

Pipelines support conditions to perform tasks only if certains conditions are met.

Internally, pipelines are managed as a special built-in _pipeline DataClass.

Pipeline Example

# A small pipeline to adapt user.csv to bulk load users from external systems

- classname: _pipeline
  keyname: user_import_pipeline
  displayname: user_import_csv
  description: Use this pipeline to import users as a csv file from system X/Y/Z
  content: |
      csv_delimiter: ';'
      classname: _user
      keyfield: login
      encoding: 'utf-8'
      tasks: 
            # TASKNAME: ["[!]CONDITION", "opt1", "opt2", "opt3", ...]
            # use "!" before CONDITION to negate
            # use '' CONDITION as always-True
          - field_lower: ['', email, login]
          - field_upper: ['', external_id]
          - field_uuid: ['', uuid_auto]
          - field_datetime_now: ['', last_sync]      

Pipeline usage

$  cavaliba cavaliba_load /files/user.csv --pipeline user_import_pipeline

In the Web Import Tool, you can specify a pipeline to apply on provided data.

classname

For CSV files, this mandatory field provides the Schema name to load.

For YAML/JSON files, classname is provided by each data entry. A single file can combine objects for differnt Schemas.

keyfield

The keyfield option defines the name of the column which provides the keyname (primary key) value for each Instance.

Default if none provided: keyname

encoding

For CSV files, you can configure the character encoding.

Default (if none) is utf-8.

Example:

content: |
    encoding: 'ISO-8859-1'

Pipeline conditions

Conditions are True or False. They are valid for an entry. They are reset when processing the next entry.

An empty condition is True.

A non-empty condition is False by-default.

You set a condition with a set_condition task, performing various checks on any fields of an entry.

You check a condition, by providing its name as the first parameter of a task operation.

You use quote around a condition name if it contains special characters.

If you wan’t to negate a condition (perform operation if condition is False), you put a ! before the name of the condition, and you surround with quotes.

Example:


# check a condition : does myfield contains 'test' ?
# perform a field operation (set_field, create my_status field) if condition is True
tasks: 
- set_condition: [CONDITION_TEST, field_match, myfield, 'test']
- field_set: [CONDITION_TEST, my_status, 'testok']

# set a condition, and perform a field operation if condition is NOT met
# notice the ! in the field_set
tasks: 
- set_condition: [CONDITION_TEST, field_match, myfield, 'test']
- field_set: ['!CONDITION_TEST', my_status, 'testok']

# no condition, always perform
tasks:
- field_set: ['', new_field, 'Hello']

Conditions operators

field_match

field_match : [fieldname, 'regexp']

Pipeline tasks reference


    # TASKNAME: ["[!]CONDITION", "opt1", "opt2", "opt3", ...]
    # use "!" before CONDITION to negate
    # many task can operate on seveal fields ("...")

    - field_noop:         [COND]                              : do nothing
    - field_toint:        [COND, "field1","..."]              : convert to int
    - field_tofloat:      [COND, "field1","..."]              : convert to float
    - field_tostring:     [COND, "field1","..."]              : convert to string
    - field_nospace:      [COND, "field1","..."]              : remove all whitespaces
    - field_regexp_sub:   [COND, 'fiel', 'pattern','replace'] : appli regexp/replace
    - field_set:          [COND, 'field', 'value']            : create field with value
    - field_copy:         [COND, 'field1','field2']           : field1 to field2  
    - field_rename:       [COND, 'field1','field2']           : field1 to field2
    - field_merge:        [COND, 'field1','field2','field3']  : field1 + field2 > field3
    - field_delete:       [COND, "field1", "field2", ...]     : remove fields
    - field_keep:         [COND, "field1", "field2", ...]     : keep only these fields (and classname/keyname)
    - field_lower:        [COND, 'field1', ...]               : lowercase
    - field_upper:        [COND, 'field1', ...]               : uppercase
    - field_date_now:     [COND, 'field1', 'field2', ...]     : YYYY-MM-DD
    - field_time_now:     [COND, 'field1', 'field2', ...]     : HH:MM:SS
    - field_datetime_now: [COND, 'field1', 'field2', ...]     : YYYY-MM-DD HH:MM:SS
    - field_uuid:         [COND, 'field1', 'field2', ....]    : random UUID string
    - field_append:       [COND, 'field1', 'suffix']          : append a suffix to field1
    - field_prepend:      [COND, 'field1', 'prefix']          : prefix field1 with

    - discard:            [COND]                              : eliminate full entry

field_keep

tasks:
- field_keep: [CONDITION, "field1","field2", ...]

Keep provided field only. Removes all other fields.

field_delete

tasks:
- field_delete: [CONDITION, "field1","field2", "field3", ...]

Removes all specified fields.

field_noop

tasks:
- field_noop: [CONDITION]

Performs nothing.

field_set

tasks:
- field_set: [CONDITION, "fieldname","AnyValue"]

Creates / Overwrite fieldname and set its value with provided value.

field_copy

tasks:
- field_copy: [CONDITION, "field1","field2"]

Create/Overwrite field2 and set its value with field1 value.

field_rename

tasks:
- field_rename: [CONDITION, "field1","field2"]

Rename field1 to field2 ; create/overwrite existing field2 and set its value with field1 value.

field_lower

tasks:
- field_lower: [CONDITION, "field", "field", ...]

Convert field value to lowercase

field_upper

tasks:
- field_upper: [CONDITION, "field", "field", ...]

Convert field value to uppercase

field_date_now

tasks:
- field_date_now: [CONDITION, "field", "field", ...]

Set field’s value with current date in YYYY-MM-FF format.

Usefull for automated or periodic import to determine in-sync/out-of-sync objects.

field_datetime_now

Same as field_date_now, with time information as HH:MM:SS format.

field_time_now

Same as field_date_now, with time information only, also as HH:MM:SS format.

field_uuid

tasks:
- field_uuid: [CONDITION, "field", "field", ...]

Create/overwrites a field with a UUID string.

Usefull to establich a single/primary keep for an object.

field_toint

tasks:
- field_toint: [CONDITION, "field", "field", ...]

Convert a field’s value to integer.

field_tofloat

tasks:
- field_tofloat: [CONDITION, "field", "field", ...]

Convert a field’s value to floating point value.

field_tostring

tasks:
- field_tostring: [CONDITION, "field", "field", ...]

Convert a field’s value to a string.

field_nospace

tasks:
- field_nospace: [CONDITION, "field", "field", ...]

Removes all whitespace and tab from field’s value. Faster than general regexp task.

field_regexp_sub

tasks:
- field_nospace: [CONDITION, "fieldname", "pattern","replace"]

Alter fieldname’s value by replacing a regexp pattern with the provided value. You may use any Python standard regexp for your pattern.

Example:

tasks:
- field_regexp_sub: ['',  'field_a', 'test', 'QWERTY']

# Before : {'field_a': 'This is a test from unittest !']
# After  : {'field_a': "This is a QWERTY from unitQWERTY !")

field_merge

tasks:
- field_merge: [CONDITION, 'field1','field2','field3']

Concat field1 and field2 into new field3 (overwrite if already exists).

If field’s value are numerical, a mathematical addition is performed.

If field’s value are stings, a string concatenation is performed.

You may use toint/tofloat/tostring to handle various case.

field_prepend

tasks:
- field_prepend: [CONDITION, 'field1','prefix']

Put the prefix string in front of the value of field1.

field_append

tasks:
- field_append: [CONDITION, 'field1','suffix']

Put the suffix string at the end of field1’s value.