catcher_modules.pipeline package

Submodules

catcher_modules.pipeline.airflow module

class catcher_modules.pipeline.airflow.Airflow(**kwargs)

Allows you to run a dag synchronously or asynchronously, get xcom values and populate connections in the Apache Airflow workflow management platform.

Input:
Config: airflow config object
Db_conf: airflow db backend configuration. Can be passed as a connection string (as in the examples below) or as an object with the following fields (see the expanded sketch after this list):
  • dbname: name of the database to connect to
  • user: database user
  • host: database host
  • password: user’s password
  • port: database port
Run: Run a dag. Returns its run id when sync is false, or waits till the run is done when sync is true.
  • config: airflow config object. Required.
  • dag_id: id of the dag to run. Required.
  • dag_config: a dict with optional dag configuration. Optional.
  • sync: if true, will wait till the dag run is finished. Optional (default is false).
  • wait_timeout: wait timeout in seconds for sync=true. Optional (default is 5 seconds).
Run_status: Get a dag run object for a run id.
  • config: airflow config object. Required.
  • dag_id: id of the dag. Required.
  • run_id: run id of the dag. Returned by run when sync=false.
Get_xcom: Get an xcom value.
  • config: airflow config object. Required.
  • task_id: id of the task which pushed data to xcom. Required.
  • execution_date: the dag run’s execution date. Can be obtained via run_status (see the sketch after the last example below). Optional. Either execution_date or run_id must be set.
  • run_id: run id of the dag. Optional. Either execution_date or run_id must be set.
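
The examples below pass db_conf as a single connection string of the form user:password@host:port/dbname. It should be equivalent to spelling out the documented fields instead; a minimal sketch of that expanded form (the dag id and credential values are illustrative, and the mapping layout is an assumption based on the field list above):

    - airflow:
        run:
            config:
                url: 'http://127.0.0.1:8080'
                db_conf:
                    dbname: 'airflow'      # name of the database to connect to
                    user: 'airflow'        # database user
                    password: 'airflow'    # user's password
                    host: 'localhost'      # database host
                    port: 5433             # database port
            dag_id: 'my_dag'               # illustrative dag id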

Run a dag asynchronously and check its status later manually.

variables:
        db_conf: 'airflow:airflow@localhost:5433/airflow'
        airflow: 'http://127.0.0.1:8080'
steps:
    - airflow:
        run:
            config:
                db_conf: '{{ db_conf }}'
                url: '{{ airflow }}'
            dag_id: 'init_data_sync'
        register: {run_id: '{{ OUTPUT }}'}
    - wait:
        seconds: 50
        for:
            - airflow:
                run_status:
                    config:
                        db_conf: '{{ db_conf }}'
                        url: '{{ airflow }}'
                    dag_id: 'init_data_sync'
                    run_id: '{{ run_id }}'
                register: {status: '{{ OUTPUT }}'}
            - check: {equals: {the: '{{ status }}', is: 'failed'}}

Run a dag and wait for it to be completed successfully.

variables:
        db_conf: 'airflow:airflow@localhost:5433/airflow'
        airflow: 'http://127.0.0.1:8080'
steps:
    - airflow:
        run:
            config:
                db_conf: '{{ db_conf }}'
                url: '{{ airflow }}'
            dag_id: 'init_data_sync'
            sync: true
            wait_timeout: 50

Run a dag, wait for it to finish and get the task’s xcom value.

variables:
        db_conf: 'airflow:airflow@localhost:5433/airflow'
        airflow: 'http://127.0.0.1:8080'
steps:
    - airflow:
        run:
            config:
                db_conf: '{{ db_conf }}'
                url: '{{ airflow }}'
            dag_id: 'execute_batch'
            sync: true
            wait_timeout: 50
        register: {run_id: '{{ OUTPUT }}'}
    - airflow:
        get_xcom:
            config:
                db_conf: '{{ db_conf }}'
                url: '{{ airflow }}'
            task_id: fill_data
            run_id: '{{ run_id }}'
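
As noted in the get_xcom parameters, execution_date can be used instead of run_id and can be obtained via run_status. A hedged sketch of that variant, reusing the variables and the registered run_id from the example above; the execution_date field on the run_status output is an assumption, not confirmed by this page:

steps:
    - airflow:
        run_status:
            config:
                db_conf: '{{ db_conf }}'
                url: '{{ airflow }}'
            dag_id: 'execute_batch'
            run_id: '{{ run_id }}'
        # OUTPUT.execution_date assumes the dag run object exposes this field
        register: {execution_date: '{{ OUTPUT.execution_date }}'}
    - airflow:
        get_xcom:
            config:
                db_conf: '{{ db_conf }}'
                url: '{{ airflow }}'
            task_id: fill_data
            execution_date: '{{ execution_date }}'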