Exécuter le MapReduce dans un cluster Hadoop

Pour valoriser le volume astronomique de données générées dans l’ère du Big Data, la meilleure stratégie consiste à distribuer le stockage de données et à paralléliser leur traitement dans un cluster d’ordinateurs.  Dit comme cela, c’est très facile à appréhender, cependant la réalité est tout autre ! Car valoriser les données dans un cluster soulève 2 difficultés majeures :

Difficulté #1 pour paralléliser les traitements dans le cluster, les calculs qui seront appliqués sur les données doivent être divisés en tâches indépendantes, de manière à ce que si une tâche n’est pas traitée entièrement, que cela n’affecte pas les autres tâches. Ce n’est qu’à cette condition qu’on pourra exécuter un traitement de façon parallèle dans le cluster ;

Difficulté #2pour distribuer le stockage des données dans le cluster, les fichiers de données doivent être stockés de façon redondante sur les nœuds du cluster. Si on ne duplique pas les fichiers sur plusieurs nœuds, alors en cas de panne de l’un des nœuds, tous ses fichiers seront indisponibles jusqu’à ce que le nœud soit changé, ce qui va perturber les traitements. La redondance des données est donc indispensable pour distribuer le stockage des données dans un cluster et ainsi garantir sa disponibilité et sa tolérance aux pannes ;

L’essence même de la valorisation des données en Big Data repose sur les procédés employés pour résoudre ces 2 challenges.   Si vous souhaitez travailler dans le Big Data ou vous y réorienter, et que vous ne comprenez pas correctement d’un point de vue conceptuel les méthodes employées pour distribuer le stockage de données et paralléliser leur traitement dans un cluster, alors vous entrez dans la discipline du Big Data avec un sérieux handicap !

Le procédé utilisé pour soulever le premier défi, le parallélisme des traitement, est le MapReduce. Tandis que le procédé utilisé pour soulever le second défi, la distribution du stockage des données, est le HDFS

Le MapReduce et le HDFS se combinent dans un cluster Hadoop pour permettre de traiter les données à large échelle. A eux deux, ils représentent la fondation de tout le Big Data. 

Dans cette chronique exhaustive, nous allons vous expliquer en détails ce qu’est le MapReduce. Nous vous montrerons comment  grâce au mapreduce, on parvient à surmonter les difficultés de parallélisme des calculs dans un cluster. Et nous finirons par l’écriture d’un job MapReduce, et son exécution dans un cluster Hadoop. Le HDFS quant à lui sera abordé dans une autre chronique.  

1 – Définition complète du MapReduce

Pour découper les traitements en tâches parallèles dans le cluster, la solution consiste à utiliser le MapReduce. Mais, le mapreduce, qu’est ce que c’est véritablement ?

Le MapReduce est d’abord et avant tout un modèle algorithmique, c’est-à-dire une manière de penser le découpage d’un problème en tâches. Il consiste à découper le traitement d’un fichier de données en tâches indépendantes en suivant 2 phases : une phase Map, une et une phase Reduce. L’utilisateur spécifie une fonction de hachage[1] Map qui transforme les données d’entrée en paires de clés/valeurs, et une fonction de hachage Reduce qui agrège toutes les valeurs associées à la même clé. Une phase intermédiaire entre le Map et le Reduce, appelée shuffle trie les paires de clés/valeurs générées par clé.

Ce programme, écrit dans un style fonctionnel, est automatiquement parallélisé et exécuté dans un cluster. Une couche logicielle installée sur le cluster (YARN, Kubernetes ou Mesos) s’occupe des détails du partitionnement et du stockage des données, de la planification de l’exécution du programme à travers tous les nœuds du cluster, de la gestion des pannes, et de la gestion de la communication entre les nœuds du cluster.

Plus bas dans la chronique, nous expliquons de façon détaillée comment le MapReduce est exécuté sur le cluster.  En fait, avec le MapReduce, vous écrivez des programmes mono-tâches, et vous obtenez le parallélisme gratuitement ! Ceci permet aux programmeurs n’ayant aucune expérience en programmation parallèle et distribuée d’utiliser facilement les ressources du cluster.

