Comprendre les RDD pour mieux Développer en Spark

Apache Spark est un moteur de calcul distribué In-Memory, le tout premier du genre. D’ailleurs, il est même le principal moteur In-Memory distribué du marché et de la fondation Apache. Spark a été conçu aussi bien pour le calcul Interactif que le calcul itératif. Spark affiche des performances qui sont 10 fois plus élevés que Hadoop sur les travaux itératifs (apprentissage statistique). 

Cette performance extraordinaire lui vient des RDD, Resilient Distributed Datasets. Dans ce tutoriel complet,  notre objectif est de  vous aider à comprendre les RDD afin que vous sachiez comment utiliser Spark de façon optimale pour tirer profit des données massives.

Contexte de base des RDD : de Hadoop à Spark

Les systèmes massivement parallèles classiques tels que Hadoop sont bâtis sur des modèles de calcul batch qui leur permettent de combiner les fonctionnalités du traitement sur disque à la scalabilité du cluster pour traiter des problèmes par définition simples à paralléliser.

Le MapReduce, le modèle batch pionnier dans ce domaine, est le modèle qui est à la source d’Hadoop. Malgré sa simplicité, il n’est pas adapté à toutes les problématiques, spécialement celles qui impliquent le traitement interactif et le traitement itératif. En effet,  Le MapReduce a été conçu pour s’exécuter comme un graphe acyclique direct à 3 vertices (le Map, le Shuffle et le Reduce). Il s’exécute toujours de la façon suivante : Map -> Shuffle -> Reduce. Comme la figure ci-après l’illustre, l’ordre d’exécution qui est imposé au cluster est toujours séquentiel et non-itératif.

spark mapreduce
Figure : le MapReduce est modèle orienté batch qui s’exécute de façon séquentiel et direct.

Même si les modèles batch comme le MapReduce permettent d’exploiter au maximum la caractéristique de « commodité » des clusters, ils ont pour principal défaut de ne pas être adaptés pour certaines applications, notamment celles qui réutilisent les données à travers de multiples opérations telles que la plupart des algorithmes d’apprentissage statistique, itératifs pour la plupart, et les requêtes interactives d’analyse de données.

Spark apporte une réponse satisfaisante à ces limites grâce à son abstraction de données principale appelée RDD (Resilient distributed dataset – jeu de données distribué et résilient).  Le RDD est une “collection” d’éléments partitionnée et distribuées à travers les nœuds du cluster.   Grâce au RDD, Spark parvient à exceller sur les tâches itératives et interactives tout en conservant la scalabilité et la tolérance aux pannes du cluster.

Caractéristiques des Resilients Distributed Datasets

le RDD, Resilient Distributed Dataset (Table Distribuée et Résiliente ou Jeu de données résilient et distribué) est une abstraction distribuée en mémoire qui permet aux développeurs d’effectuer des calculs parallèles en mémoire sur un cluster de façon complètement tolérante aux pannes. Le RDD a été crée pour résoudre le problème que pose les algorithmes itératifs et les calculs intéractifs au MapReduce.

Formellement, le RDD est une « collection » (au sens Scala du terme) d’éléments partitionnés et répartis entre les nœuds du cluster et accessible uniquement en lecture-seule.  

Il agit comme une abstraction de partage de données dans un cluster. Lorsqu’on parle de « distribué » dans la définition de RDD, on fait en réalité référence à « shared », partagé, atomique, car il utilise la mémoire cache pour persister les données en RAM pour réutilisation, évitant ainsi la réplication des données sur disque, nécessaire dans Hadoop pour garantir la disponibilité du cluster. C’est grâce à ce mécanisme qu’il est capable de fournir de la haute disponibilité et la tolérance aux pannes au cluster. Nous y reviendrons plus bas.

Etant donné qu’un RDD est une abstraction, elle n’a pas d’existence réelle, dès lors elle doit être explicitement créée ou instanciée  à travers des opérations déterministes sur des fichiers de données existants ou sur d’autres instances de RDD. ces opérations d’instanciation de RDD sont appellées des “transformations“.  Nous y reviendrons plus bas.

Il est important à ce stade de noter que Spark a été développée en Scala et que les RDD, en tant que collection, ont simplement hérité des caractéristiques des collections du langage Scala (tableau, liste, tuple), à savoir :

