Pipeline
Pipelines are small transformations applied to incoming data before writing the data to the Database.
Pipelines contain tasks to perform atomic operations on each field of each incoming data item.
Pipelines support conditions to perform tasks only if certain conditions are met.
Internally, pipelines are managed as a special built-in _pipeline Schema.
Use cases
Pipelines can be used at import time to normalize, enrich or filter incoming data before it is stored. They can also be applied to bulk transform existing objects and assets already stored in a Schema.
Typical use cases:
- Disable out-of-sync objects: compare last_sync against a reference date and set _action: disable on stale entries
- Delete entries matching a criterion: discard entries where a field matches a pattern or a condition is met
- Update a field conditionally: set a status field, overwrite a value, or compute a new field when a condition is true
- Normalize imported data: lowercase emails, strip whitespace, rename columns, convert types
- Generate keys: compute a deterministic keyname from multiple fields using field_md5 or field_uuid
- Enrich data: compute timestamps, append prefixes, join fields into composite values
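To make the "deterministic keyname" idea concrete, here is a minimal Python sketch of hashing several fields into one stable key. It is not the implementation of field_md5 or field_uuid: the helper names, the "|" separator, and the choice of a name-based (version 5) UUID are assumptions made for illustration only.

```python
import hashlib
import uuid

def md5_key(entry, fields, sep="|"):
    """Join the selected field values and return their MD5 hex digest."""
    # Assumption: values are joined with a separator before hashing.
    joined = sep.join(str(entry.get(f, "")) for f in fields)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

def uuid_key(entry, fields, sep="|"):
    """Derive a name-based UUID from the same joined string."""
    joined = sep.join(str(entry.get(f, "")) for f in fields)
    return str(uuid.uuid5(uuid.NAMESPACE_URL, joined))

row = {"site": "paris", "hostname": "srv01", "owner": "alice"}

# Fields that are not part of the key do not change the result.
key_a = md5_key(row, ["site", "hostname"])
key_b = md5_key(dict(row, owner="bob"), ["site", "hostname"])
```

Because the key depends only on the chosen fields, re-importing the same source row produces the same keyname and updates the existing object instead of creating a duplicate.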
Example: disable assets not seen since a cutoff date
tasks:
- set_condition: [STALE, lt, last_sync, cutoff_date]
- field_set: [STALE, _action, disable]

Example: delete entries where status matches a pattern
tasks:
- set_condition: [OBSOLETE, field_match, status, '^(deleted|obsolete)$']
- discard: [OBSOLETE]

Example: update a field when a condition is met
tasks:
- set_condition: [NO_OWNER, empty, owner]
- field_set: [NO_OWNER, owner, 'unassigned']

Pipeline Example
# A small pipeline to adapt user.csv to bulk load users from external systems
- classname: _pipeline
  keyname: user_import_pipeline
  displayname: user_import_csv
  description: Use this pipeline to import users as a csv file from system X/Y/Z
  content: |
    csv_delimiter: ';'
    classname: _user
    keyfield: login
    encoding: 'utf-8'
    tasks:
      # TASKNAME: ["[!]CONDITION", "opt1", "opt2", "opt3", ...]
      # use "!" before CONDITION to negate
      # use '' CONDITION as always-True
      - field_lower: ['', email, login]
      - field_upper: ['', external_id]
      - field_uuid: ['', uuid_auto]
      - field_datetime_now: ['', last_sync]

Pipeline usage
$ cavaliba load files/user.csv --pipeline user_import_pipeline

In the Web UI Import Tool, you can specify a pipeline to apply to the provided data.
run_permission
A pipeline can declare a run_permission field. When set, the caller must hold the named permission to execute apply_to_schema against a schema. If the caller does not hold the permission, the execution is denied and an error is returned.
This is independent of the data-level permissions checked when each instance is written back to the database.
Example: restrict a pipeline to users holding p_pipeline_run
- classname: _pipeline
  keyname: my_cleanup_pipeline
  run_permission: p_pipeline_run
  content: |
    tasks:
      - field_set: ['', _action, disable]

If run_permission is empty or absent, no permission check is performed at pipeline execution time.
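The run_permission rule can be summarized as a small decision function. The following Python sketch only models the behavior described in this section; the function name may_run and the representation of the caller's permissions as a set of strings are assumptions, since the layout of the real aaa dict is not described here.

```python
def may_run(pipeline, caller_permissions):
    """Return True when the caller is allowed to execute the pipeline."""
    required = pipeline.get("run_permission") or ""
    if not required:
        # Empty or absent run_permission: no check at execution time.
        return True
    # Otherwise the caller must hold the named permission.
    return required in caller_permissions

# Hypothetical pipeline records, reduced to the fields that matter here.
cleanup = {"keyname": "my_cleanup_pipeline", "run_permission": "p_pipeline_run"}
open_pipeline = {"keyname": "user_import_pipeline"}

allowed = may_run(cleanup, {"p_pipeline_run", "p_other"})
denied = may_run(cleanup, {"p_other"})
unrestricted = may_run(open_pipeline, set())
```

Passing this check does not replace the data-level permissions, which are still evaluated when each instance is written back.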
apply_to_schema
apply_to_schema iterates over all instances of a given schema, applies the pipeline tasks to each one, and writes the result back to the database. It is the standard way to run a bulk pipeline against existing data.
Parameters:
| Parameter | Description |
|---|---|
| schemaname | The target schema keyname (e.g. server, application) |
| dryrun | If True, the pipeline is applied but no changes are written to the database (default: False) |
| aaa | Caller identity and permissions dict, required for permission checks and write operations |
Returns a tuple (count_ok, count_discarded, errors):
| Value | Description |
|---|---|
| count_ok | Number of instances successfully processed (and written if not dryrun) |
| count_discarded | Number of instances discarded by the pipeline (not written) |
| errors | List of error strings for instances that failed to write |
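The contract above (dry run versus real run, and the three returned values) can be illustrated with a toy model. This Python sketch is not the Cavaliba function: apply_to_schema_model, the transform callback, and the store callback are hypothetical stand-ins that operate on an in-memory dict instead of a schema and a database.

```python
def apply_to_schema_model(instances, transform, store, dryrun=False):
    """Toy model: transform(instance) returns a dict, or None to discard."""
    count_ok, count_discarded, errors = 0, 0, []
    for keyname, instance in instances.items():
        result = transform(dict(instance))  # work on a copy
        if result is None:
            count_discarded += 1
            continue
        if not dryrun:
            try:
                store(keyname, result)
            except Exception as exc:
                # A failed write is reported and not counted as ok.
                errors.append(f"{keyname}: {exc}")
                continue
        count_ok += 1
    return count_ok, count_discarded, errors

servers = {
    "srv01": {"status": "active"},
    "srv02": {"status": "obsolete"},
    "srv03": {"status": "active"},
}
database = {}

def disable_or_discard(instance):
    if instance["status"] == "obsolete":
        return None
    instance["_action"] = "disable"
    return instance

def store(keyname, instance):
    if keyname == "srv03":
        raise ValueError("write refused")
    database[keyname] = instance

preview = apply_to_schema_model(servers, disable_or_discard, store, dryrun=True)
database_after_preview = dict(database)
applied = apply_to_schema_model(servers, disable_or_discard, store)
```

Running with dryrun=True first gives the counts without touching stored data, which is a cheap way to check how many instances a bulk pipeline would change or discard before committing to it.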
classname
For CSV files, this mandatory field provides the Schema name to load.
For YAML/JSON files, classname is provided by each data entry. A single file can combine objects for different Schemas.
keyfield
The keyfield option defines the name of the CSV column which provides the keyname (primary key) value for each Instance.
Default if none provided: keyname
encoding
For CSV files, you can configure the character encoding.
Default (if none) is utf-8.
Example:
content: |
encoding: 'ISO-8859-1'
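As an illustration of how csv_delimiter, classname, keyfield and encoding fit together, the sketch below reads a CSV payload with Python's standard csv module. It is a model under assumptions, not the loader itself: the read_entries helper, the "," default delimiter, and the choice to keep the original key column alongside keyname are all hypothetical.

```python
import csv
import io

def read_entries(raw_bytes, classname, keyfield="keyname",
                 csv_delimiter=",", encoding="utf-8"):
    """Decode a CSV payload and turn each row into an entry dict."""
    text = raw_bytes.decode(encoding)
    reader = csv.DictReader(io.StringIO(text, newline=""), delimiter=csv_delimiter)
    entries = []
    for row in reader:
        entry = dict(row)
        entry["classname"] = classname      # CSV rows carry no schema name
        entry["keyname"] = entry[keyfield]  # primary key taken from keyfield
        entries.append(entry)
    return entries

# A semicolon-delimited file saved as ISO-8859-1 by an external system.
raw = "login;email\nrené;RENE@EXAMPLE.ORG\n".encode("iso-8859-1")
users = read_entries(raw, classname="_user", keyfield="login",
                     csv_delimiter=";", encoding="iso-8859-1")
```

Declaring the wrong encoding typically shows up as a decode error or as garbled accented characters in key fields, so it is worth checking the source system's export settings when names look wrong after import.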
Pipeline conditions
A condition is either True or False. Conditions are scoped to a single entry and are reset before the next entry is processed.
An empty condition ('') is always True.
A named condition that has not been set is False by default.
You set a condition with a set_condition task, which can perform various checks on any field of an entry.
You check a condition by providing its name as the first parameter of a task.
Use quotes around a condition name if it contains special characters.
To negate a condition (perform the operation only if the condition is False), put a ! before the condition name and surround the whole expression with quotes, for example '!CONDITION_TEST'.
Example:
# check a condition: does myfield contain 'test'?
# perform a field operation (field_set, creating the my_status field) if the condition is True
tasks:
- set_condition: [CONDITION_TEST, field_match, myfield, 'test']
- field_set: [CONDITION_TEST, my_status, 'testok']
# set a condition, and perform a field operation if condition is NOT met
# notice the ! in the field_set
tasks:
- set_condition: [CONDITION_TEST, field_match, myfield, 'test']
- field_set: ['!CONDITION_TEST', my_status, 'test_not_ok']
# no condition, always perform
tasks:
- field_set: ['', new_field, 'Hello']
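The condition rules described in this section can be modeled in a few lines of Python. This is a minimal sketch, not Cavaliba's implementation: the helper names are hypothetical, only the field_match operator and the field_set task are modeled, and treating field_match as a regular-expression search is an assumption.

```python
import re

def condition_holds(name, conditions):
    """Evaluate the first parameter of a task for the current entry."""
    if name == "":
        return True                          # empty condition: always True
    if name.startswith("!"):
        return not conditions.get(name[1:], False)
    return conditions.get(name, False)       # unset named condition: False

def run_tasks(entry, tasks):
    conditions = {}                          # reset for every entry
    for taskname, args in tasks:
        if taskname == "set_condition":
            cond, operator, field, pattern = args
            if operator == "field_match":    # other operators not modeled
                value = str(entry.get(field, ""))
                conditions[cond] = re.search(pattern, value) is not None
        elif taskname == "field_set":
            cond, field, value = args
            if condition_holds(cond, conditions):
                entry[field] = value
    return entry

tasks = [
    ("set_condition", ["CONDITION_TEST", "field_match", "myfield", "test"]),
    ("field_set", ["CONDITION_TEST", "my_status", "testok"]),
    ("field_set", ["!CONDITION_TEST", "my_status", "test_not_ok"]),
    ("field_set", ["", "new_field", "Hello"]),
]

first = run_tasks({"myfield": "a test value"}, tasks)
second = run_tasks({"myfield": "production"}, tasks)
```

Each call to run_tasks starts with an empty conditions dict, which mirrors the rule that a condition set while processing one entry never leaks into the next.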