En fait, le but véritable du MapReduce est de fournir aux développeurs, une abstraction qui masque la complexité des opérations liées au parallélisme, la distribution des traitements de données, la gestion de leur exécution dans le cluster, et la gestion des défaillances qui peuvent survenir dans le cluster pendant le traitement. Le mapreduce est un style de programmation qui facilite l’exploitation des ressources d’un cluster grâce à une approche “divide & conquer” où tout travail sera divisé en tâches et chaque tâche sera isolée dans un noeud et traitée par ce nœud. Maintenant que vous comprenez ce qu’est le mapreduce, entrons dans les détails de son fonctionnement.

Attention !!! Les phases Map, Shuffle et Reduce sont complètement indépendantes. Cela signifie que l’utilisateur n’est pas obligé d’écrire à chaque fois une phase Map, puis une phase Reduce. Il peut écrire une phase Map sans écrire de phase Reduce.


[1] Une fonction de hachage (hash function en anglais), est une fonction qui assigne à une donnée d’entrée une valeur particulière. La table de hachage, générée par une fonction de hachage, fait correspondre la donnée d’entrée à sa valeur assignée correspondante à l’aide d’une clé, 

2 – Détails des différentes phases du MapReduce

Comment arrive t’on grâce au mapreduce à diviser les traitements en tâches et ainsi obtenir le parallélisme à large échelle ? C’est à cette question que nous allons répondre dans cette partie.

Le Mapreduce est un modèle algorithmique qui fournit un style de programmation fonctionnelle tye “divide & conquer” qui découpe automatiquement les traitements de données en tâches et les isole sur les noeuds d’un cluster. Ce découpage est réalisé en 3 phases (ou 3 étapes) : une phase map(), une phase shuffle() et une phase reduce(). Regardons ensemble de très près chacune de ces phases.

2.1 – La phase Map

Dans cette phase, le fichier de données à traiter a déjà été partitionné dans le HDFS ou le système de fichiers distribué du cluster. A chaque partition des données est affectée une tâche map. Ces tâches Map se sont en réalité des fonctions qui transforment  la partition à laquelle chacune est assignée en paires de clé/valeurs. La façon dont les données d’entrée sont transformées en clé/Valeurs est à la discrétion de l’utilisateur. Attention, pour ceux qui travaillent dans le développement des bases de données, le terme « clé » peut générer de la confusion. Les clés générées ici ne sont pas les « clés » dans le sens de « clé primaire » des bases de données relationnelles, elles ne sont pas uniques, ce sont juste des nombres, des identifiants arbitraires qui sont affectés aux valeurs de la paire. La spécificité cependant est qu’à toutes les valeurs identiques de la partition sont affecté la même clé. Pour que vous compreniez mieux cela, prenons l’illustration du comptage de mots dans une pile de 3 documents. Chaque document dans la pile est une partition stockée dans un nœud du cluster, la fonction avec pour argument (clé, valeur) est définie de la façon suivante : M(k,v) = List (k,v) avec k la clé, qui est ici le mot contenu dans le document, et v, la valeur, qui représente ici la référence du document dans lequel est situé la clé. La figure suivante illustre l’exécution du traitement Map. 

phase map de l'exécution du mapreduce hadoop
Figure : exécution de la phase Map()

2.2 – La phase Shuffle

Une fois que toutes les tâches Map sont achevées (c’est-à-dire lorsque tous les nœuds du cluster ont fini d’exécuter la fonction Map qui leur a été assignée), la phase Shuffle démarre. Cette phase consiste d’une part à trier par clé, toutes les paires clé/valeurs générées par la phase Map, et d’autre part à regrouper dans une liste pour chaque clé, l’ensemble de ses valeurs éparpillées  à travers les nœuds auxquels a été assignée la fonction Map. Formellement, les paires de clés/valeurs à cette phase ont la forme suivante : (k,[v1,v2,v3,…………..,vn])