Lazy computations : les calculs exécutés sur les RDD sont “paresseux”. Cela signifie que spark exécute les expressions uniquement lorsqu’elles sont nécessaires. Techniquement, c’est lorsqu’une action est déclenchée sur le RDD que celui-ci est exécuté.  Cela améliore grandement les performances de l’application.

– Immutable : les RDD sont immutables. Cela signifie qu’ils sont accessibles uniquement en lecture. On ne peut donc pas modifier un RDD. Cette caractéristique est très utile lors de la gestion d’accès concurrente aux données, spécialement dans un contexte de valorisation de données à large échelle…

– In-memory : les RDD sont exécutés en mémoire. Ils peuvent également être persistés en cache pour plus de rapidité. Les développeurs de Spark ont prévu la possibilité de choisir où persister les RDD (soit sur disque dur, soit en cache, soit en mémoire et sur disque) grâce à la propriété Storage.LEVEL.

Comment utiliser les RDD Spark en Scala ?

Spark expose ou met à disposition des utilisateurs, les RDD à travers une API développée en Scala (langage de base), en Java, ou en Python.  Les jeux de données dans les RDD sont représentées comme des objets (instances de classe) et les transformations sont invoquées en utilisant les méthodes de ces objets. D’ailleurs l’aspect fonctionnel de Scala se prête très bien à ce style d’opérations.

pour utiliser Spark, vous écrivez un programme pilote (un driver) qui implémente le flux de contrôle de haut-niveau de votre application et lance des opérations variées en parallèle. Spark fournit 2 abstractions principales pour la programmation parallèle en langage de programmation : les transformations de RDD et les opérations parallèles sur ces RDD.

En fait, utiliser les RDD revient à effectuer des transformations ( des opérations lazy qui créent un nouveau RDD) sur la base  soit des fichiers de données localisées ou non sur le HDFS, soit alors  des instances de RDD existants, et finit par l’utilisation des “actions“, qui sont quant à elles des fonctions qui retournent une valeur à l’application. Un exemple d’action inclut la méthode “count()” qui retourne le nombre d’éléments dans le jeu de données, “collect()” qui retourne les éléments eux-mêmes, “for each ” qui boucle les données dans une fonction fournie par l’utilisateur ou encore “persist()” qui persiste un RDD en mémoire. Le cœur de la programmation des RDD se situe vraiment sur l’utilisation des transformations et des actions. Ainsi, plus vous maîtrisez des transformations/actions, et plus votre maîtrise de Spark sera grande ! Dans notre formation “Maîtrisez Spark pour le Big Data avec Scala“, nous vous aidons à maîtriser les RDD, à effectuer ses transformations et ses actions. Le tableau suivant fournit un ensemble non-exhaustif des transformations et actions disponibles en programmation Spark.

Transformationsignification
Map()Crée un nouveau RDD avec les conditions spécifiées dans la fonction anonyme
Filter()Crée un nouveau RDD contenant les données répondant aux conditions de filtre
Flatmap()Crée un nouveau RDD au même titre que map(), à la seule différence que le nouveau RDD est applati.
Sample()Crée un nouveau RDD constitué d’un échantillon de données du RDD source.
Groupbykey()Effectue une opération GroupBy()
Reducebykey()Effectue une opération Reduce()
Union()Union de deux ou plusieurs RDD
Join()Jointure de deux ou plusieurs RDD
Sort()Tri du RDD
Tableau : liste des transformations courantes des RDD
Actionsignification
Count()Compte le nombre d’éléments présents dans le RDD
Collect()Transfère les données du RDD dans le nœud principal (ou driver) du cluster Spark
Reduce()Effectue une agrégation entre les éléments du RDD. Il combine les données en utilisant une fonction associative qui produit un résultat au programme pilote.
Save()Persiste les données du RDD sur le disque dur
Persist()“Persiste” les données du RDD (provenant du cache) en mémoire
Sum()Effectue une somme des éléments du RDD
Foreach()Permet d’effectuer des actions spécifiques pour chaque élément du RDD
Tableau : liste des actions courantes des RDD

