Module d'onboarding — Airflow
Contenu pédagogique produit par Bill (QA). Destiné à onboarding.fastodata.com.
Public : ingénieur data avec Python OK. Format : théorie d'abord → exercices → quiz corrigé.
Contexte équipe : orchestration de pipelines data (alternative/complément à Kestra), souvent au-dessus de BigQuery/dbt.
Durée estimée : ~3-4 h.
Objectifs pédagogiques
À la fin, l'apprenant sait :
- Expliquer le rôle d'Airflow (orchestrateur de workflows) et son modèle DAG.
- Écrire un DAG avec la TaskFlow API et des opérateurs.
- Comprendre scheduling,
schedule,catchup, backfill, intervalles. - Gérer dépendances, retries, SLA, et passage de données (XCom).
- Distinguer Operators / Sensors / Hooks, et utiliser les connexions/variables.
- Lire l'UI, débugger une tâche, éviter les pièges classiques.
Module 1 — Le modèle mental Airflow
Airflow orchestre : il décide quoi lancer, quand, dans quel ordre, et que faire en cas d'échec. Il n'est pas un moteur de calcul — comme dbt délègue à BigQuery, Airflow délègue le travail réel aux systèmes cibles (BigQuery, Spark, conteneurs, scripts).
- Un DAG (Directed Acyclic Graph) = un workflow : un ensemble de tâches + leurs dépendances, sans cycle.
- Une task = une unité de travail (un Operator instancié).
- Un run (DagRun) = une exécution du DAG pour un intervalle de données donné (
data_interval).
⚠️ Piège central : Airflow est orienté intervalles de données, pas « maintenant ». Un DAG
@dailyqui tourne le 2 traite la journée du 1er (data_interval_start=1,end=2). C'est la source n°1 de confusion.
Module 2 — Écrire un DAG (TaskFlow API)
Style moderne (Airflow 2+), pythonique :
from airflow.decorators import dag, task
import pendulum
@dag(
schedule="@daily",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
catchup=False,
tags=["ventes"],
)
def pipeline_ventes():
@task
def extract() -> list[dict]:
return [{"id": 1, "amount": 100}]
@task
def transform(rows: list[dict]) -> int:
return sum(r["amount"] for r in rows)
@task
def load(total: int):
print(f"total chargé : {total}")
load(transform(extract())) # les dépendances se déduisent des appels
pipeline_ventes()
@tasktransforme une fonction Python en tâche ; les valeurs de retour passent en XCom automatiquement.- Les dépendances se déduisent de l'enchaînement des appels (
load(transform(extract()))).
Style classique (Operators) — toujours courant :
from airflow.operators.bash import BashOperator
t1 = BashOperator(task_id="dump", bash_command="echo go")
t2 = BashOperator(task_id="post", bash_command="echo done")
t1 >> t2 # t1 puis t2 (équivalent : t2 << t1)
a >> b = a avant b. [a, b] >> c = c attend a et b.
Module 3 — Scheduling, catchup, backfill
schedule:@daily,@hourly, un cron ("0 6 * * *"), untimedelta, ouNone(déclenché manuellement / par API).start_date: début de la 1re fenêtre. Le 1er run a lieu à la fin du 1er intervalle.catchup=True: au déploiement, Airflow rattrape tous les intervalles manqués depuisstart_date(peut lancer des centaines de runs !).catchup=False= ne lance que le plus récent. Réflexe :catchup=Falsepar défaut.- Backfill : rejouer volontairement une plage passée (
airflow dags backfill -s … -e …).
Variables d'intervalle utiles dans les templates : {{ data_interval_start }}, {{ data_interval_end }}, {{ ds }} (date logique YYYY-MM-DD).
Module 4 — Dépendances, retries, SLA, XCom
- Retries (par tâche) :
@task(retries=3, retry_delay=pendulum.duration(minutes=5))
def fragile(): ...
- default_args : valeurs par défaut pour toutes les tâches du DAG (owner, retries, email_on_failure…).
- XCom : petit canal de passage de données entre tâches (métadonnées, pas des gros volumes — la donnée lourde va en base/objet, pas en XCom).
- Trigger rules : par défaut une tâche démarre si tous ses parents ont réussi (
all_success). Autres :all_done,one_failed,none_failed_min_one_success… (utile pour les tâches de nettoyage/notif). - SLA / timeouts :
execution_timeoutcoupe une tâche trop longue ; les SLA alertent si un délai est dépassé.
Module 5 — Operators, Sensors, Hooks, Connections
- Operator : fait une action (
BashOperator,PythonOperator,BigQueryInsertJobOperator,KubernetesPodOperator…). - Sensor : attend une condition (fichier présent, partition arrivée, heure…). Toujours préférer le mode
reschedule(libère le worker entre deux checks) au modepoke. - Hook : la brique de bas niveau qui parle à un système externe (client BigQuery, S3, Postgres). Les Operators s'appuient dessus.
- Connections & Variables : secrets/configs gérés dans Airflow (UI/secrets backend), jamais en dur dans le code. Référencés par
conn_id/Variable.get(...).
Module 6 — UI, debug & bonnes pratiques
- Grid / Graph view : voir l'état des runs et tâches (success/failed/up_for_retry…).
- Logs par tâche : 1er endroit à regarder en cas d'échec.
- Clear une tâche/un run pour le rejouer.
- Bonnes pratiques :
- DAG idempotent : rejouer un intervalle ne doit pas dupliquer la donnée (cf. incremental dbt,
MERGE, partitions). - Code DAG léger au top-level (pas d'appel réseau/calcul à l'import — le scheduler parse les fichiers en boucle).
- Tâches petites et atomiques, nommées clairement.
catchup=Falsesauf besoin explicite de backfill.
- DAG idempotent : rejouer un intervalle ne doit pas dupliquer la donnée (cf. incremental dbt,
Exercices
- Écris un DAG
@hourly,catchup=False, qui : (a)extractune liste, (b)transformen agrégat, (c)load. En TaskFlow API. - Ajoute 3 retries + 2 min de delay sur la tâche
extract, et unexecution_timeoutde 10 min surtransform. - Un DAG quotidien doit traiter la veille. Quelles variables de template utilises-tu pour borner la requête sur le bon intervalle ?
- Tu déploies un DAG avec
start_dateil y a 6 mois. Que se passe-t-il aveccatchup=True? Comment l'éviter ? - Une tâche de notification doit s'exécuter même si une tâche amont a échoué. Quelle
trigger_rule?
Quiz final corrigé
Q1. Airflow exécute-t-il le calcul lui-même ?
→ Non. C'est un orchestrateur ; il délègue le travail aux systèmes cibles (BigQuery, Spark, conteneurs…).
Q2. Un DAG @daily start_date=1er janv : quand tourne le run du 1er janvier, et quel intervalle traite-t-il ?
→ Il se déclenche à la fin de l'intervalle (≈ 2 janv 00:00) et traite la journée du 1er (data_interval_start=1er, end=2).
Q3. Différence catchup=True vs False ?
→ True rattrape tous les intervalles manqués depuis start_date (risque de centaines de runs). False ne lance que le plus récent. Défaut conseillé : False.
Q4. À quoi sert XCom, et qu'est-ce qu'on n'y met PAS ?
→ Passer de petites valeurs/métadonnées entre tâches. On n'y met pas de gros volumes (ça va en base/stockage objet).
Q5. Operator vs Sensor vs Hook ?
→ Operator = fait une action. Sensor = attend une condition (préférer reschedule). Hook = client bas-niveau vers un système externe, utilisé par les Operators.
Q6. Pourquoi éviter le code lourd au top-level d'un fichier DAG ?
→ Le scheduler parse les fichiers en boucle ; tout calcul/appel réseau à l'import ralentit/instabilise le parsing. Le travail doit être dans les tâches.
Q7. Que veut dire « DAG idempotent » et pourquoi c'est crucial ?
→ Rejouer un même intervalle produit le même résultat sans doublon. Crucial car retries/backfills relancent des intervalles — sinon données dupliquées.
Pour aller plus loin
- Doc officielle Airflow (Concepts, TaskFlow, Scheduling).
- Comparaison Airflow ↔ Kestra (l'équipe utilise aussi Kestra — skills
kestra,kestra-plugins-reference). - Intégration BigQuery/dbt : opérateurs BigQuery,
BashOperator/dbtou Cosmos pour orchestrer dbt depuis Airflow.