Au niveau du Nœud central du cluster, un processus qui gère l’exécution des tâches Map est actif. Ce processus, souvent appelé processus maître, connait le nombre de tâches Reduce qu’il va y avoir. Appelons r  ce nombre. L’utilisateur, lorsqu’il spécifie la fonction MapReduce, spécifie également le nombre de tâches Reduce qui sera nécessaire pour le traitement efficace de sa fonction. r processus exécutant les r tâches Reduce vont être déclenchées au niveau de r nœuds du cluster. Ces r nœuds, chacun à partir d’une fonction de hachage, vont créer un fichier chacune pour sa tâche Reduce et vont attribuer à ce fichier un nombre entre 0 et r – 1.  Par ailleurs, chaque clé qui est générée à l’issue de la phase Map est hachée (c’est-à-dire est associée à un nombre entre 0 et r-1, cela permet de lui affecter à une tâche Reduce précise). Les processus Reduce vont à l’aide de ce hachage être capables d’identifier tous les fichiers Map qui correspondent à leur tâche Reduce   et effectuer une jointure verticale[2] de ces fichiers. Enfin, dans le fichier joint, le cluster trie et réorganise les paires clé/valeurs sous forme de listes de clé/valeurs. C’est cette liste qui se présente formellement sur la forme (k,[v1,v2,v3,…………..,vn]) (k,v1),(k,v2),(k,v3),…,(k,vn) sont les paires de clé/valeurs, avec k la clé venant de toutes les tâches Map. La figure ci-après illustre la phase Shuffle et ce qui se passe dans les r fichiers où se produisent le Shuffle.


[2] Une jointure verticale est une concaténation de fichiers en lignes. Elle se différence de la jointure classique qui concatène les fichiers de données en colonnes selon une clé unique.

Phase Shuffle - Hadoop mapreduce
Figure : Phase Shuffle avec r = 2 (2 tâches Reduce). Après le tri, la liste de clé/valeurs est générée dans le fichier qui va être passé au Reduce. Les valeurs y sont rangées par clé sous forme de liste. Le hachage des fichiers Map permet d’identifier leur tâche Reduce et de les regrouper. Cela se traduit par le r0 et r1 que nous avons ajoutés en première ligne de chaque fichier Map.

2.3 – La phase Reduce

La phase Shuffle s’achève avec la construction des r fichiers contenant les listes de clés/valeurs qui vont servir d’arguments à la fonction Reduce. Le but de cette phase est d’agréger les valeurs des clés reçues par le Shuffle et de joindre verticalement l’ensemble des r fichiers pour obtenir le résultat final. L’utilisateur définit dans la fonction Reduce l’agrégat qu’il veut utiliser, par exemple la somme, le comptage,…etc., et ce qu’il souhaite faire des résultats : soit les afficher à l’aide d’une instruction « print », soit les charger dans une base de données ou soit les envoyer à un autre job MapReduce. Prenons l’exemple précédent et supposons qu’on souhaite à l’aide du Reduce compter le nombre d’occurrence de chaque mot dans l’ensemble de la pile des 3 documents. La figure ci-après illustre le résultat.

Phase reduce - hadoop mapreduce
Figure : exécution de la phase reduce. phase Reduce. Dans cette phase, une fonction d’agrégation (ici count<value>) est appliquée à la liste des valeurs reçues par le shuffle.

Maintenant, vous avez compris conceptuellement comment le mapreduce parvient à découper les traitements à exécuter sur un cluster en tâches parallèles. Vous avez également compris que la programmation MapReduce consiste à écrire 2 fonctions : Map (),  Reduce (),  et le système gère pour vous l’exécution parallèle, et la coordination des tâches dans le cluster. Mais, quel est le lien avec Hadoop, et comment s’exécute t’il dans un cluster Hadoop ? C’est ce que nous allons voir dans la partie suivante. En attendant, si vous souhaitez voir des cas d’usages plus complets du mapreduce, nous vous recommandons cette chronique : Hadoop vs Teradata : les approches technologiques d’interrogation d’une base de données en Big Data

3 – Détails d’exécution du MapReduce dans un cluster Hadoop

Soyez attentif, car dans cette partie, nous allons présenter succinctement les étapes d’exécution d’un traitement MapReduce dans un cluster Hadoop. Les étapes d’exécution du MapReduce dans un cluster Hadoop sont assez similaires aux étapes citées dans la partie précédente. Mais il existe tout de même quelques changements stratégiques dans Hadoop.

