Les informations à collecter et à stocker sont de plus en plus nombreuses. L’énorme flux de données qui affluent de nos jours favorise l’émergence de nouveaux outils. Parmi eux figurent Apache Airflow. Dans l’écosystème de technologies Big Data, Apache Airflow tout comme Oozie, est l’orchestrateur et le planificateur automatique des tâches déployées sur le cluster.
Dans ce tutoriel exhaustif, nous allons vous présenter Apache Airflow et vous apprendre à l’utiliser pour déployer automatiquement vos projets en production.
Apache Airflow : qu’est-ce que c’est ?
Apache Airflow est un système de gestion de flux de travail ETL (Extraction, Transformation, Chargement) beaucoup utilisé pour la transformation de données. Il est codé en python et les workflow sont écrits via des scripts pythons.
L’utilisation de python permet au développeur de créer facilement des workflow grâce à l’importation des classes et des bibliothèques python.
Les workflow sont organisés et exprimés sous forme de Directed Acyclic Graphs (DAG). En réalité, il s’agit de la méthode choisie pour réaliser une tâche.
Tout a commencé en 2014 dans les locaux d’AirBnB. L’entreprise prend de l’ampleur et donc un volume considérable de données à traiter. L’entreprise californienne embauche des Data Scientists, des Data Analysts et des Data Engineers qui doivent travailler en étroite collaboration pour traiter toutes ces données.
Pour tout automatiser, ils écrivent des jobs batch planifiés, afin de pouvoir améliorer la qualité de travail. Dans cette optique, l’ingénieur des données, Maxime Beauchemin, crée un outil open-source intitulé Airflow. En Avril 2016, la Fondation Apache incube le projet et reçoit le statut de projet “top niveau” en janvier 2019. Il compte plus de 1400 contributeurs, 11 230 contributions, et 19 800 étoiles sur GitHub vers la fin de l’année de 2020.
Apache Airflow : sa fonction
Imaginez que vous avez à votre disposition un modèle de Machine Learning (ML) qui doit analyser le sentiment sur twitter. Ensuite, vous voulez appliquer ce ML sur vos amis sur twitter pour leur tweet quotidien. Comme vous allez sans doute le constater, un tel flux de données est complexe et très volumineux à gérer. Il vous faudra orchestrer et construire un pipeline complexe pour déployer ce modèle ML en production, de sorte qu’il produise des résultats continus.
De par son extensibilité, Airflow est excellent pour orchestrer ce type de tâches avec des dépendances compliquées sur plusieurs systèmes externes. Etant donné que chaque étape du pipeline est un code, il est facile d’adapter ces pipelines à vos propres besoins.
Airflow est utilisable pour n’importe quel pipeline de données en lot et il peut traiter plusieurs cas d’utilisation différents.
Quelques usages intéressant d’Airflow
Il existe de nombreux cas d’utilisation pertinents d’Airflow, vous pouvez par exemple :
- Rassembler quotidiennement les mises à jour des équipes de vente depuis Salesforce pour transmettre un message quotidien aux dirigeants de l’entreprise ;
- Organiser et lancer certaines tâches de Machine Learning qui s’exécutent sur un cluster Spark externe ;
- Ou encore charger et analyser à chaque heure, les données d’analyse d’applications ou de site Web dans un entrepôt de données.
Les composants d’Airflow
Plusieurs éléments sont combinés afin de former Airflow. Ici, nous allons voir un a un les composants en question.
- Web server : ce composant donne accès à une interface web (UI) d’où l’on peut monitorer les data pipelines, créer de nouvelle connexion vers des systèmes externes et vérifier si les jobs se terminent correctement ;
- Scheduler : il permet de planifier les tâches des pipelines ;
- Executor : lié au scheduler, il détermine le processus qui exécute chaque tâche à orchestrer ;
- Metastore : c’est au niveau de celui-ci que sont stockées toutes les données liées aux différents jobs, DAGs étant l’interface admin ;
- Worker : Il s’agit du processus qui exécute les tasks que l’exécuteur lui indique.
Architecture d’Apache Airflow
L’image ci-dessous représente l’architecture globale que possède Apache Airflow. Et comme vous allez le constater, chaque composant mentionné juste avant présente des corrélations.
Apache Airflow : les concepts de base
Avant d’entrer dans le vif dus sujet, il est important de connaitre les concepts sur lesquels se base Apache Airflow. Pour cela, voyons quelques notions qui englobe ce système.
DAG (Pipeline)
Un DAG est un graphe sans cycle dont les nœuds sont des tâches (Tasks). Les tâches se déroulent dans un ordre donné, en parallèle ou à la suite, et ce, sans risque de boucle infini. Cette suite de tâche forme ensuite un pipeline.
Un DAG est configuré à l’aide du code Python. Il faudra donc avoir des notions sur ce langage pour pouvoir l’aborder. Nous avons dédié un article entier sur la programmation Python ainsi que d’autres article traitant ce langage, si jamais vous ne le maitrisez pas encore.
Le terme Backfill désigne la possibilité de rejouer un DAG à partir d’un point donné dans le passé.
DagInstance
Un DAG en cours d’exécution ou déjà exécutés est appelé DagInstance. Il faut savoir qu’un DAG est exécuté régulièrement grâce à un trigger.
TaskInstance
Une tâche d’un DAG en cours d’exécution est appelée TaskInstance.
Task
C’est l’instance d’un operator. Un operator est une classe Python qui hérite le BaseOperator. La création d’une tâche passe par un operator. La fonction qui exécute l’operator est lancé lorsque la tâche est appelée.
Operators
C’est un concept permettant de classer les tâches par type ou catégorie. Il exécute des opérations telles que lancer un code Python, créer une instance EC2 sur Amazon ou encore lancer une requête HQL. Il existe plusieurs operators prédéfinis dont :
- PythonOperator : qui exécute une fonction Python ;
- EmailOperator : qui sert à l’envoie d’un mail ;
- BashOperator : qui exécute une commande bash ;
- DockerOperator : qui exécute une commande dans un container Docker ;
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator : pour executer des commandes SQL
- HttpOperator : qui effectue une requête sur un endpoint HTTP (hive, s3, redshift, etc.)
- Sensor : qui attend pendant un certain temps l’arrivée d’un fichier, d’une ligne de base de données, d’une clé S3, etc.
Sensors
Il est important de souligner certains points concernant ces opérateurs. En effet, ce sont des types particuliers d’opérateurs, mettant en pause l’exécution d’une tâche jusqu’à ce qu’une condition ou des conditions soient remplies. Par exemple, l’attente du dépôt d’un fichier sur HDFS ou tout simplement d’un timer.
Executors
Airflow dispose d’un ordonnanceur (scheduler) pour exécuter ses différentes tâches. Une tâche appelée s’exécute dans un endroit précis. Le mécanisme d’appel des tâches est géré par Airflow via executor. Il existe plusieurs executors que voici :
- LocalExecutor
Grâce à LocalExecutor, plusieurs tâches sont lancées en parallèle jusqu’à ce que l’utilisation des ressources de la machine locale soit requise. On parle de scaling verticale.
- SequentialExecutor (mode par défaut)
Les tâches sont lancées en série sur le même serveur où le Airflow est lancé. Il n’exécute qu’une seule instance de tâche à la fois, c’est-à-dire qu’il n’y a pas d’autres tâches lancées en parallèle. En développement, le mécanisme du parallélisme est très utile. Cependant, il montre très vite ses limites lors du passage à l’échelle.
- CeleryExecutor
Celery est un mécanisme de file d’attente des tâches asynchrones basées sur le transfert de message distribué. Il fonctionne en temps réel mais prend également en charge la planification.
Architecture d’Airflow en mode distribuée
Le schéma suivant montre le comportement d’Airflow en mode distribuée :
Le scheduler crée une nouvelle instance DagRun dans le dépôt Airflow metastore à chaque fois qu’un DAG spécifique est lancé et commence à exécuter les tâches individuelles dans le DAG. Il envoie des messages qui contient les informations sur la tâche et la fonction à exécuter (DAG_id, Task_id, etc.) dans un Queuing service.
L’utilisateur interagit avec le web server dans certains cas. Il peut déclencher manuellement l’exécution d’un DAG. Un DAGRun est créé et le scheduler refait le même processus qui est de déclencher des tâches individuelles.
Les Hookes
Sur Airflow, les Hookes permettent d’interagir avec les interfaces des autres systèmes. Grâce à ce dernier, vous pouvez vous connecter à des APIs et des bases de données externes comme Hive, S3, GCS, MySQL ou Postgres.
Hookes ne conserve pas les informations confidentielles comme les identifiants de connexion. Ces dernières sont stockées dans une base de métadonnées chiffrée associée à l’instance Airflow en cours.
Les plugins
La combinaison entre les Hookes et les opérateurs peut être décrite comme les plugins Airflow. Ils sont généralement utilisés pour accomplir certaines tâches spécifiques impliquant une application externe.
Par exemple, il peut s’agir du transfert de données à partir de Salesforce vers Redshift. Il existe de nombreuses collections open-source de plugins créés par la communauté d’utilisateurs, et chaque utilisateur peut créer des plugins pour répondre à ses propres besoins.
Les connexions
Airflow arrive à stocker des informations grâce aux connexions. Ces informations permettent de se connecter à des systèmes externes comme des identifiants ou des tokens d’API.
Toutes ces informations sont gérées directement depuis l’interface utilisateur de la plateforme. Les données sont chiffrées et stockées sous forme de métadonnées dans une base de données Postgres ou MySQL.
Apache Airflow : Premier pas
Apache Airflow propose une solution qui répond au défi croissant d’un environnement compliqué d’outils de gestion de données de script et de traitement d’analyse à gérer et coordonner.
C’est une solution open-source qui facilite la création, l’orchestration et le monitoring de vos différents traitements. Un projet initié par l’équipe d’Airbnb en 2014 et qui à ce jour arrive à gérer un nombre plus important de workflow.
Avec Airflow, on a la possibilité de créer des graphes acycliques (DAGs) grâce au langage Python, ce qui permet de créer des tâches connectées et dépendantes les unes des autres afin de réaliser vos flux.
Apache Airflow : Caractéristique
L’utilisation d’Apache Airflow offre plusieurs avantages qui vont sans doute faciliter la vie des travailleurs du Big Data. Parmi ces avantages, nous avons :
- Le fait qu’il soit open source : il est gratuit avec beaucoup d’utilisation active ;
- Sa facilité d’utilisation : même avec une connaissance de base en python vous êtes déjà prêt à déployer sur Airflow ;
- Son intégration robuste : il y a des opérateurs prêts à l’emploi pour vous aider à travailler avec Google Cloud Platform, Amazon AWS, Microsoft Azure, etc.
- La possibilité d’utiliser Python Standard pour coder : créez avec flexibilité des workflow simples et complexes grâce à Python ;
- L’interface utilisateur étonnante : vous pouvez surveiller et gérer vos flux de travail. Cela vous permettra de vérifier l’état des tâches terminées et en cours.
Installation d’Apache Airflow
Passons maintenant aux choses sérieuses. Nous allons installer Apache Airflow et pour cela, nous allons procéder étape par étape.
Etape #1 : Il faut tout d’abord installer pip sur votre système en tapant la ligne de commande suivante sur votre terminal. Ignorez la commande si pip est déjà installé sur votre système.
sudo apt-get install python3-pip
Etape #2 : Il faut définir un emplacement pour l’airflow dans votre système local. L’emplacement par défaut est ~/airflow. Cet emplacement est toutefois modifiable.
export AIRFLOW_HOME=~/airflow
Etape #3 : Installez apache airflow grâce à la commande suivante :
pip3 install apache-airflow
Etape #4 : Pour exécuter les workflows et les maintenir, airflow a besoin d’un backend de base de données. Tapez la commande suivante :
airflow initdb
Etape #5 : Lancez le serveur web grâce à la commande ci-après. Le port par défaut est 8080. Modifiez-le s’il est déjà utilisé.
airflow webserver -p 8080
Etape #6 : Démarrez le programme airflow
airflow scheduler
Maintenant, créez un dossier “dags” dans le répertoire airflow où vous définirez vos workflows ou vos DAG. Ouvrez le navigateur Web et tapez : http://localhost:8080/admin/.
Vous verrez une affichage comme ceci :
L’interface utilisateur
Maintenant qu’Airflow est déjà installé, voyons un peu comment se comporte l’interface d’administrateur.
Vue DAGS
Il s’agit de la vue par défaut de l’interface utilisateur. Tous les DAGs présents dans le système sont listés à ce niveau. C’est une vue résumée de DAG, on y trouve, par exemple, le nombre de fois ou un DAG a été exécuté avec succès ou a échoué, l’heure de la dernière exécution et d’autres liens utiles.
Vue graphique
Dans la vue graphique, on a une vue sur toutes les étapes de flux de travail avec leurs dépendances et leurs états actuels. On identifie ces différents états au moyen d’un code de couleur.
Vue par arbre
L’arborescence représente également le DAG. Si le pipeline prend du temps avant de s’exécuter, vous pouvez effectuer une vérification pour identifier quelle partie prend énormément de temps avant de s’exécuter puis vous pouvez travailler dessus.
Durée de la tâche
La comparaison de la durée d’exécution de vos tâches à différents intervalles de temps se fait dans cette vue. Grâce à cela, vous pouvez optimiser les algorithmes et comparer les performances.
Code
Vous pouvez voir dans cette vue le code qui sert à générer le DAG
Création du premier DAG
Dans cette section, vous allez d’abord créer un flux de travail qui consiste premièrement à imprimer « Obtenir des scores de cricket en direct » sur le terminal, puis imprimer le score en direct sur le terminal à l’aide d’une API.
Testons d’abord l’API et pour cela, vous devez installer la bibliothèque cricket-cli à l’aide de la commande suivante.
sudo pip3 install cricket-cli
Vous obtiendrez le score en tapant la commande suivante
Importation des bibliothèques
Vous pouvez créer le même workflow à l’aide d’Apache Airflow. Mais le code sera complètement en python pour définir un DAG. Pour cela, commencez par importer les bibliothèques nécessaires.
Dans ce cas, c’est la bibliothèque BashOperator qui sera utilisée, car notre flux de travail nécessite uniquement l’exécution des opérations Bash.
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
Définition des arguments de DAG
Vous devez passer un dictionnaire d’argument pour chacun des DAG. Voici quelques descriptions des arguments à passer :
- propriétaire : le nom du propriétaire du workflow, doit être alphanumérique et peut comporter des traits de soulignement mais ne doit contenir aucun espace ;
- start_date : date de début de votre workflow ;
- retry_delay : si une tâche échoue, combien de temps il faut attendre pour la réessayer ;
- email : votre adresse de messagerie, afin que vous puissiez recevoir un e-mail chaque fois qu’une tâche échoue pour une raison quelconque ;
- depend_on_past : à chaque fois que vous exécutez votre workflow, marquez-la comme True si les données dépendent de l’exécution passée sinon marquez-la comme False.
default_args = {
'owner': 'lakshay',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
Définition du DAG
Pour définir un DAG, créez un objet DAG et passez le dag_id qui est le nom du DAG. Le dag_id doit être unique.
Passez les arguments définis à la dernière étape. Puis, ajoutez une description et un schedule_interval qui exécuteront le DAG après l’intervalle de temps précisé.
# define the DAG
dag = DAG(
'live_cricket_scores',
default_args=default_args,
description='First example to get Live Cricket Scores',
schedule_interval=timedelta(days=1),
)
Définition des tâches
Pour cet airflow il faut deux tâches :
print : dans la première tâche, il faut imprimer « Obtenir des scores de cricket en direct !!! » sur le terminal à l’aide de la commande echo.
get_cricket_scores : dans celle-ci, il faut imprimer les scores de cricket en direct à l’aide de la bibliothèque installée.
Maintenant, tout en définissant la tâche, vous devez d’abord choisir le bon opérateur. Ici, les deux commandes sont basées sur un terminal. C’est le BashOperator qui sera utilisé. Le task_id sera un identifiant unique de la tâche visible sur les nœuds de Graph View de votre DAG. Passez la commande bash que vous souhaitez exécuter et enfin l’objet DAG auquel vous souhaitez lier cette tâche.
Enfin, créez le pipeline en ajoutant l’opérateur « >> » entre les tâches :
# define the first task
t1 = BashOperator(
task_id='print',
bash_command='echo Getting Live Cricket Scores!!!',
dag=dag,
)
# define the second task
t2 = BashOperator(
task_id='get_cricket_scores',
bash_command='cricket scores',
dag=dag,
)
# task pipeline
t1 >> t2
Mettre à jour le DAGS dans l’interface utilisateur Web
Maintenant, il faut actualiser l’interface utilisateur et vous verrez votre DAG dans la liste. Activez la bascule à gauche de chacun des DAG, puis déclenchez le DAG.
Cliquez sur le DAG et ouvrez la vue graphique et vous verrez quelque chose comme ceci :
Chacune des étapes du flux de travail sera dans une boîte séparée et sa bordure passe au vert foncé une fois que tout se passe avec succès.
Pour obtenir plus de détails, il faut cliquer sur le nœud « get_cricket scores ». Cela vous donnera une vue comme celle-ci :
Maintenant, cliquez sur Afficher le journal pour voir la sortie de votre code :
Ainsi vous avez réussi à créer votre premier DAG dans airflow.
Comment Installer Apache Airflow pour Exécuter KubernetesExecutor ?
Malgré tous les efforts fournis par la communauté open source pour créer un graphique Helm prêt pour la production et un Airflow sur K8s Operator, rien a été publié pour le moment. Il ne prend pas en charge KubernetesExecutor.
Pour arriver à lancer correctement KubernetesExecutor, il faut donc essayer d’autres méthodes.
Commencez par cloner le référentiel :
$ git clone https://github.com/chattarajoy/airflow-kube-helm
$ cd airflow-kube-helm
La grande partie des images Airflow Docker disponibles gratuitement ne sont pas fournies avec le plug-in Kubernetes préinstallé dans l’image. Le référentiel comporte un script permettant de créer facilement votre propre image :
$ ./examples/minikube/docker/build-docker.sh
.
Removing intermediate container ecdb3b6fea54
---> ced46d6eda5c
Successfully built ced46d6eda5c
Successfully tagged airflow:latest
La construction de l’image s’est effectuée avec succès. Pour installer le graphique Airflow sur votre configuration Kubernetes locale, il faut utiliser le Helm. Pour réussir cela, vous pouvez utiliser à la fois les Kubernetes de Minikube ou ceux de Docker Desktop.
Après que votre cluster Kubernetes local est en cours d’exécution, vous devez installer kubectl et installer Helm. Ce n’est qu’après cette opération que vous pouvez déployer Airflow.
$ cd
airflow $ kubectl create ns
airflow $ helm --namespace airflow airflow install .
Ci-dessus, se trouve la commande Helm qui déploie les modèles mentionnés dans le répertoire actuel vers le cluster Kubernetes actuel. Si vous avez besoin de plus d’informations sur les graphiques de barre, voir ici.
Afin de vérifier l’état de vos pods Airflow, lancez la commande ci-dessous et attendez que tous les pods se mettent en marche :
$ kubectl -n flux d'air get pods
Pour démarrer tous les pods, il faudra quelques minutes à Kubernetes. Quand tous les pods sont en cours d’exécution, vous pouvez accéder au serveur web Airflow. Il faut connaître le port sur lequel le serveur s’exécute et pour ce faire, tapez la commande suivante :
$ kubectl get svc -n flux d'air
En regardant le croisement de la ligne airflow web et la colonne PORT(S), vous verrez ceci 8080:30926. Le port à utiliser pour accéder au serveur web est le 30926, donc en tapant localhost:30926, vous verrez ceci :
Une fois sur l’interface utilisateur serveur web, vous pouvez déclencher l’exemple de DAG présent. Mais il est important que vous notiez que pour chaque tâche, Airflow crée un nouveau pod sur le cluster Kubernetes, c’est KubernetesExecutor en action.
Les grands avantages à utiliser Apache Airflow avec Kubernetes Executor
Les avantages d’utiliser Airflow avec Kubernetes peuvent être regroupés en 4 points :
- Haut niveau d’élasticité, le cluster est dynamique. Il s’adapte aux charges de travail ;
- Configuration du pod au niveau de la tâche, n’oubliez pas que pour chaque tâche, l’exécuteur Kubernetes crée un nouveau pod, donc pour chaque tâche, les ressources nécessaires peuvent être spécifiées. Les ressource comme : cpu/mémoire ainsi que l’image docker – dépendances
- Tolérance aux pannes, chaque travail d’airflow est isolé dans un pod donc en cas de d’échec d’exécution d’une tâche cela n’influence pas le travail. De plus, si le planificateur tombe en panne, en tirant parti de la fonctionnalité « Resource Version » de Kubernetes, lorsque le planificateur redémarre, nous pouvons redémarrer à partir de l’état où nous étions.
Déploiement plus simple, tout le déploiement peut être spécifié dans un seul fichier YAML. Vos dépendances sont également déchargées dans des conteneurs.
Airflow vs. Cron
La grande question est : qu’est-ce qui différencie airflow des tâches cron ? Il faut reconnaître qu’il est vraiment facile de lancer de manière régulière un script, un programme ou une commande.
Mais il y a de forte chance que votre tâche rencontrera plus tard un problème :
- un timeout sur une requête SQL ;
- un changement de clé dans le Json d’une API que vous interrogez ;
- un site web injoignable au moment d’un GET ;
- etc.
En cas d’erreur pour un script simple, il est possible de résoudre le problème depuis le code du script.
Tout devient un peu plus compliqué lorsque nous devons gérer plusieurs tâches dépendantes les unes des autres. Imaginez un instant qu’une tâche A a besoin des données créées par une tâche B avant de s’exécuter, donc la tâche A ne peut s’exécuter qu’après la tâche B.
En cron, la seule possibilité de réussir cela est de laisser un temps considérable entre le lancement des deux tâches pour que celle-ci s’exécute dans le bon ordre.
Le danger à ce niveau est que si pour une raison quelconque la tâche A ne s’exécute pas alors la tâche B va s’exécuter normalement mais avec des données corrompues.
Ce n’est qu’en ce moment qu’on trouve tout l’avantage d’Airflow. Airflow est capable de gérer toutes les dépendances entre les différentes tâches, on parle ainsi de workflow ou encore de pipeline.
En déclarant toutes les différentes tâches à Airflow, ce dernier s’occupe de leur exécution dans le bon ordre et de pouvoir relancer toutes les tâches dont l’exécution n’a abouti.
Voilà, nous arrivons à terme de cet article dans lequel nous avons abordé en long et en large Apache Airflow. A l’issu de celui-ci, vous avez toutes les cartes en mains afin d’appréhender ce système. Si vous souhaitez apprendre davantage sur le Big Data, nous vous proposons cette formation sur l’écosystème Hadoop. Cliquez sur le lien texte suivant pour accéder à la documentation officielle d’Apache airflow