Vous trouverez l’exhautisvité des actions/transformations des RDD dans la documentation officielle de la programmation en RDD.

Concrètement, vous pouvez construire les RDD à partir de l’une des 4 voies suivantes :

  • à partir d’un fichier dans le système de fichier distribué, comme le HDFS
  • en parallélisant une collection Scala dans le programme pilote dont les partitions seront réparties dans le cluster
  • en transformant  un RDD existant.
  • en sérialisant un RDD existant. Par défaut, les RDD sont chargées en mémoire. Les partitions du RDD sont sérialisées/persistées sous-demande lorsqu’ils sont utilisés dans une opération parallèle (par exemple: en passant un bloc de fichier à une fonction map), et sont supprimées de la mémoire après utilisation.

Voici quelques exemples en Scala sur IntelliJ :

  • création d’un RDD à partir d’une collection scala sur le driver grâce à la méthode parallelize()
def main(args: Array[String]): Unit = {
      val spark : SparkSession = SparkSession.builder().master("local[1]")
          .appName("Session Sparl")
          .getOrCreate()

      val rdd_test : RDD[Int] = spark.sparkContext.parallelize(List(1,2,3,4,5))
      val rddCollect:Array[Int] = rdd.collect()
      println("Action : premier element: "+rdd.first())
      rddCollect.foreach(println)
  }

val rdd = spark.sparkContext.parallelize(Seq(("Math", 500), 
  ("Chimie", 400), ("Java", 300)))
rdd.foreach(println)
  • à partir d’une source de données grâce au RDD.textfile()
 println("read all text files from a directory to single RDD")
  val rdd = spark.sparkContext.textFile("C:/tmp/files/*")
  rdd.foreach(f=>{
    println(f)
  })

Exécution des RDD dans un cluster

Pour utiliser Spark, les développeurs écrivent ce que Berkeley appelle un “Spark driver” (pilote Spark), qui est un peu l’équivalent d’un “Job MapReduce” en Hadoop. Ce pilote Spark se connecte aux  processus Workers qui tournent au niveau des noeuds de données du cluster, ensuite le pilote définit une ou plusieurs RDD sur lesquelles il déclenche une ou plusieurs actions. La figure suivante illustre l’exécution d’un programme dans un cluster Spark.

exécution des rdd dans un cluster Spark
Figure : exécution d’une application spark dans un cluster.

A la différence d’un cluster Hadoop, les processus Workers dans Spark sont des processus d’une durée de vie qui ne s’achève pas à la fin de l’exécution du programme Pilote, ils sont continuellement actifs, c’est pourquoi ils peuvent persister les partitions de RDD en mémoire.

En réalité, c’est la persistance en mémoire cache des partitions du RDD qui est la clé de la performance et de l’efficacité de Spark. En effet, les Workers utilisent la mémoire cache pour persister les données dans les partitions de RDD pour réutilisation, évitant ainsi la réplication des données sur disque, nécessaire dans Hadoop pour garantir la tolérance aux pannes du cluster. A chaque partition du RDD distribué dans les nœuds du cluster, Spark mémorise suffisamment d’informations sur elle de manière à ce que si un nœud venait à tomber en panne, sa partition serait simplement reconstruite à l’aide de ces informations. Ces informations s’appellent le lignage ou lignée du RDD (RDD lineage).  

Le lignage des RDD

Gardez toujours à l’esprit qu’en tant que  collection d’objets partitionnés à travers les machines du cluster, les RDD sont résilients. Cela signifie que les partitions d’un RDD peuvent être reconstruites en cas de perte ou de panne sans que ses éléments/objets ne soient persistés sur un disque dur.  Pour comprendre celà, il faut déjà comprendre comment la majorité des systèmes de traitement de données à large échelle font pour être tolérant aux pannes.

Pour être tolérant aux pannes, la majorité des systèmes Big Data passe par la réplication des données dans le disque dur des autres machines du cluster. La reprise est donc effectuée simplement par restauration ultérieure de ces données.  Cette approche quoique pratique, crée un problème sérieux : assurer la tolérance d’un nœud demande un nœud supplémentaire dans le cluster, en d’autres termes, la réplication demande un doublement ou une multiplication des coûts d’infrastructure du cluster.