Le traitement MapReduce écrit par l’utilisateur s’appelle un job MapReduce dans la terminologie du cluster Hadoop et s’exécute en 7 étapes :

  1. Au départ, l’utilisateur configure le Job MapReduce : il écrit la fonction Map, la fonction Reduce, spécifie le nombre r de tâches Reduce, le format de lecture du fichier d’entrée, le format de sortie des r fichiers Reduce, éventuellement la taille des blocs du fichier d’entrée et le facteur de réplication[1]. Une fois que tout cela est fait et qu’il déclenche l’exécution du job, l’orchestrateur de ressources (YARN ou Kubernetes) démarre les r tasktrackers dans des containers au niveau des noeuds qui vont effectuées les r tâches Reduce que l’utilisateur a spécifiées ;
  2. Le HDFS partitionne le fichier d’entrée en blocs de taille fixe, généralement 64 Mo par bloc (sauf si l’utilisateur a spécifié une taille de bloc différente à la première étape). Ensuite, le HDFS réplique ces blocs selon le facteur de réplication définie par l’utilisateur (3 par défaut) et les distribue de façon redondante dans des nœuds différents dans le cluster.  Le fait de partitionner le fichier d’entrée en blocs de taille fixe permet de répartir de façon équilibrée la charge de traitement parallèle entre les nœuds du cluster, ce qui permet aux tâches de s’achever à peu près au même moment dans l’ensemble des nœuds  du cluster ;
  3. Par défaut, le l’orchestrateur déclenche M tasktrackers sur les M nœuds de données dans lesquels ont été répartis les M partitions du fichier d’entrée, pour exécuter les tâches Map, soit un tasktracker Map pour chaque bloc de fichier. Après, étant donné que chaque tasktracker s’exécute dans un container au niveau du noeud, il n’est pas exclu que plusieurs containers soient déclenchés dans le même noeud. Chaque tasktracker lit le contenu de sa partition par rapport au format d’entrée spécifié par l’utilisateur, le transforme par le processus de hachage définie dans la fonction Map en paires de clés/valeurs. Ce processus de hachage s’effectue en mémoire locale du nœud ;
  4. Périodiquement, dans chaque nœud, les paires de clés/valeurs sont sérialisées dans un fichier sur le disque dur local du nœud. Ensuite ce fichier est partitionné en r régions (correspondant aux r tâches Reduce spécifiées par l’utilisateur) par une fonction de hachage qui va assigner à chaque région  une clé qui correspond à la tâche Reduce à laquelle elle a été assignée.  Les informations sur la localisation de ces régions  sont transmises à l’orchestrateur, qui fait suivre ces informations aux r tasktrackers qui vont effectuer les tâches Reduce ;
  5. Lorsque les r tasktrackers Reduce sont notifiés des informations de localisation, ils utilisent des appels de procédures distantes (protocole RPC) pour lire depuis le disque dur des nœuds sur lesquels les tâches Map se sont exécutées, les régions des fichiers Map leur correspondant. Ensuite, ils les trient par clé. Notez au passage que le tri s’effectue en mode batch dans la mémoire du tasktracker Reduce. Si les données sont trop volumineuse, alors cette étape peut augmenter de façon significative le temps total d’exécution du job ;
  6. Les tasktrackers Reduce itèrent à travers toutes les données triées et pour chaque clé unique rencontrée, ils la passent avec sa valeur à la fonction Reduce écrite par l’utilisateur. Les résultats du traitement de la fonction Reduce sont alors sérialisés dans le fichier ri (avec i l’indice de la tâche Reduce) selon le format de sortie spécifié par l’utilisateur. Cette fois-ci, les fichiers ne sont pas sérialisés dans le disque dur du nœud tasktracker, mais dans le HDFS, ceci pour des raisons de résilience (tolérance aux pannes) ;
  7. Le job s’achève là, à ce stade, les r fichiers Reduce sont disponibles et Hadoop applique en fonction de la demande de l’utilisateur, soit un « Print Ecran », soit leur chargement dans un SGBD, soit alors leur passage comme fichiers d’entrée à un autre job MapReduce ;

