Wilson Mar

Workflow as Code (WaC)

Overview

Airflow is a platform to programmatically author, schedule, and monitor workflows that process data, where the output of one task is the input of the next.

A “data workflow” is a sequence of tasks that begins with a triggering event.

It’s a hot market: Airflow’s competitors include Prefect and Streamz.

NOTE: Content here is my personal opinion, and is not intended to represent any employer (past or present). “PROTIP:” here highlights information I haven’t seen elsewhere on the internet because it is hard-won, little-known but significant facts based on my personal research and experience.

The Airflow scheduler executes tasks on an array of workers while following specified dependencies.
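
To make “executes tasks on an array of workers while following specified dependencies” concrete, here is a minimal sketch in plain Python. It is not Airflow's scheduler or API: the task names, the `deps` mapping, and the `run_task` placeholder are all hypothetical, and a thread pool stands in for the workers.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# Hypothetical pipeline: each task maps to the set of tasks it must wait for.
deps = {
    "extract": set(),
    "clean": {"extract"},
    "stats": {"extract"},
    "report": {"clean", "stats"},
}

def run_task(name):
    """Placeholder for real work; returns the task name when finished."""
    return name

def schedule(deps, max_workers=2):
    """Hand each task to a worker only after all of its upstream tasks finish."""
    remaining = {task: set(upstream) for task, upstream in deps.items()}
    finished = []
    running = {}  # future -> task name
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while remaining or running:
            # Submit every task whose upstream tasks have all completed.
            ready = [t for t, upstream in remaining.items() if not upstream]
            for task in ready:
                del remaining[task]
                running[pool.submit(run_task, task)] = task
            if not running:
                raise ValueError("cycle detected: no task is ready to run")
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                task = running.pop(future)
                future.result()  # re-raise any exception from the worker
                finished.append(task)
                # Mark the finished task as satisfied for everything downstream.
                for upstream in remaining.values():
                    upstream.discard(task)
    return finished

finished = schedule(deps)
print(finished)
```

In this sketch “clean” and “stats” may finish in either order, because both depend only on “extract”; “report” always comes last.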

Use Airflow to author workflows as DAGs (Directed Acyclic Graphs) of tasks (named t1, t2, etc.).

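  • “Directed” meaning each dependency points one way, from an upstream task to a downstream task
  • “Acyclic” meaning the flow never loops back to a task that already ran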
  • https://www.youtube.com/watch?v=XD7euLOzKbs
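
As an illustration of what “directed” and “acyclic” buy you, the sketch below orders the tasks t1 to t4 with Kahn's algorithm in plain Python. The dependency mapping is made up for this example and the function is not part of Airflow; it only shows that a valid run order exists exactly when the graph has no loop.

```python
from collections import deque

def topological_order(upstream):
    """Kahn's algorithm: return tasks in dependency order, or raise on a cycle."""
    pending = {task: set(parents) for task, parents in upstream.items()}
    ready = deque(sorted(t for t, parents in pending.items() if not parents))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        del pending[task]
        # Release any task that was waiting only on the one just ordered.
        for other in sorted(pending):
            parents = pending[other]
            if task in parents:
                parents.discard(task)
                if not parents:
                    ready.append(other)
    if pending:
        raise ValueError("not acyclic: " + ", ".join(sorted(pending)))
    return order

# Hypothetical DAG: t2 and t3 both follow t1, and t4 waits for both.
tasks = {"t1": set(), "t2": {"t1"}, "t3": {"t1"}, "t4": {"t2", "t3"}}
print(topological_order(tasks))  # prints ['t1', 't2', 't3', 't4']
```

Passing a graph with a loop, such as {"a": {"b"}, "b": {"a"}}, raises ValueError because no task can ever become ready.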

Airflow instantiates pipelines dynamically.

https://www.youtube.com/watch?v=l764YAGPlIs Scalable Data Ingestion Architecture Using Airflow and Spark | Komodo Health Data Council

When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.

Airflow uses operators for working with Python, Postgres, Bash, Email, etc.

Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.

Rich command line utilities make performing complex surgeries on DAGs a snap.

A rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

Airflow is commonly used to process data, but it takes the position that tasks should ideally be idempotent (i.e., re-running a task produces the same result and does not create duplicate data in a destination system).
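
One way to picture an idempotent task is a load step keyed on the run date, so a retry overwrites its earlier row instead of adding a second one. The sketch below uses an in-memory SQLite database; the table name, column names, and the `load_daily_total` function are hypothetical, and this is only one of several ways to get the property.

```python
import sqlite3

def load_daily_total(conn, day, total):
    """Write one row per day; re-running for the same day overwrites, never duplicates."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_totals (day TEXT PRIMARY KEY, total INTEGER)"
    )
    # INSERT OR REPLACE keys on the primary key, so a retry replaces the earlier row.
    conn.execute(
        "INSERT OR REPLACE INTO daily_totals (day, total) VALUES (?, ?)",
        (day, total),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
load_daily_total(conn, "2021-09-12", 42)
load_daily_total(conn, "2021-09-12", 42)  # simulated retry of the same run
rows = conn.execute("SELECT day, total FROM daily_totals").fetchall()
print(rows)  # prints [('2021-09-12', 42)]
```

A plain INSERT in the same spot would leave two rows after the retry, which is exactly the duplication the idempotency advice is meant to prevent.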

Airflow is not designed to pass large quantities of data from one task to the next (though tasks can pass metadata using Airflow’s XCom feature).

For high-volume, data-intensive tasks, a best practice is to delegate to external services specializing in that type of work.
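
The pattern of passing metadata rather than data can be sketched without Airflow at all. Below, a plain dict named `xcom` is a stand-in for the XCom feature (it is not Airflow's API), a temporary folder stands in for external storage, and the `extract` and `transform` functions are hypothetical. The bulky records go to storage, and only the file path and row count travel between tasks.

```python
import json
import os
import tempfile

# Stand-in for XCom: a small dict holding metadata only (hypothetical, not Airflow's API).
xcom = {}

def extract(workdir):
    """Write the bulky data to storage and publish only its location and row count."""
    path = os.path.join(workdir, "extract.json")
    records = [{"id": i, "value": i * i} for i in range(1000)]
    with open(path, "w") as f:
        json.dump(records, f)
    xcom["extract"] = {"path": path, "rows": len(records)}

def transform():
    """Read the location from the metadata, then load the data from storage."""
    meta = xcom["extract"]
    with open(meta["path"]) as f:
        records = json.load(f)
    return sum(r["value"] for r in records)

with tempfile.TemporaryDirectory() as workdir:
    extract(workdir)
    total = transform()

print(xcom["extract"]["rows"], total)
```

In a real deployment the storage would more likely be an object store or a database, and the heavy transformation would run in an external service, with Airflow only coordinating the steps.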

Amazon

https://docs.aws.amazon.com/mwaa/latest/userguide/what-is-mwaa.html

Intro Videos

https://www.youtube.com/results?search_query=apache+airflow Among the list of videos:

VIDEO: 🌈Apache Airflow for beginners by Varya Karpenko references https://github.com/KarpenkoVarya/airflow_for_beginners which sets up a data pipeline to send a fresh batch of Stack Overflow questions tagged pandas to our mailbox daily.

https://www.youtube.com/watch?v=i25ttd32-eo Airflow for Beginners - Run Spotify ETL Job in 15 minutes! by Karolina Sowinska referencing https://github.com/karolina-sowinska/free-data-engineering-course-for-beginners where main.py calls spotify_dag.py, which calls spotify_etl.py

https://github.com/apache/airflow is “Configuration as Code” (Python)

https://gtoonstra.github.io/etl-with-airflow/etlexample.html

Tutorials

To get started on the journey to becoming an Airflow developer, take a look at this course from Alexandra Abbas: https://www.udemy.com/course/apache-airflow-course/

Install Airflow executables

VIDEO https://airflow.apache.org/docs/apache-airflow/stable/installation.html

Airflow was tested on Debian Buster.

See https://arpitrana.medium.com/install-airflow-on-macos-guide-fc66399b2a9e

  1. Create the Airflow folder (with a dags subfolder) under your home folder:

    mkdir -p "$HOME/airflow-tutorial/dags"
    
  2. Define the path as the AIRFLOW_HOME environment variable in ~/.bash_profile

    export AIRFLOW_HOME="$HOME/airflow-tutorial"
    cd "$AIRFLOW_HOME"
    
  3. Airflow is installed in ~/.local/bin, so add that folder to PATH in ~/.bash_profile:

    export PATH="$PATH:$HOME/.local/bin"
    ls ~/.local/bin
    

    WARNING: brew info airflow refers to a video watching app for AppleTV.

  4. Since this tutorial runs Airflow on the older Python 3.7, use Conda rather than pyenv to provide that interpreter. The pyenv command this replaces is:

    pyenv install 3.7.10

    Upgrade Conda:

    conda update -n base -c defaults conda
    
    cd "$AIRFLOW_HOME"
    conda create --name airflow-tutorial python=3.7
    

    At time of writing (12 SEP 2021):

    /usr/local/anaconda3/lib/python3.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.26.6) or chardet (3.0.4) doesn't match a supported version!
      RequestsDependencyWarning)
    Collecting package metadata (current_repodata.json): done
    Solving environment: done
     
    ## Package Plan ##
     
      environment location: /usr/local/anaconda3/envs/airflow-tutorial
     
      added / updated specs:
     - python=3.7
     
     
    The following packages will be downloaded:
     
     package                    |            build
     ---------------------------|-----------------
     ca-certificates-2021.7.5   |       hecd8cb5_1         113 KB
     certifi-2021.5.30          |   py37hecd8cb5_0         139 KB
     libcxx-12.0.0              |       h2f01273_0         805 KB
     libffi-3.3                 |       hb1e8313_2          44 KB
     ncurses-6.2                |       h0a44026_1         749 KB
     openssl-1.1.1l             |       h9ed2024_0         2.2 MB
     pip-21.0.1                 |   py37hecd8cb5_0         1.8 MB
     python-3.7.11              |       h88f2d9e_0        18.1 MB
     readline-8.1               |       h9ed2024_0         333 KB
     setuptools-52.0.0          |   py37hecd8cb5_0         721 KB
     sqlite-3.36.0              |       hce871da_0         1.1 MB
     tk-8.6.10                  |       hb0a8c7a_0         3.0 MB
     wheel-0.37.0               |     pyhd3eb1b0_1          33 KB
     xz-5.2.5                   |       h1de35cc_0         240 KB
     ------------------------------------------------------------
                                            Total:        29.3 MB
     
    The following NEW packages will be INSTALLED:
     
      ca-certificates    pkgs/main/osx-64::ca-certificates-2021.7.5-hecd8cb5_1
      certifi            pkgs/main/osx-64::certifi-2021.5.30-py37hecd8cb5_0
      libcxx             pkgs/main/osx-64::libcxx-12.0.0-h2f01273_0
      libffi             pkgs/main/osx-64::libffi-3.3-hb1e8313_2
      ncurses            pkgs/main/osx-64::ncurses-6.2-h0a44026_1
      openssl            pkgs/main/osx-64::openssl-1.1.1l-h9ed2024_0
      pip                pkgs/main/osx-64::pip-21.0.1-py37hecd8cb5_0
      python             pkgs/main/osx-64::python-3.7.11-h88f2d9e_0
      readline           pkgs/main/osx-64::readline-8.1-h9ed2024_0
      setuptools         pkgs/main/osx-64::setuptools-52.0.0-py37hecd8cb5_0
      sqlite             pkgs/main/osx-64::sqlite-3.36.0-hce871da_0
      tk                 pkgs/main/osx-64::tk-8.6.10-hb0a8c7a_0
      wheel              pkgs/main/noarch::wheel-0.37.0-pyhd3eb1b0_1
      xz                 pkgs/main/osx-64::xz-5.2.5-h1de35cc_0
      zlib               pkgs/main/osx-64::zlib-1.2.11-h1de35cc_3
     
     
    Proceed ([y]/n)? _
    
  5. Type y to proceed.

    Downloading and Extracting Packages
    readline-8.1         | 333 KB    | ##################################### | 100%
    certifi-2021.5.30    | 139 KB    | ##################################### | 100%
    ncurses-6.2          | 749 KB    | ##################################### | 100%
    openssl-1.1.1l       | 2.2 MB    | ##################################### | 100%
    libffi-3.3           | 44 KB     | ##################################### | 100%
    python-3.7.11        | 18.1 MB   | ##################################### | 100%
    pip-21.0.1           | 1.8 MB    | ##################################### | 100%
    xz-5.2.5             | 240 KB    | ##################################### | 100%
    setuptools-52.0.0    | 721 KB    | ##################################### | 100%
    libcxx-12.0.0        | 805 KB    | ##################################### | 100%
    wheel-0.37.0         | 33 KB     | ##################################### | 100%
    sqlite-3.36.0        | 1.1 MB    | ##################################### | 100%
    tk-8.6.10            | 3.0 MB    | ##################################### | 100%
    ca-certificates-2021 | 113 KB    | ##################################### | 100%
    Preparing transaction: done
    Verifying transaction: done
    Executing transaction: done
    #
    # To activate this environment, use
    #
    #     $ conda activate airflow-tutorial
    #
    # To deactivate an active environment, use
    #
    #     $ conda deactivate
    
  6. Activate the Conda environment by its name:

    conda activate airflow-tutorial
    

    You should now see “(airflow-tutorial)” above the Terminal prompt.

    NOTE: The above is instead of

    pip install apache-airflow-upgrade-check

  7. Verify the install (this assumes the apache-airflow package is already present in the active environment):

    airflow version 2>/dev/null

    The 2>/dev/null discards warnings written to stderr, such as:

    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:287: FutureWarning: The hostname_callable setting in [core] has the old default value of 'socket:getfqdn'. This value has been changed to 'socket.getfqdn' in the running config, but please update your config before Apache Airflow 2.1.
      FutureWarning,
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:287: FutureWarning: The navbar_color setting in [webserver] has the old default value of '#007A87'. This value has been changed to '#fff' in the running config, but please update your config before Apache Airflow 2.1.
      FutureWarning,
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345: DeprecationWarning: The log_format option in [core] has been moved to the log_format option in [logging] - the old setting has been used, but please update your config.
      option = self._get_option_from_config_file(deprecated_key, deprecated_section, key, kwargs, section)
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345: DeprecationWarning: The simple_log_format option in [core] has been moved to the simple_log_format option in [logging] - the old setting has been used, but please update your config.
      option = self._get_option_from_config_file(deprecated_key, deprecated_section, key, kwargs, section)
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The hide_sensitive_variable_fields option in [admin] has been moved to the hide_sensitive_var_conn_fields option in [core] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The logging_config_class option in [core] has been moved to the logging_config_class option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The logging_level option in [core] has been moved to the logging_level option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The fab_logging_level option in [core] has been moved to the fab_logging_level option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_log_format option in [core] has been moved to the colored_log_format option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_console_log option in [core] has been moved to the colored_console_log option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_formatter_class option in [core] has been moved to the colored_formatter_class option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The base_log_folder option in [core] has been moved to the base_log_folder option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The dag_processor_manager_log_location option in [core] has been moved to the dag_processor_manager_log_location option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The log_filename_template option in [core] has been moved to the log_filename_template option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The log_processor_filename_template option in [core] has been moved to the log_processor_filename_template option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The remote_logging option in [core] has been moved to the remote_logging option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The task_log_reader option in [core] has been moved to the task_log_reader option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
    

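    The command prints the version number. By contrast, running airflow with no arguments lists the available groups and commands, ending with an error about the missing argument: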

    usage: airflow [-h] GROUP_OR_COMMAND ...
     
    positional arguments:
      GROUP_OR_COMMAND
     
     Groups:
       celery         Celery components
       config         View configuration
       connections    Manage connections
       dags           Manage DAGs
       db             Database operations
       jobs           Manage jobs
       kubernetes     Tools to help run the KubernetesExecutor
       pools          Manage pools
       providers      Display providers
       roles          Manage roles
       tasks          Manage tasks
       users          Manage users
       variables      Manage variables
     
     Commands:
       cheat-sheet    Display cheat sheet
       info           Show information about current Airflow and environment
       kerberos       Start a kerberos ticket renewer
       plugins        Dump information about loaded plugins
       rotate-fernet-key
                      Rotate encrypted connection credentials and variables
       scheduler      Start a scheduler instance
       sync-perm      Update permissions for existing roles and optionally DAGs
       version        Show the version
       webserver      Start a Airflow webserver instance
     
    optional arguments:
      -h, --help         show this help message and exit
     
    airflow command error: the following arguments are required: GROUP_OR_COMMAND, see help above.
    
  8. Capture the installed version in a variable:

    export AIRFLOW_VERSION="$(airflow version 2>/dev/null)"  # 2.1.3 at time of writing
    echo "$AIRFLOW_VERSION"
    

    Some installation tutorials said “1.10.10” which is now obsolete.

  9. Install Airflow extras using pip:

    pip install apache-airflow[gcp,statsd,sentry]=="$AIRFLOW_VERSION"

    Alternatively, for Zsh (which treats unquoted square brackets as a glob pattern):

    pip install 'apache-airflow[gcp,statsd,sentry]'=="$AIRFLOW_VERSION"

    That installs a lot.
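
Because that pip step pulls in so many packages, the Airflow installation docs recommend pinning them with a constraints file published per Airflow and Python version. The sketch below only builds the pip argument list; it does not run pip. The helper names `constraints_url` and `pip_command` are my own, and the version numbers are the ones used in this walkthrough, so adjust them to your environment.

```python
import sys

def constraints_url(airflow_version, python_version=None):
    """Build the URL of the constraints file published for an Airflow release."""
    if python_version is None:
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )

def pip_command(airflow_version, extras, python_version=None):
    """Return the pip argument list; pass it to subprocess.run() to execute it."""
    spec = f"apache-airflow[{','.join(extras)}]=={airflow_version}"
    return [
        sys.executable, "-m", "pip", "install", spec,
        "--constraint", constraints_url(airflow_version, python_version),
    ]

cmd = pip_command("2.1.3", ["gcp", "statsd", "sentry"], python_version="3.7")
print(" ".join(cmd[1:]))
```

Building the command as a list rather than a shell string sidesteps the bracket-quoting difference between Bash and Zsh noted above.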

Other tutorials:

https://www.youtube.com/watch?v=AHMm1wfGuHE Airflow tutorial 1: Introduction to Apache Airflow

Configure airflow.cfg

  1. Initialize the airflow database while in the airflow-tutorial working directory.

    cd "$AIRFLOW_HOME"
    airflow db init  2>/dev/null
    

    NOTE: Older tutorials use airflow initdb, which now fails with: “airflow command error: argument GROUP_OR_COMMAND: airflow initdb command, has been removed, please use airflow db init, see help above.”

    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:287: FutureWarning: The hostname_callable setting in [core] has the old default value of 'socket:getfqdn'. This value has been changed to 'socket.getfqdn' in the running config, but please update your config before Apache Airflow 2.1.
      FutureWarning,
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:287: FutureWarning: The navbar_color setting in [webserver] has the old default value of '#007A87'. This value has been changed to '#fff' in the running config, but please update your config before Apache Airflow 2.1.
      FutureWarning,
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345: DeprecationWarning: The log_format option in [core] has been moved to the log_format option in [logging] - the old setting has been used, but please update your config.
      option = self._get_option_from_config_file(deprecated_key, deprecated_section, key, kwargs, section)
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345: DeprecationWarning: The simple_log_format option in [core] has been moved to the simple_log_format option in [logging] - the old setting has been used, but please update your config.
      option = self._get_option_from_config_file(deprecated_key, deprecated_section, key, kwargs, section)
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The hide_sensitive_variable_fields option in [admin] has been moved to the hide_sensitive_var_conn_fields option in [core] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The logging_config_class option in [core] has been moved to the logging_config_class option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The logging_level option in [core] has been moved to the logging_level option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The fab_logging_level option in [core] has been moved to the fab_logging_level option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_log_format option in [core] has been moved to the colored_log_format option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_console_log option in [core] has been moved to the colored_console_log option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_formatter_class option in [core] has been moved to the colored_formatter_class option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The base_log_folder option in [core] has been moved to the base_log_folder option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The dag_processor_manager_log_location option in [core] has been moved to the dag_processor_manager_log_location option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The log_filename_template option in [core] has been moved to the log_filename_template option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The log_processor_filename_template option in [core] has been moved to the log_processor_filename_template option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The remote_logging option in [core] has been moved to the remote_logging option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The task_log_reader option in [core] has been moved to the task_log_reader option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The statsd_on option in [scheduler] has been moved to the statsd_on option in [metrics] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
    DB: sqlite:////Users/wilsonmar/airflow-tutorial/airflow.db
    [2021-09-12 15:00:19,522] {db.py:702} INFO - Creating tables
    INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
    INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
    INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
    INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/alembic/ddl/sqlite.py:75 UserWarning: Skipping unsupported ALTER for creation of implicit constraintPlease refer to the batch mode feature which allows for SQLite migrations using a copy-and-move strategy.
    INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
    INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
    INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
    INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
    INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
    INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
    INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
    INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
    INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
    INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
    INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
    INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
    INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
    INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
    INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
    INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
    INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
    INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
    INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
    INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
    INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
    WARNI [unusual_prefix_c68bbead3b2adcf8c97e2b3de57b8cea620172da_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
    WARNI [unusual_prefix_c68bbead3b2adcf8c97e2b3de57b8cea620172da_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']
    INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
    INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
    INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
    INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
    INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
    INFO  [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
    INFO  [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
    INFO  [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
    INFO  [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
    INFO  [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
    INFO  [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
    INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea, add idx_log_dag
    INFO  [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
    INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
    INFO  [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
    INFO  [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
    INFO  [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
    INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
    INFO  [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
    INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
    INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table
    INFO  [alembic.runtime.migration] Running upgrade d38e04c12aa2 -> b3b105409875, add root_dag_id to DAG
    INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
    INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
    INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
    INFO  [alembic.runtime.migration] Running upgrade a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875 -> 08364691d074, Merge the four heads back together
    INFO  [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
    INFO  [alembic.runtime.migration] Running upgrade fe461863935f -> 7939bcff74ba, Add DagTags table
    INFO  [alembic.runtime.migration] Running upgrade 7939bcff74ba -> a4c2fd67d16b, add pool_slots field to task_instance
    INFO  [alembic.runtime.migration] Running upgrade a4c2fd67d16b -> 852ae6c715af, Add RenderedTaskInstanceFields table
    INFO  [alembic.runtime.migration] Running upgrade 852ae6c715af -> 952da73b5eff, add dag_code table
    INFO  [alembic.runtime.migration] Running upgrade 952da73b5eff -> a66efa278eea, Add Precision to execution_date in RenderedTaskInstanceFields table
    INFO  [alembic.runtime.migration] Running upgrade a66efa278eea -> da3f683c3a5a, Add dag_hash Column to serialized_dag table
    INFO  [alembic.runtime.migration] Running upgrade da3f683c3a5a -> 92c57b58940d, Create FAB Tables
    INFO  [alembic.runtime.migration] Running upgrade 92c57b58940d -> 03afc6b6f902, Increase length of FAB ab_view_menu.name column
    INFO  [alembic.runtime.migration] Running upgrade 03afc6b6f902 -> cf5dc11e79ad, drop_user_and_chart
    INFO  [alembic.runtime.migration] Running upgrade cf5dc11e79ad -> bbf4a7ad0465, Remove id column from xcom
    INFO  [alembic.runtime.migration] Running upgrade bbf4a7ad0465 -> b25a55525161, Increase length of pool name
    INFO  [alembic.runtime.migration] Running upgrade b25a55525161 -> 3c20cacc0044, Add DagRun run_type
    INFO  [alembic.runtime.migration] Running upgrade 3c20cacc0044 -> 8f966b9c467a, Set conn_type as non-nullable
    INFO  [alembic.runtime.migration] Running upgrade 8f966b9c467a -> 8d48763f6d53, add unique constraint to conn_id
    INFO  [alembic.runtime.migration] Running upgrade 8d48763f6d53 -> e38be357a868, Add sensor_instance table
    INFO  [alembic.runtime.migration] Running upgrade e38be357a868 -> b247b1e3d1ed, Add queued by Job ID to TI
    INFO  [alembic.runtime.migration] Running upgrade b247b1e3d1ed -> e1a11ece99cc, Add external executor ID to TI
    INFO  [alembic.runtime.migration] Running upgrade e1a11ece99cc -> bef4f3d11e8b, Drop KubeResourceVersion and KubeWorkerId
    INFO  [alembic.runtime.migration] Running upgrade bef4f3d11e8b -> 98271e7606e2, Add scheduling_decision to DagRun and DAG
    INFO  [alembic.runtime.migration] Running upgrade 98271e7606e2 -> 52d53670a240, fix_mssql_exec_date_rendered_task_instance_fields_for_MSSQL
    INFO  [alembic.runtime.migration] Running upgrade 52d53670a240 -> 364159666cbd, Add creating_job_id to DagRun table
    INFO  [alembic.runtime.migration] Running upgrade 364159666cbd -> 45ba3f1493b9, add-k8s-yaml-to-rendered-templates
    INFO  [alembic.runtime.migration] Running upgrade 45ba3f1493b9 -> 849da589634d, Prefix DAG permissions.
    INFO  [alembic.runtime.migration] Running upgrade 849da589634d -> 2c6edca13270, Resource based permissions.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/settings.py:395 DeprecationWarning: `session_lifetime_days` option from `[webserver]` section has been renamed to `session_lifetime_minutes`. The new option allows to configure session lifetime in minutes. The `force_log_out_after` option has been removed from `[webserver]` section. Please update your configuration.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/www/app.py:89 DeprecationWarning: Old deprecated value found for `cookie_samesite` option in `[webserver]` section. Using `Lax` instead. Change the value to `Lax` in airflow.cfg to remove this warning.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The logging_config_class option in [core] has been moved to the logging_config_class option in [logging] - the old setting has been used, but please update your config.
    /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The task_log_reader option in [core] has been moved to the task_log_reader option in [logging] - the old setting has been used, but please update your config.
    [2021-09-12 15:00:22,002] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
    INFO  [alembic.runtime.migration] Running upgrade 2c6edca13270 -> 61ec73d9401f, Add description field to connection
    INFO  [alembic.runtime.migration] Running upgrade 61ec73d9401f -> 64a7d6477aae, fix description field in connection to be text
    INFO  [alembic.runtime.migration] Running upgrade 64a7d6477aae -> e959f08ac86c, Change field in DagCode to MEDIUMTEXT for MySql
    INFO  [alembic.runtime.migration] Running upgrade e959f08ac86c -> 82b7c48c147f, Remove can_read permission on config resource for User and Viewer role
    [2021-09-12 15:00:25,041] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
    INFO  [alembic.runtime.migration] Running upgrade 82b7c48c147f -> 449b4072c2da, Increase size of connection.extra field to handle multiple RSA keys
    INFO  [alembic.runtime.migration] Running upgrade 449b4072c2da -> 8646922c8a04, Change default pool_slots to 1
    INFO  [alembic.runtime.migration] Running upgrade 8646922c8a04 -> 2e42bb497a22, rename last_scheduler_run column
    INFO  [alembic.runtime.migration] Running upgrade 2e42bb497a22 -> 90d1635d7b86, Increase pool name size in TaskInstance
    INFO  [alembic.runtime.migration] Running upgrade 90d1635d7b86 -> e165e7455d70, add description field to variable
    INFO  [alembic.runtime.migration] Running upgrade e165e7455d70 -> a13f7613ad25, Resource based permissions for default FAB views.
    [2021-09-12 15:00:26,292] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
    INFO  [alembic.runtime.migration] Running upgrade a13f7613ad25 -> 97cdd93827b8, Add queued_at column to dagrun table
    INFO  [airflow.models.dagbag.DagBag] Filling up the DagBag from /Users/wilsonmar/airflow-tutorial/dags
    WARNI [unusual_prefix_c68bbead3b2adcf8c97e2b3de57b8cea620172da_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
    WARNI [unusual_prefix_c68bbead3b2adcf8c97e2b3de57b8cea620172da_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']
    INFO  [airflow.models.dag] Sync 33 DAGs
    INFO  [airflow.models.dag] Creating ORM DAG for example_trigger_controller_dag
    INFO  [airflow.models.dag] Creating ORM DAG for latest_only_with_trigger
    INFO  [airflow.models.dag] Creating ORM DAG for tutorial
    INFO  [airflow.models.dag] Creating ORM DAG for example_task_group_decorator
    INFO  [airflow.models.dag] Creating ORM DAG for tutorial_taskflow_api_etl_virtualenv
    INFO  [airflow.models.dag] Creating ORM DAG for example_python_operator
    INFO  [airflow.models.dag] Creating ORM DAG for example_complex
    INFO  [airflow.models.dag] Creating ORM DAG for example_external_task_marker_child
    INFO  [airflow.models.dag] Creating ORM DAG for example_xcom_args
    INFO  [airflow.models.dag] Creating ORM DAG for example_subdag_operator
    INFO  [airflow.models.dag] Creating ORM DAG for example_xcom
    INFO  [airflow.models.dag] Creating ORM DAG for example_short_circuit_operator
    INFO  [airflow.models.dag] Creating ORM DAG for example_task_group
    INFO  [airflow.models.dag] Creating ORM DAG for example_weekday_branch_operator
    INFO  [airflow.models.dag] Creating ORM DAG for example_dag_decorator
    INFO  [airflow.models.dag] Creating ORM DAG for example_trigger_target_dag
    INFO  [airflow.models.dag] Creating ORM DAG for example_subdag_operator.section-1
    INFO  [airflow.models.dag] Creating ORM DAG for tutorial_etl_dag
    INFO  [airflow.models.dag] Creating ORM DAG for latest_only
    INFO  [airflow.models.dag] Creating ORM DAG for example_skip_dag
    INFO  [airflow.models.dag] Creating ORM DAG for example_branch_operator
    INFO  [airflow.models.dag] Creating ORM DAG for example_branch_datetime_operator_2
    INFO  [airflow.models.dag] Creating ORM DAG for example_bash_operator
    INFO  [airflow.models.dag] Creating ORM DAG for example_nested_branch_dag
    INFO  [airflow.models.dag] Creating ORM DAG for tutorial_taskflow_api_etl
    INFO  [airflow.models.dag] Creating ORM DAG for example_external_task_marker_parent
    INFO  [airflow.models.dag] Creating ORM DAG for example_branch_dop_operator_v3
    INFO  [airflow.models.dag] Creating ORM DAG for test_utils
    INFO  [airflow.models.dag] Creating ORM DAG for example_xcom_args_with_operators
    INFO  [airflow.models.dag] Creating ORM DAG for example_subdag_operator.section-2
    INFO  [airflow.models.dag] Creating ORM DAG for example_passing_params_via_test_command
    INFO  [airflow.models.dag] Creating ORM DAG for example_kubernetes_executor
    INFO  [airflow.models.dag] Creating ORM DAG for example_branch_labels
    INFO  [airflow.models.dag] Setting next_dagrun for example_bash_operator to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_branch_datetime_operator_2 to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_branch_dop_operator_v3 to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_branch_labels to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_branch_operator to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_complex to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_dag_decorator to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_external_task_marker_child to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_external_task_marker_parent to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_kubernetes_executor to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_nested_branch_dag to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_passing_params_via_test_command to 2021-09-11 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_python_operator to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_short_circuit_operator to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_skip_dag to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_subdag_operator to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_subdag_operator.section-1 to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_subdag_operator.section-2 to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_task_group to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_task_group_decorator to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_trigger_controller_dag to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_trigger_target_dag to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_weekday_branch_operator to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_xcom to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for example_xcom_args to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_xcom_args_with_operators to None
    INFO  [airflow.models.dag] Setting next_dagrun for latest_only to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for latest_only_with_trigger to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for test_utils to None
    INFO  [airflow.models.dag] Setting next_dagrun for tutorial to 2021-09-10 00:00:00+00:00
    INFO  [airflow.models.dag] Setting next_dagrun for tutorial_etl_dag to None
    INFO  [airflow.models.dag] Setting next_dagrun for tutorial_taskflow_api_etl to None
    INFO  [airflow.models.dag] Setting next_dagrun for tutorial_taskflow_api_etl_virtualenv to None
    INFO  [airflow.models.dag] Sync 2 DAGs
    INFO  [airflow.models.dag] Setting next_dagrun for example_subdag_operator.section-1 to None
    INFO  [airflow.models.dag] Setting next_dagrun for example_subdag_operator.section-2 to None
    Initialization done
    
  2. Build the URL of the constraints file from the Airflow and Python versions:

    CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
    echo "$CONSTRAINT_URL"
    

    Expected response:

    https://raw.githubusercontent.com/apache/airflow/constraints-2.1.3/constraints-3.7.txt
    

    Its contents:

    # Editable install with no version control (apache-airflow==2.1.3)
    APScheduler==3.6.3
    ...
    
  3. Install Airflow against that constraints file, per Airflow’s instructions:

    pip install --upgrade "apache-airflow[postgres,google]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
    
  4. Download initial file:

    curl ???
  5. Run your own Airflow workflow:

    python "$AIRFLOW_HOME/dags/tutorial.py"

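    If the script exits without raising an exception, the DAG file parses cleanly. This step alone does not start the scheduler or webserver; once both are started (see Start below), the web UI is served on localhost:8080 by default.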
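
The constraints URL from step 2 follows a fixed pattern, so it can also be derived in Python, for example inside a setup script. This is a minimal sketch; `constraint_url` is a hypothetical helper name, and the versions passed in are the ones from the expected response above.

```python
def constraint_url(airflow_version: str, python_version: str) -> str:
    """Build the constraints-file URL for a given Airflow and Python version."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Versions taken from the expected response shown in step 2.
print(constraint_url("2.1.3", "3.7"))
```

The Python version here is major.minor only (3.7, not 3.7.11), matching the file name in the expected response.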

Start

  1. Start the scheduler:

    airflow scheduler
  2. In a separate terminal, start the Airflow webserver:

    airflow webserver
    
  3. Alternatively, invoke the Airflow CLI as a Python module (the equivalent of the airflow command):

    python -m airflow
    

Airflow Pipeline

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html

The tutorial at the link above names its tasks t1, t2, etc.

Imports

from datetime import timedelta
from textwrap import dedent
 
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
 
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
   

Default Arguments (Args)

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
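
To make the "override on a per-task basis" comment concrete, here is a plain-dictionary sketch of the precedence: values passed to an individual task win over `default_args`. It only mimics the outcome; Airflow performs the real merge inside its operator classes, and `effective_args` is a hypothetical helper, not an Airflow function.

```python
from datetime import timedelta

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def effective_args(defaults, **task_overrides):
    """Toy model: per-task keyword arguments take precedence over defaults."""
    return {**defaults, **task_overrides}


t1_args = effective_args(default_args)             # inherits retries=1
t2_args = effective_args(default_args, retries=3)  # overrides retries for this task only

print(t1_args["retries"], t2_args["retries"])
```

The defaults dictionary itself is left untouched, so an override on one task does not leak into the others.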
   

Instantiate

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # Tasks are defined inside this block; `pass` is only a placeholder.
    pass
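
The combination of `start_date=days_ago(2)` and a one-day `schedule_interval` explains why the initialization log reports `next_dagrun` for `tutorial` as 2021-09-10. The sketch below reproduces that arithmetic with the standard library only. `midnight_days_ago` is a hypothetical stand-in for `days_ago`, and the clock value is an assumption based on the date in the log output.

```python
from datetime import datetime, timedelta, timezone


def midnight_days_ago(n, now):
    """Hypothetical stand-in for days_ago(n): midnight UTC, n days before `now`."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=n)


# Assumed wall-clock time, based on the date shown in the log output.
now = datetime(2021, 9, 12, 15, 0, 22, tzinfo=timezone.utc)
start_date = midnight_days_ago(2, now)
interval = timedelta(days=1)

# A run is labelled with the start of the interval it covers and
# becomes eligible to run only once that interval has ended.
first_run_label = start_date
first_run_eligible_at = start_date + interval

print(first_run_label.isoformat())
print(first_run_eligible_at.isoformat())
```

Because a relative start date moves every time the file is parsed, a fixed `datetime` is often easier to reason about outside of tutorials.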

Tasks

Airflow overloads Python’s bit-shift operators (>> and <<) to specify dependencies between tasks:

   first_task >> [second_task, third_task]
   third_task << fourth_task
   

Alternately, use the more explicit set_upstream and set_downstream methods:

   first_task.set_downstream([second_task, third_task])
   third_task.set_upstream(fourth_task)
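
How `>>` and `<<` can stand in for `set_downstream` and `set_upstream` is easier to see in a toy model. The class below is a hypothetical stand-in that records dependency edges only; it is not Airflow’s implementation and runs without Airflow installed.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow task; tracks dependency edges only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        # Accept a single task or a list of tasks.
        for other in others if isinstance(others, list) else [others]:
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def set_upstream(self, others):
        for other in others if isinstance(others, list) else [others]:
            other.set_downstream(self)

    def __rshift__(self, others):
        # a >> b means "b runs after a".
        self.set_downstream(others)
        return others

    def __lshift__(self, others):
        # a << b means "a runs after b".
        self.set_upstream(others)
        return others


first, second, third, fourth = (
    ToyTask(name) for name in ("first", "second", "third", "fourth")
)
first >> [second, third]
third << fourth

print(sorted(first.downstream))
print(sorted(third.upstream))
```

In this model both spellings record the same edges, which is why the operator form is usually preferred for readability.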

Install Husky