Workflow as Code (WaC)
Overview
Airflow is a platform to programmatically author, schedule, and monitor workflows that process data, where the output of one step becomes the input of the next.
A “data workflow” is a sequence of tasks that begins with a triggering event.
It’s a hot market, so Airflow’s competitors include Prefect and Streamz.
NOTE: Content here is my personal opinion, and is not intended to represent any employer (past or present). “PROTIP:” here highlights information I haven’t seen elsewhere on the internet because it consists of hard-won, little-known but significant facts based on my personal research and experience.
The Airflow scheduler executes tasks on an array of workers while following specified dependencies.
Use Airflow to author workflows as DAGs (Directed Acyclic Graphs) of tasks (named t1, t2, etc.).
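- “Directed” means each dependency points one way, from an upstream task to a downstream task
- “Acyclic” means the graph contains no loops, so a task can never depend on itself, directly or indirectly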
- https://www.youtube.com/watch?v=XD7euLOzKbs
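To make the DAG idea concrete, here is a minimal sketch that uses only Python's standard library (graphlib, Python 3.9 or later), not Airflow itself. The task names t1 to t4 and their dependencies are made up for illustration. It shows how a run order can be derived from "who depends on whom", and how a cycle is detected.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key maps a task to the set of tasks it depends on.
dependencies = {
    "t1": set(),          # extract
    "t2": {"t1"},         # transform, runs after t1
    "t3": {"t1"},         # validate, runs after t1
    "t4": {"t2", "t3"},   # load, runs after both t2 and t3
}


def execution_order(deps):
    """Return one valid run order in which every task follows its upstream tasks."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """Return True when the dependency graph contains no cycle."""
    try:
        execution_order(deps)
        return True
    except CycleError:
        return False


order = execution_order(dependencies)
print(order)
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # a loop between a and b
```

t2 and t3 do not depend on each other, so a scheduler is free to run them in parallel on different workers once t1 has finished.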
Airflow instantiates pipelines dynamically.
https://www.youtube.com/watch?v=l764YAGPlIs Scalable Data Ingestion Architecture Using Airflow and Spark | Komodo Health Data Council
When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.
Airflow uses operators for working with Python, Postgres, Bash, Email, etc.
Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
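As a rough illustration of what templated parameters do, the sketch below fills {{ name }} placeholders in a command string. It is a toy stand-in written with a regular expression, not the Jinja engine that Airflow actually uses. The script name export.py and the output path are hypothetical. The ds name mirrors Airflow's template variable for the run's logical date in YYYY-MM-DD form.

```python
import re
from datetime import date


def render(template: str, context: dict) -> str:
    """Replace each {{ name }} placeholder with context[name] (toy stand-in for Jinja)."""
    def substitute(match):
        return str(context[match.group(1)])
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


# `ds` mirrors Airflow's template variable for the logical date of a run.
context = {"ds": date(2021, 9, 12).isoformat()}

# export.py and the /tmp path are hypothetical names used only for this example.
command = render("python export.py --day {{ ds }} --out /tmp/export_{{ ds }}.csv", context)
print(command)
```

The same task definition can then be reused for every scheduled run, with only the rendered date changing.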
Rich command line utilities make performing complex surgeries on DAGs a snap.
A rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
Airflow is commonly used to process data, but has the opinion that tasks should ideally be idempotent (i.e., results of the task will be the same, and will not create duplicated data in a destination system).
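One common way to get idempotent behavior is to key each write on the period being processed, so that a retry overwrites rather than appends. The sketch below shows the idea with an in-memory SQLite database from the standard library. The table and column names are assumptions for illustration, and a real destination system would have its own upsert syntax.

```python
import sqlite3


def load_daily_total(conn, day, total):
    """Write one row per day; rerunning the same day overwrites instead of appending."""
    conn.execute(
        "INSERT OR REPLACE INTO daily_totals (day, total) VALUES (?, ?)",
        (day, total),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_totals (day TEXT PRIMARY KEY, total INTEGER)")

# Simulate a retry: the same task runs twice for the same day.
load_daily_total(conn, "2021-09-12", 42)
load_daily_total(conn, "2021-09-12", 42)

rows = conn.execute("SELECT day, total FROM daily_totals").fetchall()
print(rows)
```

Because day is the primary key, the second run leaves exactly one row in place.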
Airflow is not designed to pass large quantities of data from one task to the next (though tasks can pass metadata using Airflow’s XCom feature).
For high-volume, data-intensive tasks, a best practice is to delegate to external services specializing in that type of work.
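The pattern of "pass metadata, not data" can be sketched as follows. The dictionary named xcom is only a toy stand-in for Airflow's XCom store, and the two functions are hypothetical tasks. The upstream task writes its bulky output to storage and shares only the location and a row count, which the downstream task uses to fetch the data itself.

```python
import json
import os
import tempfile

xcom = {}  # toy stand-in for Airflow's XCom store: small values keyed by task name


def extract(workdir):
    """Write the (potentially large) data to storage; share only its location."""
    path = os.path.join(workdir, "records.json")
    with open(path, "w") as handle:
        json.dump([{"id": i} for i in range(1000)], handle)
    xcom["extract"] = {"path": path, "row_count": 1000}


def summarize():
    """Read the location from the metadata, then load the data from storage."""
    meta = xcom["extract"]
    with open(meta["path"]) as handle:
        records = json.load(handle)
    return len(records) == meta["row_count"]


with tempfile.TemporaryDirectory() as workdir:
    extract(workdir)
    counts_match = summarize()

print(counts_match)
```

In a production pipeline the storage would more likely be an object store or a database table than a local temporary folder.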
Amazon
https://docs.aws.amazon.com/mwaa/latest/userguide/what-is-mwaa.html
Intro Videos
https://www.youtube.com/results?search_query=apache+airflow Among the list of videos:
VIDEO: 🌈Apache Airflow for beginners by Varya Karpenko references https://github.com/KarpenkoVarya/airflow_for_beginners which sets up a data pipeline that delivers a fresh batch of Stack Overflow questions tagged pandas to your mailbox daily.
https://www.youtube.com/watch?v=i25ttd32-eo Airflow for Beginners - Run Spotify ETL Job in 15 minutes! by Karolina Sowinska referencing https://github.com/karolina-sowinska/free-data-engineering-course-for-beginners where main.py calls spotify_dag.py calls spotify_etl.py
https://github.com/apache/airflow is “Configuration as Code” (Python)
https://gtoonstra.github.io/etl-with-airflow/etlexample.html
Tutorials
For those looking to get started on their journey to becoming an Airflow developer, take a look at this course from Alexandra Abbas: https://www.udemy.com/course/apache-airflow-course/
Install Airflow executables
VIDEO https://airflow.apache.org/docs/apache-airflow/stable/installation.html
Airflow was tested on Debian Buster.
See https://arpitrana.medium.com/install-airflow-on-macos-guide-fc66399b2a9e
-
Create an Airflow folder, with a dags subfolder, under your home folder:
mkdir -p "$HOME/airflow-tutorial/dags"
-
Define the path as the AIRFLOW_HOME environment variable in ~/.bash_profile, then change to that folder:
export AIRFLOW_HOME=$HOME/airflow-tutorial
cd "$AIRFLOW_HOME"
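To see how this variable is typically resolved, here is a small sketch. It assumes Airflow's documented defaults: when AIRFLOW_HOME is not set the home folder is ~/airflow, and DAG files are looked up in a dags folder beneath it unless airflow.cfg says otherwise. The function names are made up for this example.

```python
import os
from pathlib import Path


def airflow_home(environ=os.environ):
    """Return AIRFLOW_HOME if set, else ~/airflow (Airflow's documented default)."""
    return Path(environ.get("AIRFLOW_HOME", Path.home() / "airflow"))


def dags_folder(environ=os.environ):
    """Return the dags folder under AIRFLOW_HOME (the default dags_folder setting)."""
    return airflow_home(environ) / "dags"


# Pass a plain dict to try a value without changing the real environment.
print(dags_folder({"AIRFLOW_HOME": "/tmp/airflow-tutorial"}))
print(airflow_home())
```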
-
Airflow is installed in ~/.local/bin, so add that folder to PATH in .bash_profile, then list its contents:
PATH=$PATH:~/.local/bin
ls ~/.local/bin
WARNING: brew info airflow refers to a video watching app for AppleTV.
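A quick way to confirm which airflow executable, if any, the shell would pick up is sketched below with the standard library's shutil.which. The helper name find_cli is hypothetical, and the ~/.local/bin location is the one assumed in the step above.

```python
import os
import shutil
import sys


def find_cli(name, extra_dirs=()):
    """Look for an executable on PATH plus any extra directories; None if absent."""
    search_path = os.pathsep.join([os.environ.get("PATH", ""), *extra_dirs])
    return shutil.which(name, path=search_path)


local_bin = os.path.expanduser("~/.local/bin")
location = find_cli("airflow", extra_dirs=[local_bin])
print(location or "airflow not found; check that ~/.local/bin is on PATH")
```

If the printed path points somewhere unexpected, such as a Homebrew folder, a different program with the same name is shadowing the Airflow CLI.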
-
Since this tutorial uses the now-obsolete Python 3.7, use Conda to manage it instead of pyenv, whose equivalent command would be:
pyenv install 3.7.10
Upgrade Conda:
conda update -n base -c defaults conda
cd "$AIRFLOW_HOME"
conda create --name airflow-tutorial python=3.7
At time of writing (12 SEP 2021):
/usr/local/anaconda3/lib/python3.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.26.6) or chardet (3.0.4) doesn't match a supported version!
  RequestsDependencyWarning)
Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /usr/local/anaconda3/envs/airflow-tutorial

  added / updated specs:
    - python=3.7

The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2021.7.5   |       hecd8cb5_1         113 KB
    certifi-2021.5.30          |   py37hecd8cb5_0         139 KB
    libcxx-12.0.0              |       h2f01273_0         805 KB
    libffi-3.3                 |       hb1e8313_2          44 KB
    ncurses-6.2                |       h0a44026_1         749 KB
    openssl-1.1.1l             |       h9ed2024_0         2.2 MB
    pip-21.0.1                 |   py37hecd8cb5_0         1.8 MB
    python-3.7.11              |       h88f2d9e_0        18.1 MB
    readline-8.1               |       h9ed2024_0         333 KB
    setuptools-52.0.0          |   py37hecd8cb5_0         721 KB
    sqlite-3.36.0              |       hce871da_0         1.1 MB
    tk-8.6.10                  |       hb0a8c7a_0         3.0 MB
    wheel-0.37.0               |     pyhd3eb1b0_1          33 KB
    xz-5.2.5                   |       h1de35cc_0         240 KB
    ------------------------------------------------------------
                                           Total:        29.3 MB

The following NEW packages will be INSTALLED:

  ca-certificates    pkgs/main/osx-64::ca-certificates-2021.7.5-hecd8cb5_1
  certifi            pkgs/main/osx-64::certifi-2021.5.30-py37hecd8cb5_0
  libcxx             pkgs/main/osx-64::libcxx-12.0.0-h2f01273_0
  libffi             pkgs/main/osx-64::libffi-3.3-hb1e8313_2
  ncurses            pkgs/main/osx-64::ncurses-6.2-h0a44026_1
  openssl            pkgs/main/osx-64::openssl-1.1.1l-h9ed2024_0
  pip                pkgs/main/osx-64::pip-21.0.1-py37hecd8cb5_0
  python             pkgs/main/osx-64::python-3.7.11-h88f2d9e_0
  readline           pkgs/main/osx-64::readline-8.1-h9ed2024_0
  setuptools         pkgs/main/osx-64::setuptools-52.0.0-py37hecd8cb5_0
  sqlite             pkgs/main/osx-64::sqlite-3.36.0-hce871da_0
  tk                 pkgs/main/osx-64::tk-8.6.10-hb0a8c7a_0
  wheel              pkgs/main/noarch::wheel-0.37.0-pyhd3eb1b0_1
  xz                 pkgs/main/osx-64::xz-5.2.5-h1de35cc_0
  zlib               pkgs/main/osx-64::zlib-1.2.11-h1de35cc_3

Proceed ([y]/n)? _
-
Type y to proceed.
Downloading and Extracting Packages
readline-8.1         | 333 KB    | ##################################### | 100%
certifi-2021.5.30    | 139 KB    | ##################################### | 100%
ncurses-6.2          | 749 KB    | ##################################### | 100%
openssl-1.1.1l       | 2.2 MB    | ##################################### | 100%
libffi-3.3           | 44 KB     | ##################################### | 100%
python-3.7.11        | 18.1 MB   | ##################################### | 100%
pip-21.0.1           | 1.8 MB    | ##################################### | 100%
xz-5.2.5             | 240 KB    | ##################################### | 100%
setuptools-52.0.0    | 721 KB    | ##################################### | 100%
libcxx-12.0.0        | 805 KB    | ##################################### | 100%
wheel-0.37.0         | 33 KB     | ##################################### | 100%
sqlite-3.36.0        | 1.1 MB    | ##################################### | 100%
tk-8.6.10            | 3.0 MB    | ##################################### | 100%
ca-certificates-2021 | 113 KB    | ##################################### | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
#
# To activate this environment, use
#
#     $ conda activate airflow-tutorial
#
# To deactivate an active environment, use
#
#     $ conda deactivate
-
Activate the Conda environment by the name given when it was created:
conda activate airflow-tutorial
You should now see “(airflow-tutorial)” above the Terminal prompt.
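The activation can also be checked from inside Python. This sketch relies on the CONDA_DEFAULT_ENV environment variable, which conda activate sets to the name of the active environment. The function name and the expected values are assumptions taken from the steps above.

```python
import os
import sys


def check_environment(environ, version_info,
                      expected_env="airflow-tutorial", expected_python=(3, 7)):
    """Return a list of problems; an empty list means the environment looks right."""
    problems = []
    active = environ.get("CONDA_DEFAULT_ENV")  # set by `conda activate`
    if active != expected_env:
        problems.append(f"active Conda environment is {active!r}, expected {expected_env!r}")
    if tuple(version_info[:2]) != tuple(expected_python):
        wanted = ".".join(str(part) for part in expected_python)
        problems.append(f"Python is {version_info[0]}.{version_info[1]}, expected {wanted}")
    return problems


for problem in check_environment(os.environ, sys.version_info):
    print("WARNING:", problem)
```

No output means both the environment name and the Python version match what this tutorial expects.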
NOTE: The Conda setup above is used instead of:
pip install apache-airflow-upgrade-check
-
Verify install:
airflow version 2>/dev/null
The 2>/dev/null discards stderr, which hides warnings such as these:
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:287: FutureWarning: The hostname_callable setting in [core] has the old default value of 'socket:getfqdn'. This value has been changed to 'socket.getfqdn' in the running config, but please update your config before Apache Airflow 2.1.
  FutureWarning,
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:287: FutureWarning: The navbar_color setting in [webserver] has the old default value of '#007A87'. This value has been changed to '#fff' in the running config, but please update your config before Apache Airflow 2.1.
  FutureWarning,
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345: DeprecationWarning: The log_format option in [core] has been moved to the log_format option in [logging] - the old setting has been used, but please update your config.
  option = self._get_option_from_config_file(deprecated_key, deprecated_section, key, kwargs, section)
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345: DeprecationWarning: The simple_log_format option in [core] has been moved to the simple_log_format option in [logging] - the old setting has been used, but please update your config.
  option = self._get_option_from_config_file(deprecated_key, deprecated_section, key, kwargs, section)
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The hide_sensitive_variable_fields option in [admin] has been moved to the hide_sensitive_var_conn_fields option in [core] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The logging_config_class option in [core] has been moved to the logging_config_class option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The logging_level option in [core] has been moved to the logging_level option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The fab_logging_level option in [core] has been moved to the fab_logging_level option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_log_format option in [core] has been moved to the colored_log_format option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_console_log option in [core] has been moved to the colored_console_log option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The colored_formatter_class option in [core] has been moved to the colored_formatter_class option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The base_log_folder option in [core] has been moved to the base_log_folder option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The dag_processor_manager_log_location option in [core] has been moved to the dag_processor_manager_log_location option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The log_filename_template option in [core] has been moved to the log_filename_template option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The log_processor_filename_template option in [core] has been moved to the log_processor_filename_template option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The remote_logging option in [core] has been moved to the remote_logging option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The task_log_reader option in [core] has been moved to the task_log_reader option in [logging] - the old setting has been used, but please update your config.
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
The expected response is the version number (2.1.3 at time of writing). Running airflow with no command instead prints the usage summary:
usage: airflow [-h] GROUP_OR_COMMAND ...

positional arguments:
  GROUP_OR_COMMAND

    Groups:
      celery             Celery components
      config             View configuration
      connections        Manage connections
      dags               Manage DAGs
      db                 Database operations
      jobs               Manage jobs
      kubernetes         Tools to help run the KubernetesExecutor
      pools              Manage pools
      providers          Display providers
      roles              Manage roles
      tasks              Manage tasks
      users              Manage users
      variables          Manage variables

    Commands:
      cheat-sheet        Display cheat sheet
      info               Show information about current Airflow and environment
      kerberos           Start a kerberos ticket renewer
      plugins            Dump information about loaded plugins
      rotate-fernet-key  Rotate encrypted connection credentials and variables
      scheduler          Start a scheduler instance
      sync-perm          Update permissions for existing roles and optionally DAGs
      version            Show the version
      webserver          Start a Airflow webserver instance

optional arguments:
  -h, --help             show this help message and exit

airflow command error: the following arguments are required: GROUP_OR_COMMAND, see help above.
-
Define the installed version as an environment variable (2.1.3 at time of writing):
export AIRFLOW_VERSION=$( airflow version 2>/dev/null )
echo "$AIRFLOW_VERSION"
Some installation tutorials said “1.10.10” which is now obsolete.
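The same capture can be done from Python. The sketch below keeps stdout and discards stderr, which mirrors what 2>/dev/null does in the shell. The helper name tool_version is made up, and because Airflow may not be installed where this runs, a stand-in command that prints a version to stdout and a warning to stderr is used in its place.

```python
import subprocess
import sys


def tool_version(command):
    """Run a version command, discard stderr, and return stdout without whitespace."""
    result = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,  # same effect as 2>/dev/null in the shell
        text=True,
        check=True,
    )
    return result.stdout.strip()


# With Airflow installed this would be: tool_version(["airflow", "version"])
# The stand-in below prints a version to stdout and a warning to stderr.
stand_in = [
    sys.executable,
    "-c",
    "import sys; print('2.1.3'); print('FutureWarning: ...', file=sys.stderr)",
]
print(tool_version(stand_in))
```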
-
Install Airflow extras using pip:
pip install apache-airflow[gcp,statsd,sentry]=="$AIRFLOW_VERSION"
Alternately, for Zsh, which treats unquoted square brackets as a filename pattern:
pip install 'apache-airflow[gcp,statsd,sentry]'=="$AIRFLOW_VERSION"
That installs a lot.
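Because so many dependencies are pulled in, the Airflow installation documentation recommends pairing the version pin with a constraints file published per Airflow and Python version. The sketch below builds such a pip command as an argument list. The function name is hypothetical, and the URL pattern should be checked against the installation page for your version before relying on it.

```python
import sys

CONSTRAINTS_URL = (
    "https://raw.githubusercontent.com/apache/airflow/"
    "constraints-{airflow}/constraints-{python}.txt"
)


def pip_install_command(airflow_version, extras, python_version=None):
    """Build a pip command that pins Airflow and applies the matching constraints file."""
    if python_version is None:
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    requirement = f"apache-airflow[{','.join(extras)}]=={airflow_version}"
    constraint = CONSTRAINTS_URL.format(airflow=airflow_version, python=python_version)
    # Returned as an argument list, so no shell quoting of the brackets is needed.
    return [sys.executable, "-m", "pip", "install", requirement, "--constraint", constraint]


command = pip_install_command("2.1.3", ["gcp", "statsd", "sentry"], python_version="3.7")
print(" ".join(command[1:]))
```

Passing the list to subprocess.run would perform the install. It is only printed here so the example has no side effects.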
Other tutorials:
https://www.youtube.com/watch?v=AHMm1wfGuHE Airflow tutorial 1: Introduction to Apache Airflow
### Configure airflow.cfg.
-
Initialize the Airflow database while in the airflow-tutorial working directory:
cd "$AIRFLOW_HOME"
airflow db init 2>/dev/null
NOTE: The older command airflow initdb has been removed. Running it returns: “airflow command error: argument GROUP_OR_COMMAND: `airflow initdb` command, has been removed, please use `airflow db init`, see help above.”
The output of airflow db init begins with the same FutureWarning and DeprecationWarning messages listed above for airflow version, and then continues:
/usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The statsd_on option in [scheduler] has been moved to the statsd_on option in [metrics] - the old setting has been used, but please update your config. /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config. DB: sqlite:////Users/wilsonmar/airflow-tutorial/airflow.db [2021-09-12 15:00:19,522] {db.py:702} INFO - Creating tables INFO [alembic.runtime.migration] Context impl SQLiteImpl. INFO [alembic.runtime.migration] Will assume non-transactional DDL. INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1, current schema INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/alembic/ddl/sqlite.py:75 UserWarning: Skipping unsupported ALTER for creation of implicit constraintPlease refer to the batch mode feature which allows for SQLite migrations using a copy-and-move strategy. 
INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices INFO [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log INFO [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user INFO [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end INFO [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss INFO [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index INFO [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices INFO [alembic.runtime.migration] Running upgrade 8504051e801b -> 
5e7d17757c7a, add pid field to TaskInstance INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance WARNI [unusual_prefix_c68bbead3b2adcf8c97e2b3de57b8cea620172da_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes' WARNI [unusual_prefix_c68bbead3b2adcf8c97e2b3de57b8cea620172da_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes'] INFO [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary INFO [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index INFO [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types) INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing INFO [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing INFO [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness INFO [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads INFO [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint INFO [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key INFO [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail INFO [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea, add idx_log_dag INFO 
[alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance INFO [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table INFO [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2 INFO [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field INFO [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag INFO [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag INFO [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete INFO [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table INFO [alembic.runtime.migration] Running upgrade d38e04c12aa2 -> b3b105409875, add root_dag_id to DAG INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables INFO [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit INFO [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table INFO [alembic.runtime.migration] Running upgrade a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875 -> 08364691d074, Merge the four heads back together INFO [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password INFO [alembic.runtime.migration] Running upgrade fe461863935f -> 7939bcff74ba, Add DagTags table INFO [alembic.runtime.migration] Running upgrade 7939bcff74ba -> a4c2fd67d16b, add pool_slots field to task_instance INFO [alembic.runtime.migration] Running upgrade a4c2fd67d16b -> 852ae6c715af, Add RenderedTaskInstanceFields table 
INFO [alembic.runtime.migration] Running upgrade 852ae6c715af -> 952da73b5eff, add dag_code table INFO [alembic.runtime.migration] Running upgrade 952da73b5eff -> a66efa278eea, Add Precision to execution_date in RenderedTaskInstanceFields table INFO [alembic.runtime.migration] Running upgrade a66efa278eea -> da3f683c3a5a, Add dag_hash Column to serialized_dag table INFO [alembic.runtime.migration] Running upgrade da3f683c3a5a -> 92c57b58940d, Create FAB Tables INFO [alembic.runtime.migration] Running upgrade 92c57b58940d -> 03afc6b6f902, Increase length of FAB ab_view_menu.name column INFO [alembic.runtime.migration] Running upgrade 03afc6b6f902 -> cf5dc11e79ad, drop_user_and_chart INFO [alembic.runtime.migration] Running upgrade cf5dc11e79ad -> bbf4a7ad0465, Remove id column from xcom INFO [alembic.runtime.migration] Running upgrade bbf4a7ad0465 -> b25a55525161, Increase length of pool name INFO [alembic.runtime.migration] Running upgrade b25a55525161 -> 3c20cacc0044, Add DagRun run_type INFO [alembic.runtime.migration] Running upgrade 3c20cacc0044 -> 8f966b9c467a, Set conn_type as non-nullable INFO [alembic.runtime.migration] Running upgrade 8f966b9c467a -> 8d48763f6d53, add unique constraint to conn_id INFO [alembic.runtime.migration] Running upgrade 8d48763f6d53 -> e38be357a868, Add sensor_instance table INFO [alembic.runtime.migration] Running upgrade e38be357a868 -> b247b1e3d1ed, Add queued by Job ID to TI INFO [alembic.runtime.migration] Running upgrade b247b1e3d1ed -> e1a11ece99cc, Add external executor ID to TI INFO [alembic.runtime.migration] Running upgrade e1a11ece99cc -> bef4f3d11e8b, Drop KubeResourceVersion and KubeWorkerId INFO [alembic.runtime.migration] Running upgrade bef4f3d11e8b -> 98271e7606e2, Add scheduling_decision to DagRun and DAG INFO [alembic.runtime.migration] Running upgrade 98271e7606e2 -> 52d53670a240, fix_mssql_exec_date_rendered_task_instance_fields_for_MSSQL INFO [alembic.runtime.migration] Running upgrade 52d53670a240 -> 
364159666cbd, Add creating_job_id to DagRun table INFO [alembic.runtime.migration] Running upgrade 364159666cbd -> 45ba3f1493b9, add-k8s-yaml-to-rendered-templates INFO [alembic.runtime.migration] Running upgrade 45ba3f1493b9 -> 849da589634d, Prefix DAG permissions. INFO [alembic.runtime.migration] Running upgrade 849da589634d -> 2c6edca13270, Resource based permissions. /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/settings.py:395 DeprecationWarning: `session_lifetime_days` option from `[webserver]` section has been renamed to `session_lifetime_minutes`. The new option allows to configure session lifetime in minutes. The `force_log_out_after` option has been removed from `[webserver]` section. Please update your configuration. /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/www/app.py:89 DeprecationWarning: Old deprecated value found for `cookie_samesite` option in `[webserver]` section. Using `Lax` instead. Change the value to `Lax` in airflow.cfg to remove this warning. /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The logging_config_class option in [core] has been moved to the logging_config_class option in [logging] - the old setting has been used, but please update your config. /usr/local/anaconda3/envs/airflow-tutorial/lib/python3.7/site-packages/airflow/configuration.py:345 DeprecationWarning: The task_log_reader option in [core] has been moved to the task_log_reader option in [logging] - the old setting has been used, but please update your config. [2021-09-12 15:00:22,002] {manager.py:788} WARNING - No user yet created, use flask fab command to do it. 
INFO [alembic.runtime.migration] Running upgrade 2c6edca13270 -> 61ec73d9401f, Add description field to connection INFO [alembic.runtime.migration] Running upgrade 61ec73d9401f -> 64a7d6477aae, fix description field in connection to be text INFO [alembic.runtime.migration] Running upgrade 64a7d6477aae -> e959f08ac86c, Change field in DagCode to MEDIUMTEXT for MySql INFO [alembic.runtime.migration] Running upgrade e959f08ac86c -> 82b7c48c147f, Remove can_read permission on config resource for User and Viewer role [2021-09-12 15:00:25,041] {manager.py:788} WARNING - No user yet created, use flask fab command to do it. INFO [alembic.runtime.migration] Running upgrade 82b7c48c147f -> 449b4072c2da, Increase size of connection.extra field to handle multiple RSA keys INFO [alembic.runtime.migration] Running upgrade 449b4072c2da -> 8646922c8a04, Change default pool_slots to 1 INFO [alembic.runtime.migration] Running upgrade 8646922c8a04 -> 2e42bb497a22, rename last_scheduler_run column INFO [alembic.runtime.migration] Running upgrade 2e42bb497a22 -> 90d1635d7b86, Increase pool name size in TaskInstance INFO [alembic.runtime.migration] Running upgrade 90d1635d7b86 -> e165e7455d70, add description field to variable INFO [alembic.runtime.migration] Running upgrade e165e7455d70 -> a13f7613ad25, Resource based permissions for default FAB views. [2021-09-12 15:00:26,292] {manager.py:788} WARNING - No user yet created, use flask fab command to do it. 
INFO [alembic.runtime.migration] Running upgrade a13f7613ad25 -> 97cdd93827b8, Add queued_at column to dagrun table INFO [airflow.models.dagbag.DagBag] Filling up the DagBag from /Users/wilsonmar/airflow-tutorial/dags WARNI [unusual_prefix_c68bbead3b2adcf8c97e2b3de57b8cea620172da_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes' WARNI [unusual_prefix_c68bbead3b2adcf8c97e2b3de57b8cea620172da_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes'] INFO [airflow.models.dag] Sync 33 DAGs INFO [airflow.models.dag] Creating ORM DAG for example_trigger_controller_dag INFO [airflow.models.dag] Creating ORM DAG for latest_only_with_trigger INFO [airflow.models.dag] Creating ORM DAG for tutorial INFO [airflow.models.dag] Creating ORM DAG for example_task_group_decorator INFO [airflow.models.dag] Creating ORM DAG for tutorial_taskflow_api_etl_virtualenv INFO [airflow.models.dag] Creating ORM DAG for example_python_operator INFO [airflow.models.dag] Creating ORM DAG for example_complex INFO [airflow.models.dag] Creating ORM DAG for example_external_task_marker_child INFO [airflow.models.dag] Creating ORM DAG for example_xcom_args INFO [airflow.models.dag] Creating ORM DAG for example_subdag_operator INFO [airflow.models.dag] Creating ORM DAG for example_xcom INFO [airflow.models.dag] Creating ORM DAG for example_short_circuit_operator INFO [airflow.models.dag] Creating ORM DAG for example_task_group INFO [airflow.models.dag] Creating ORM DAG for example_weekday_branch_operator INFO [airflow.models.dag] Creating ORM DAG for example_dag_decorator INFO [airflow.models.dag] Creating ORM DAG for example_trigger_target_dag INFO [airflow.models.dag] Creating ORM DAG for example_subdag_operator.section-1 INFO [airflow.models.dag] Creating ORM DAG for tutorial_etl_dag INFO [airflow.models.dag] Creating ORM DAG for latest_only INFO 
[airflow.models.dag] Creating ORM DAG for example_skip_dag INFO [airflow.models.dag] Creating ORM DAG for example_branch_operator INFO [airflow.models.dag] Creating ORM DAG for example_branch_datetime_operator_2 INFO [airflow.models.dag] Creating ORM DAG for example_bash_operator INFO [airflow.models.dag] Creating ORM DAG for example_nested_branch_dag INFO [airflow.models.dag] Creating ORM DAG for tutorial_taskflow_api_etl INFO [airflow.models.dag] Creating ORM DAG for example_external_task_marker_parent INFO [airflow.models.dag] Creating ORM DAG for example_branch_dop_operator_v3 INFO [airflow.models.dag] Creating ORM DAG for test_utils INFO [airflow.models.dag] Creating ORM DAG for example_xcom_args_with_operators INFO [airflow.models.dag] Creating ORM DAG for example_subdag_operator.section-2 INFO [airflow.models.dag] Creating ORM DAG for example_passing_params_via_test_command INFO [airflow.models.dag] Creating ORM DAG for example_kubernetes_executor INFO [airflow.models.dag] Creating ORM DAG for example_branch_labels INFO [airflow.models.dag] Setting next_dagrun for example_bash_operator to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_branch_datetime_operator_2 to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_branch_dop_operator_v3 to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_branch_labels to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_branch_operator to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_complex to None INFO [airflow.models.dag] Setting next_dagrun for example_dag_decorator to None INFO [airflow.models.dag] Setting next_dagrun for example_external_task_marker_child to None INFO [airflow.models.dag] Setting next_dagrun for example_external_task_marker_parent to None INFO [airflow.models.dag] Setting next_dagrun for example_kubernetes_executor to None INFO 
[airflow.models.dag] Setting next_dagrun for example_nested_branch_dag to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_passing_params_via_test_command to 2021-09-11 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_python_operator to None INFO [airflow.models.dag] Setting next_dagrun for example_short_circuit_operator to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_skip_dag to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_subdag_operator to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_subdag_operator.section-1 to None INFO [airflow.models.dag] Setting next_dagrun for example_subdag_operator.section-2 to None INFO [airflow.models.dag] Setting next_dagrun for example_task_group to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_task_group_decorator to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_trigger_controller_dag to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_trigger_target_dag to None INFO [airflow.models.dag] Setting next_dagrun for example_weekday_branch_operator to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_xcom to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for example_xcom_args to None INFO [airflow.models.dag] Setting next_dagrun for example_xcom_args_with_operators to None INFO [airflow.models.dag] Setting next_dagrun for latest_only to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for latest_only_with_trigger to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for test_utils to None INFO [airflow.models.dag] Setting next_dagrun for tutorial to 2021-09-10 00:00:00+00:00 INFO [airflow.models.dag] Setting next_dagrun for tutorial_etl_dag to None INFO 
[airflow.models.dag] Setting next_dagrun for tutorial_taskflow_api_etl to None INFO [airflow.models.dag] Setting next_dagrun for tutorial_taskflow_api_etl_virtualenv to None INFO [airflow.models.dag] Sync 2 DAGs INFO [airflow.models.dag] Setting next_dagrun for example_subdag_operator.section-1 to None INFO [airflow.models.dag] Setting next_dagrun for example_subdag_operator.section-2 to None Initialization done
-
Exec:
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
echo "$CONSTRAINT_URL"
Expected response:
https://raw.githubusercontent.com/apache/airflow/constraints-2.1.3/constraints-3.7.txt
Its contents:
# Editable install with no version control (apache-airflow==2.1.3)
APScheduler==3.6.3
...
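The same URL can be assembled in Python, which makes the two version placeholders explicit. This is only a minimal sketch: the function name constraint_url is a hypothetical helper, and the version values are the ones from the expected response above.

```python
def constraint_url(airflow_version: str, python_version: str) -> str:
    """Build the pip constraints URL for an Airflow release.

    python_version is the major.minor pair only (for example "3.7").
    """
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Values taken from the expected response shown above.
url = constraint_url("2.1.3", "3.7")
print(url)
```

Pinning against the constraints file for the exact Airflow and Python versions is what keeps pip from pulling in incompatible dependency releases.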
-
Install Airflow with the constraint file, following Airflow’s installation instructions:
pip install --upgrade "apache-airflow[postgres,google]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
-
Download initial file:
curl ???
-
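Check that your own DAG file parses by running it with Python. This only confirms the script raises no exceptions; it does not start Airflow:
python "$AIRFLOW_HOME/dags/tutorial.py"
Once the scheduler and webserver are started (see Start below), the web UI should be reachable at localhost:8080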
Start
-
Start the Airflow scheduler:
airflow scheduler
-
Start the Airflow webserver (in a separate terminal, since the scheduler keeps running in the foreground):
airflow webserver
-
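Alternatively, invoke the Airflow CLI as a Python module, which is equivalent to the airflow command (append a subcommand such as scheduler or webserver):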
python -m airflow
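To confirm that both processes are up, one option is to query the webserver's health endpoint. The sketch below assumes the webserver listens on localhost:8080 and exposes /health returning JSON with "metadatabase" and "scheduler" entries, as Airflow 2.x documents; the helper names is_healthy and fetch_health are hypothetical.

```python
import json
from urllib.request import urlopen


def is_healthy(payload: dict) -> bool:
    """Return True only when both components report the status 'healthy'."""
    return all(
        payload.get(component, {}).get("status") == "healthy"
        for component in ("metadatabase", "scheduler")
    )


def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """Fetch and decode the /health JSON (requires a running webserver)."""
    with urlopen(f"{base_url}/health", timeout=5) as response:
        return json.load(response)


# Sample payload in the assumed shape; use fetch_health() for live data.
sample = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "healthy"},
}
print(is_healthy(sample))
```

With Airflow running, is_healthy(fetch_health()) performs the same check against the live server.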
Airflow Pipeline
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
tasks named t1, t2, etc.
Imports
from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
Default Arguments (Args)
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
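The "override them on a per-task basis" comment describes a merge in which arguments given to a task take precedence over the defaults. The plain-Python sketch below only illustrates that precedence rule; it is not Airflow's actual implementation, and resolve_task_args is a hypothetical helper.

```python
from datetime import timedelta

# A trimmed copy of the defaults above.
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def resolve_task_args(defaults: dict, **overrides) -> dict:
    """Hypothetical helper: per-task keyword arguments win over defaults."""
    return {**defaults, **overrides}


# This task overrides only `retries`; the other keys fall back to the defaults.
task_args = resolve_task_args(default_args, retries=3)
print(task_args["retries"], task_args["owner"])
```

The defaults dictionary itself is left unchanged, so other tasks still see retries set to 1.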
Instantiate
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:
Tasks
Python’s bit-shift operators (>> and <<) are used to specify dependencies:
first_task >> [second_task, third_task]
third_task << fourth_task
Alternatively, use the more explicit set_upstream and set_downstream methods, passing a list when there are several tasks:
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)
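The two notations are interchangeable because the shift operators can be overloaded to call the explicit methods. The toy Task class below is a hypothetical stand-in written for illustration, not Airflow's operator class; it runs without Airflow installed and shows the resulting upstream and downstream sets.

```python
class Task:
    """Toy stand-in for a task (hypothetical; not Airflow's operator class)."""

    def __init__(self, name: str):
        self.name = name
        self.upstream = set()    # names of tasks that must run before this one
        self.downstream = set()  # names of tasks that run after this one

    @staticmethod
    def _as_list(tasks):
        # Accept either a single task or a list of tasks.
        return tasks if isinstance(tasks, list) else [tasks]

    def set_downstream(self, tasks):
        for other in self._as_list(tasks):
            self.downstream.add(other.name)
            other.upstream.add(self.name)

    def set_upstream(self, tasks):
        for other in self._as_list(tasks):
            other.set_downstream(self)

    def __rshift__(self, tasks):  # self >> tasks
        self.set_downstream(tasks)
        return tasks

    def __lshift__(self, tasks):  # self << tasks
        self.set_upstream(tasks)
        return tasks


first_task, second_task, third_task, fourth_task = (
    Task(n) for n in ("first_task", "second_task", "third_task", "fourth_task")
)

first_task >> [second_task, third_task]  # first runs before second and third
third_task << fourth_task                # fourth runs before third

print(sorted(first_task.downstream))  # ['second_task', 'third_task']
print(sorted(third_task.upstream))    # ['first_task', 'fourth_task']
```

Returning the right-hand operand from the operator methods is what lets chains such as a >> b >> c read left to right.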