La figure suivante récapitule en visuel ces 7 étapes. Attention, prenez votre temps pour lire le graphique et bien le comprendre.


[1] Le facteur de réplication c’est le nombre de fois qu’un bloc de fichier est répliqué dans le cluster par le HDFS. Il est définit par l’utilisateur et par défaut est égal à 3. Par exemple, un facteur de réplication égal à 3 signifie que chaque bloc de fichier est répliqué 3 fois dans 3 différents nœuds du cluster par le HDFS.

exécution du mapreduce dans un cluster hadoop - hadoop mapreduce
Figure : étapes d’exécution d’un job MapReduce dans un cluster Hadoop. La couleur jaune traduit les traitements, la verte représente la RAM, le blanc représente les opérations d’accès à la données et les cylindres bleus les fichiers Map.

A partir de la version 2 d’Hadoop, ce schéma d’exécution change un peu, car désormais Hadoop supporte plusieurs modèles algorithmique au dela du mapreduce. Cela est possible grâce au remplacement du JobTracker par un orchestrateur de ressources, notamment YARN.

Fondamentalement, YARN sépare le rôle d’exécution des jobs MapReduce (via les taskTackers) de celui du suivi de la progression de l’exécution des tâches dans le cluster en deux processus indépendants : un processus gestionnaire appelé Resource Manager, qui gère l’utilisation des ressources du cluster, et un processus applicatif appelé Application Master, qui gère le cycle de vie des jobs tournant sur le cluster. L’idée du YARN est que le processus Application Master négocie avec le processus Ressource Manager les ressources du cluster (mémoire, CPU, Disque dur, Réseau, etc.), décrites en termes de containers ; chaque container ayant une limite mémoire qu’il ne peut pas dépasser. Des jobs de calcul distribués tels que les jobs MapReduce, s’exécutent à l’intérieur de ces containers. Les containers sont surveillés par des processus appelés Node Managers, exécutés sur les nœuds de données pour s’assurer que l’application ou le job qui s’exécute dans le container n’utilise pas plus de ressources que celles qu’il lui a été accordées.

Contrairement au JobTracker qui gère seul tous les jobs MapReduce qui sont exécutés dans le cluster, avec le YARN chaque instance d’application ou job possède une Application Master, qui s’exécute pour la durée du job. En d’autres termes, il y’a autant de « négociateur de ressources » (Resources Negotiator) ou Application Master, qu’il y’a de jobs.

C’est grâce à ce mode de fonctionnement que les orchestrateurs de ressources tels que YARN parviennent à faire coexister dans le même cluster différents modèles de calcul distribué et c’est également grâce à cela qu’ils peuvent exécuter de multiples jobs simultanément, là où le JobTracker était obligé d’exécuter plusieurs jobs de façon séquentielle. C’est important de noter que tous les orchestrateurs de ressources fonctionne quasiment exactement de cette façon, que ce soit Kubernetes ou Mesos.

La figure suivante illustre l’exécution du mapreduce dans un cluster Hadoop dans lequel YARN est utilisé comme orchestrateur de ressources.

exécution du mapreduce dans un cluster Hadoop contenant YARN
Figure : A partir de Hadoop 2, les instances ou tâches des jobs sont exécutés chacun dans des containers orchestrés par un orchestrateur de ressources type YARN ou Kubernetes

Voilà ! Ceci marque la fin de cette chronique. Si vous êtes arrivés à ce stade, cela signifie que vous avez compris comment le mapreduce permet de résoudre le problème de parallélisme de traitement et comment il est exécuté sur des clusters Hadoop modernes. Nous avons volontairement omis de nombreux détails importants pour vous aider à focaliser votre attention sur les éléments essentiels du traitement massivement parallèle avec le Mapreduce. Vous vous doutez bien que nous ne pouvons pas aborder un sujet si complexe en une seule chronique. Nous vous recommandons de vous procurer le premier ouvrage de ce projet pour vous affermir dans les principes du traitement massivement parallèle, l’exécution du mapreduce dans un cluster hadoop et les architectures de traitement appropriées pour valoriser les données massives. Cliquez sur le bouton suivant pour vous le procurer.

>