Spark lui passe par un autre mécanisme pour gérer les problèmes de pannes dans le cluster lors du traitement, la restauration parallèle. Spark enregistre périodiquement des points de contrôle (checkpoints) de l’état des RDD en les répliquant de façon asynchrone dans d’autres nœuds du cluster Spark. Lorsqu’il y a panne de nœud, le système détecte les RDD manquants et relance leur lignage à partir du dernier état enregistré. Comme le RDD est une abstraction nativement distribuée, la réexécution du lignage se fait en parallèle sur tous les nœuds dont les RDD sont manquants, ce qui est plus rapide que la restauration classique qui a lieu par transfert de l’état du nœud de sauvegarde au nœud de calcul. Un mécanisme qui est capable de restaurer automatiquement les données perdues en cas d’échec ou de panne de système sans réplication est qualifié d’auto-résilient.

Les RDD sont auto-résilients, c’est-à-dire qu’ils peuvent restaurer automatiquement les données perdues en cas d’échec ou panne de système sans les répliquer. La restauration se fait par réexécution parallèle du lignage du RDD, c’est-à-dire par réexécution de l’ensemble des opérations qui ont permis de construire le RDD ;

Formellement, le lignage de RDD fait référence à l’ensemble des opérations qui ont permis de construire le RDD. Il permet de restaurer le RDD en cas de panne. Le lignage de RDD est similaire à la restauration d’une base de données relationnelle qui consiste à rejouer non pas les données, mais les commandes SQL du log qui ont permis d’obtenir ces données.

flux d'exécution du RDD dans le cluster Spark
Figure : flux d’exécution d’un RDD dans le cluster Spark

Les RDD réussissent la restauration automatique en traçant l’ensemble des opérations nécessaires pour obtenir les données (son lignage), un peu comme le log (journal de transactions) d’un SGBDR. Le log restaure la base de données non pas en stockant toutes les données échangées dans les transactions, mais en stockant les commandes SQL qui permettent d’obtenir ces données. Les opérations du RDD sont tracées dans son lignage sous forme d’un graphe de dépendance similaire à celui du MapReduce. Combiné à sa capacité à stocker les données en mémoire, le RDD est l’élément qui permet à Spark d’exécuter des traitements en mémoire avec des latences très basses.

lignage du RDD
Figure : illustration du lignage d’un traitement MapReduce classique

Attention !!! le lignage n’enregistre pas les données, mais le flux d’opérations qui permettent d’obtenir les données. C’est vrai que nous l’avons déjà dit plus haut, mais c’est important à garder à l’esprit car les autres systèmes enregistrent les états sous forme de données, et c’est ce qui fait que si le nœud contenant la base de données de sauvegarde tombe en panne, le système ne pourra pas restaurer l’état. Le lignage du RDD ne stocke pas de données, ce qui fait que même lorsque le nœud contenant la réplique de l’état du RDD tombe en panne, Spark est toujours en capacité de le restaurer. Le lignage est très important, c’est la clé de voûte de la tolérance aux pannes de Spark.

CONCLUSION

Là où les systèmes classiques de Big Data s’appuient sur un modèle batch acyclique direct (à l’exemple du MapReduce), qui ne sont pas appropriés pour les calculs itératifs comme la majorité des algorithmes de data science ou de machine learning/deep learning (réseau de neurones, clustering, k-means, regression logistique, etc…), Spark s’appuie sur une abstraction particulière appelée le RDD. Le RDD est une collection d’éléments partitionnées et distribuées dans le cluster. Là où la tolérance aux pannes dans les systèmes classiques est obtenue par réplication des données dans le cluster, en Spark celle-ci est obtenue par en traçage de l’ensemble des opérations nécessaires pour obtenir les données présentes dans le RDD. C’est pourquoi ceux-ci sont qualifiés d’auto-résilients et sont la fondation de la performance du framework Apache Spark.

En Big Data, la maîtrise de Spark est obligatoire dans la plupart des offres d’emploi. Nous mettons à votre disposition, une formation pointue qui vous permettra de devenir un spécialiste dans le développement d’applications Spark. Vous trouverez cette formation sur ce lien : Maîtrisez Spark pour le Big Data avec Scala.

>