Internet, l’apparition de la 3G, 4G et actuellement de la 5G, les grandes installations scientifiques génèrent une immensité de données traitables. En 2020 une analyse a révélé que 40 zéctaoctets sont générés par an, 204 millions d’e-mails sont envoyés chaque minute. L’ensemble de ces données ainsi que les enjeux qui y sont associés représente la notion de Big Data.

Concevoir une infrastructure Big data aujourd’hui consiste à distribuer de manière intelligente et efficace le stockage et les traitements de ces données. Il s’agit principalement de repenser le stockage comme le fait d’utiliser systèmes de fichiers tels que HDFS de Hadoop et les algorithmes de traitement des données comme l’utilisation de MapReduce ou de Spark.

Dans ce tutoriel, nous allons vous expliquer comment utiliser Spark en Python via l’API PySpark. Ce tutoriel est la suite de la série de tutoriels sur la programmation en Big Data avec Python.

Qu’est-ce qu’Apache Spark ?

Apache Spark est un puissant moteur de traitement open-source, rapide et facile d’utilisation. Il est conçu pour les analyses sophistiquées de larges volumes de données de manière distribuée (cluster computing). On peut l’utiliser avec des API en Java, Scala, Python, R et SQL. Spark exécute des programmes jusqu’à 100 fois plus vite que Hadoop MapReduce en mémoire, ou 10 fois plus vite sur disque. Il peut être utilisé pour créer des applications de données en tant que bibliothèque, ou pour effectuer des analyses de données ad hoc de manière interactive.

Spark contient une pile de bibliothèques comprenant Spark SQL, MLlib pour l’apprentissage automatique, GraphX pour le traitement des graphes et Spark Streaming. Vous pouvez combiner ces bibliothèques de manière transparente dans une même application. De plus, Spark fonctionne sur un ordinateur portable, sur Hadoop, sur Apache Mesos, de manière autonome ou dans le cloud. Il peut accéder à diverses sources de données, notamment HDFS, Apache Cassandra, Apache HBase et S3.

Qu’est-ce que PySpark et quelles sont ses caractéristiques ?

PySpark est une interface pour Apache Spark en Python. Elle vous permet non seulement d’écrire des applications Spark à l’aide d’API Python, mais fournit également le shell PySpark pour analyser interactivement vos données dans un environnement distribué. PySpark supporte la plupart des fonctionnalités de Spark telles que Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) et Spark Core.

fonctionnalités PySpark
Figure : architecture et composants d’Apache Spark

Pyspark est une alternative plus puissante que Pandas Python lorsqu’il s’agit du traitement de gros volume de données. Les principales caractéristiques qui font de Pyspark la librairie de traitement de données de Python la plus puissante sont les suivantes :

  • Calcul en temps réel : PySpark permet le calcul en temps réel sur une grande quantité de données car il se concentre sur le traitement en mémoire. Il présente une faible latence.
  • Support de plusieurs langages : la librairie PySpark est adapté à divers langages de programmation tels que Scala, Java, Python et R. Sa compatibilité en fait une librairie de choix pour le traitement d’énormes ensembles de données.
  • Mise en cache et constance du disque : la librairie PySpark offre une mise en cache puissante et une bonne constance du disque.
  • Traitement rapide : PySpark nous permet d’atteindre une grande vitesse de traitement des données, qui est environ 100 fois plus rapide en mémoire et 10 fois plus rapide sur le disque.
  • Fonctionne bien avec les RDD : le langage de programmation Python est dynamiquement typé, ce qui facilite le travail avec les RDD. Nous en apprendrons davantage sur les RDD en utilisant Python dans la suite du tutoriel.

Pourquoi devez-vous apprendre PySpark ?

Les données générées chaque jour contiennent des tendances du marché, les préférences des clients et d’autres informations commerciales utiles. Il est nécessaire d’extraire des informations précieuses de ces données brutes. Nous avons besoin d’un outil efficace, évolutif et flexible  pour effectuer différents types d’opérations sur les données volumineuses et en  tirer profit.

Apache Spark est l’un des outils les plus étonnants qui permettent de traiter les données volumineuses. Comme nous le savons, Python est l’un des langages de programmation les plus utilisés par les spécialistes de la donnée. En raison de sa simplicité et de son interface interactive, les spécialistes de la donnée lui font confiance pour effectuer des analyses de données, de l’apprentissage automatique et bien d’autres tâches sur les big data.

Ainsi, la combinaison de Python et de Spark serait très efficace pour le monde du big data. C’est pourquoi la communauté Apache Spark a créé un outil appelé PySpark, qui est une API Python pour Apache Spark.

PySpark est utilisé dans plusieurs secteurs notamment dans :

L’industrie du divertissement

L’industrie du divertissement est l’un des plus grands secteurs qui se développe vers le streaming en ligne. La célèbre plateforme de divertissement en ligne Netflix utilise Apache Spark pour le traitement en temps réel afin de personnaliser les films en ligne ou les séries web pour ses clients. Elle traite environ 450 milliards d’événements par jour qui sont diffusés en continu sur l’application côté serveur.

 Le secteur commercial

Le secteur commercial utilise également le système de traitement en temps réel d’Apache Spark. Les banques et d’autres secteurs financiers utilisent Spark pour récupérer le profil de média social du client et l’analyser afin d’obtenir des informations utiles qui peuvent aider à prendre la bonne décision. Les informations extraites sont utilisées pour l’évaluation du risque de crédit, les publicités ciblées et la segmentation des clients.

Spark joue un rôle important dans la détection des fraudes et est largement utilisé dans les tâches d’apprentissage automatique.

Le secteur de la santé

Apache Spark est utilisé pour analyser les dossiers des patients ainsi que les données des rapports médicaux afin d’identifier les patients susceptibles de rencontrer des problèmes de santé après leur sortie de la clinique.

Commerce et E-commerce

Les principaux sites de commerce e-commerce comme Flipkart, Amazon, etc., utilisent Apache Spark pour la publicité ciblée. D’autres sites comme Alibaba proposent des offres ciblées, améliorent l’expérience client et optimisent les performances globales.

Démarrer avec PySpark

Dans cette partie de cet article nous verrons comment installer PySpark et nous étudierons les différentes structures proposées par Pandas.

Pour une meilleure maîtrise de cette librairie vous devez avoir une connaissance de base sur le langage de programmation python et une connaissance de l’architecture Apache Spark. Notre article sur la programmation Python pour la data vous donnera toutes les bases nécessaires pour mieux appréhender cet article.

Installation de PySpark

La façon la plus simple d’installer non seulement Pyspark, mais aussi Python et ses paquets les plus populaires (IPython, NumPy, Matplotlib, …) est avec Anaconda, une distribution Python multiplateforme (Linux, macOS, Windows) pour l’analyse de données et le calcul scientifique.

Les instructions d’installation pour Anaconda sont disponibles ici.

Après l’installation de Anaconda l’installation de PySpark se fait avec les commandes suivantes :

conda create -n pyspark_env
conda env list
conda activate pyspark_env
conda install pyspark

Ça y est ! PySpark est installé sur votre machine. Vous devez aussi avoir Apache Spark installé sur votre machine. Les instructions pour l’installation d’Apache Spark sont disponibles ici.

 On va pouvoir commencer les choses sérieuses. Dans la prochaine section nous parlerons de la structure de données la plus importante de PySpark.

PySpark RDD

Les RDDs sont la partie la plus essentielle de PySpark ou nous pouvons dire l’épine dorsale de PySpark. C’est l’une des structures de données fondamentales, qui peut traiter des données structurées et non structurées.

Pour interagir avec PySpark, vous créez des structures de données spécialisées appelées Resilient Distributed Datasets (RDD).

Les RDDs cachent toute la complexité de la transformation et de la distribution automatique de vos données sur plusieurs nœuds par un planificateur si vous travaillez sur un cluster.

Vous pouvez commencer à créer des RDD une fois que vous avez un SparkContext.

Vous pouvez créer des RDDs de plusieurs façons, mais l’une des plus courantes est la fonction PySpark parallelize().

parallelize()

parallelize() peut transformer certaines structures de données Python comme les listes et les tuples en RDDs, ce qui vous donne une fonctionnalité qui les rend tolérants aux pannes et distribués.

Pour mieux comprendre les RDD, analysons le code suivant. Le code suivant crée un itérateur de 10 000 éléments, puis utilise parallelize() pour distribuer ces données en 2 partitions :

from pyspark import SparkContext
sc = SparkContext()
ma_liste = range(10000)
rdd = sc.parallelize(ma_liste, 2)
nombres_impairs = rdd.filter(lambda x: x % 2 != 0)
nombres_impairs.take(5)

parallelize() transforme cet itérateur en un ensemble distribué de nombres et vous offre toutes les possibilités de l’infrastructure de Spark.

Remarquez que ce code utilise la méthode filter() du RDD au lieu de la méthode filter() intégrée de Python, que vous avez vue précédemment. Le résultat est le même, mais ce qui se passe en coulisses est radicalement différent. En utilisant la méthode RDD filter(), cette opération se produit de manière distribuée sur plusieurs CPU ou ordinateurs.

Encore une fois, imaginez que c’est Spark qui fait le travail de multitraitement pour vous, le tout encapsulé dans la structure de données RDD.

take()

take() est un moyen de voir le contenu de votre RDD, mais seulement un petit sous-ensemble. Elle tire ce sous-ensemble de données du système distribué sur une seule machine.

Elle est important pour le débogage car il n’est pas toujours possible d’inspecter l’ensemble des données sur une seule machine. Les RDD sont optimisés pour être utilisés sur des données volumineuses. Dans un scénario réel, une seule machine peut ne pas avoir assez de RAM pour contenir l’ensemble de vos données.

Une autre façon de créer des RDDs est de lire un fichier avec textFile(). Les RDDs sont l’une des structures de données fondamentales pour l’utilisation de PySpark, de sorte que de nombreuses fonctions de l’API renvoient des RDDs.

L’une des principales distinctions entre les RDD et les autres structures de données est que le traitement est retardé jusqu’à ce que le résultat soit demandé. Les développeurs de l’écosystème Python utilisent généralement parle de lazy evaluation.

Vous pouvez empiler plusieurs transformations sur le même RDD sans qu’aucun traitement ne soit effectué. Cette fonctionnalité est possible parce que Spark maintient un graphe acyclique dirigé des transformations. Le graphe sous-jacent n’est activé que lorsque les résultats finaux sont demandés. Dans l’exemple précédent, aucun calcul n’a eu lieu jusqu’à ce que vous demandiez les résultats en appelant take().

Il existe plusieurs façons de demander les résultats d’un RDD. Vous pouvez demander explicitement que les résultats soient évalués et collectés sur un seul nœud de cluster en utilisant collect() sur un RDD. Vous pouvez également demander implicitement les résultats de différentes manières, l’une d’entre elles étant l’utilisation de count().

En résumé les RDD possèdent deux types de méthodes

  • Les transformations qui donnent en sortie un autre RDD
  • Les actions qui donnent en sortie autre chose qu’un RDD.

La liste complète des transformations et des actions possibles sur un RDD est disponible dans la documentation spark mais les tableaux suivants résument respectivement les transformations et les actions les plus fréquemment utilisés :

Les transformations

TransformationsRôle
map(func)applique une fonction à chacune des données
filter(func)permet d’éliminer certaines données comme dans notre exemple rdd.filter(lambda x : x % 2 != 0) permet d’éliminer les nombres pairs
flatMap(func)similaire à map, mais chaque élément d’entrée peut être mappé à 0 ou plusieurs éléments de sortie (donc, func devrait retourner une Seq plutôt qu’un seul élément)
distinct()supprime les doublons
groupsByKey()transforme des clés-valeurs (K,V) en (K,W) où W est un object itérable. Par exemple, (K,U) et (K,V) seront transformées en (K, [U,V])
reduceByKey(func)applique une fonction de réduction aux valeurs de chaque clé. Comme on a pu le voir, la fonction de réduction est appelée avec deux arguments. Si ce genre de chose vous intéresse, sachez qu’il est attendu que la fonction de réduction soit commutative et associative, c’est-à-dire que func(a,b) = func(b,a) et func(func(a,b),c) = func(a, func(b,c)). Par exemple, la fonction func peut renvoyer le maximum entre deux valeur
sortByKeyutilisée pour trier le résultat par clé
join(rdd)permet de réaliser une jointure, ce qui a le même sens que dans les bases de données relationnelles.
À noter qu’il existe également des fonctions OuterJoin et fullOuterJoin. Les jointures sont réalisées sur la clé

Les actions

ActionRôle
reduce(func)agrège les éléments de l’ensemble de données en utilisant une fonction func (qui prend deux arguments et en renvoie un). La fonction doit être commutative et associative afin qu’elle puisse être calculée correctement en parallèle
take(n)retourne un tableau avec les n premiers éléments du RDD
collect()retourne toutes les données contenues dans le RDD sous la forme d’une liste
count()retourne le nombre de données contenues dans le RDD

PySpark nous offre un module (PySpark SQL) qui nous permet d’interagir avec la composante d’Apache Spark charger des traitements de données structurées. Dans la section suivante nous parlerons de ce module : Spark SQL.

PySpark SQL

Spark introduit un module de programmation pour le traitement des données structurées appelé Spark SQL. Il fournit une abstraction de programmation appelée DataFrame, très similaire au RDD à la différence qu’il permet de stocker de manière distribuée les données structurées.

Les caractéristiques de Spark SQL

Les caractéristiques de Spark SQL sont les suivantes :

Intégré : Mélangez de manière transparente des requêtes SQL avec des programmes Spark. Spark SQL vous permet d’interroger des données structurées sous forme d’ensemble de données distribuées (RDD) dans Spark, avec des API intégrées en Python, Scala et Java. Cette intégration étroite permet d’exécuter facilement des requêtes SQL aux côtés d’algorithmes analytiques complexes.

Accès unifié aux données :  Chargez et interrogez des données provenant de diverses sources. Les Schema RDD fournissent une interface unique pour travailler efficacement avec des données structurées, notamment des tableaux Apache Hive, des fichiers parquet et des fichiers JSON.

Compatibilité Hive : Exécutez des requêtes Hive non modifiées sur des entrepôts existants. Spark SQL réutilise le front-end et le MetaStore de Hive, vous offrant une compatibilité totale avec les données, les requêtes et les UDF Hive existants. Il suffit de l’installer à côté de Hive.

Connectivité standard : Connectez-vous via JDBC ou ODBC. Spark SQL comprend un mode serveur avec une connectivité JDBC et ODBC standard.

Évolutivité : Utilisez le même moteur pour les requêtes interactives et longues. Spark SQL tire parti du modèle RDD pour prendre en charge la tolérance aux pannes en milieu de requête, ce qui lui permet d’évoluer vers des travaux de grande envergure. Vous ne vous souciez pas d’utiliser un moteur différent pour les données historiques.

Maintenant que nous avons fait un tour sur la définition et les caractéristiques de PySpark, passons à son étude complète.

Les classes du module PySpark SQL

Voici les classes que nous pouvons voir dans le module PySpark SQL :

pyspark.sql.SparkSession : Elle représente le point d’entrée principal pour les fonctionnalités DataFrame et SQL.

pyspark.sql.DataFrame : Elle représente une collection distribuée de données regroupées dans des colonnes nommées.

pyspark.sql.Column : Elle représente une expression de colonne dans un DataFrame.

pyspark.sql.Row : Elle représente une ligne de données dans un DataFrame.

pyspark.sql.GroupedData : Elle représente des méthodes d’agrégation, retournées par DataFrame.groupBy().

pyspark.sql.DataFrameNaFunctions : Il s’agit de méthodes de traitement des données manquantes (valeurs nulles).

pyspark.sql.DataFrameStatFunctions : Elle représente les méthodes pour la fonctionnalité de statistiques.

pyspark.sql.functions : Elle représente une liste de fonctions intégrées disponibles pour DataFrame.

pyspark.sql.types : Elle représente une liste de types de données disponibles.

pyspark.sql.Window : Elle est utilisée pour travailler avec les fonctions Window.

Créer un DataFrame PySpark

Vous pouvez créer manuellement un DataFrame PySpark en utilisant les méthodes afin de créer un DataFrame à partir d’un RDD, d’une liste ou d’un DataFrame existant.

Vous pouvez également créer un PySpark DataFrame à partir de sources de données telles que les formats TXT, CSV, JSON, ORV, Avro, Parquet, XML en lisant les systèmes de fichiers HDFS, S3, DBFS, Azure Blob, etc.

Dans cette section nous parlerons de comment créer DataFrame à partir de différentes sources.

Créer un DataFrame PySpark à partir d’un RDD

Une façon simple de créer manuellement un DataFrame PySpark est de le faire à partir d’un RDD existant.

from pyspark.sql import SparkSession
data = [("Java", "12"), ("Python", "17"), ("Scala", "18")]
spark = SparkSession.builder.appName('datatransitionnumerique.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data) #création du RDD
df = rdd.toDF() #création du dataframe
df

Il est possible de créer un DataFrame PySpark à partir d’un RDD grâce à l’utilisation de la méthode createDataFrame() .

from pyspark.sql import SparkSession
data = [("Java", "12"), ("Python", "17"), ("Scala", "18")]
columns = ["language","avg"]
spark = SparkSession.builder.appName('datatransitionnumerique.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data) #création du RDD
df2 = spark.createDataFrame(rdd).toDF(*columns)
df2.show()

Créer un DataFrame PySpark à partir d’un Schéma

Si vous souhaitez spécifier les noms de colonnes avec leurs types de données, vous devez d’abord créer le schéma StructType, puis l’affecter lors de la création d’un DataFrame.

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",7000),
    ("Maria","Anne","Jones","39192","F",8000),
    ("Jen","Mary","Brown","","F",10000)
  ]

schema = StructType([ \
    StructField("Prenom",StringType(),True), \
    StructField("Deuxième nom",StringType(),True), \
    StructField("Nom",StringType(),True), \
    StructField("identifiant", StringType(), True), \
    StructField("genre", StringType(), True), \
    StructField("Salaire", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

Créer un DataFrame PySpark à partir d’un datasource

Dans la pratique, on crée généralement les DataFrames PySpark à partir de fichiers sources de données tels que CSV, Text, JSON, XML, etc.

PySpark supporte par défaut de nombreux formats de données sans avoir à importer de bibliothèques. Pour créer un DataFrame, vous devez utiliser la méthode appropriée disponible dans la classe DataFrameReader.

Créer un dataframe PySpark à partir d’un CSV

La méthode csv() de l’objet DataFrameReader  est utilisée pour créer un DataFrame à partir d’un fichier CSV. Vous pouvez également fournir des options telles que le délimiteur à utiliser, si vous avez des données citées, les formats de date, le schéma d’inférence, et bien d’autres encore.

spark = SparkSession.builder.appName('datatransitionnumerique.com').getOrCreate()
df_fromcsv = spark.read.csv("fichier.csv")
Créer un dataframe PySpark à partir d’un fichier JSON

PySpark est également utilisé pour traiter des fichiers de données semi-structurés comme le format JSON. Vous pouvez utiliser la méthode json() du DataFrameReader pour lire le fichier JSON dans le DataFrame. Voici un exemple simple.

df2 = spark.read.json("fichier.json")

Il est bien beau de savoir créer un DataFrame mais ne serait-il plus important de savoir comment y réaliser des requêtes ? Dans la prochaine section nous étudierons les différentes opérations possibles sur un Dataframe PySpark.

Les opérations possibles sur un Dataframe PySpark

Les opérations de sélections

Ce sont les différentes méthodes qui nous permettent d’accéder aux données enregistrées dans un DataFrame PySpark.

show()

.show() est utilisée pour afficher le contenu du DataFrame dans un format de tableau de lignes et de colonnes. Par défaut, il n’affiche que 20 lignes, et les valeurs des colonnes sont tronquées à 20 caractères.

Regardons cet exemple :

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('datatransitionnumerique.com').getOrCreate()
columns = ["Reseaux sociaux","abonnes"]
abonnes = [("Facebook",300000),
    ("Instagram", 200000),
    ("Twitter", 400000),
    ("Tik Tok", 500000)]
df = spark.createDataFrame(abonnes,columns)
df.show()
select()

Vous pouvez sélectionner une ou plusieurs colonnes du DataFrame en passant les noms des colonnes que vous souhaitez sélectionner à la fonction select(). Comme le DataFrame est immuable, cela crée un nouveau DataFrame avec les colonnes sélectionnées.

Considérons le data frame suivant :

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('datetransitionnumerique.com').getOrCreate()
data = [("James","Smith",30,"France"),
    ("Michael","Rose",25,"Allemagne"),
    ("Robert","Williams",45,"France"),
    ("Maria","Jones",54,"Espagne")
  ]
columns = ["Prenom","Nom","age","Pays"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

Ci-dessous les moyens de sélectionner une, plusieurs ou toutes les colonnes de ce DataFrame :

df.select("Prenom","Nom").show()
df.select(df.Prenom,df.Nom).show()
df.select(df["Prenom"],df["Nom"]).show()

#By using col() function
from pyspark.sql.functions import col
df.select(col("Prenom"),col("Nom")).show()

#Select columns by regular expression
df.select(df.colRegex("`^.*nom*`")).show()

Toutes ces sélections renvoient le Data Frame suivant :

.collect()

.collect() est une opération d’action qui est utilisée pour récupérer tous les éléments de l’ensemble de données (de tous les nœuds) vers le nœud pilote. Nous devrions utiliser collect() sur les petits ensembles de données, généralement après filter(), group(), etc. La récupération de plus grands ensembles de données entraîne une erreur OutOfMemory.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('datatransitionnumerique.com').getOrCreate()
data = [("James","Smith",30,"France"),
    ("Michael","Rose",25,"Allemagne"),
    ("Robert","Williams",45,"France"),
    ("Maria","Jones",54,"Espagne")
  ]
columns = ["Prenom","Nom","age","Pays"]
df = spark.createDataFrame(data = data, schema = columns)
df.collect()

df.collect() récupère tous les éléments d’un DataFrame sous la forme d’un tableau de type Row. Ce code produit l’affichage ci-dessous :

[Row(Prenom='James', Nom='Smith', age=30, Pays='France'),
 Row(Prenom='Michael', Nom='Rose', age=25, Pays='Allemagne'),
 Row(Prenom='Robert', Nom='Williams', age=45, Pays='France'),
 Row(Prenom='Maria', Nom='Jones', age=54, Pays='Espagne')]
.filter() ou .where()

La méthode PySpark filter() est utilisée pour filtrer les lignes d’un RDD/DataFrame sur la base d’une condition donnée ou d’une expression SQL, vous pouvez également utiliser la clause where() au lieu de la fonction filter() si vous êtes un habitué du SQL, ces deux fonctions fonctionnent exactement de la même manière.

Reprenons le DataFrame précédent :

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('datatransitionnumerique.com').getOrCreate()
data = [("James","Smith",30,"France"),
    ("Michael","Rose",25,"Allemagne"),
    ("Robert","Williams",45,"France"),
    ("Maria","Jones",54,"Espagne")
  ]
columns = ["Prenom","Nom","age","Pays"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

Le code ci-dessous affiche les personnes vivant en France

df.filter(df.Pays == "France").show(truncate=False)

Si vous êtes un habitué du SQL, vous pouvez utiliser cette connaissance dans PySpark pour filtrer les lignes de DataFrame avec des expressions SQL.

df.filter("Prenom == 'James'").show()

En PySpark, pour filtrer les lignes d’un DataFrame en fonction de plusieurs conditions, vous pouvez utiliser une colonne avec une condition ou une expression SQL. Ci-dessous un exemple simple utilisant la condition AND (&), vous pouvez l’étendre avec des expressions conditionnelles OR(|), et NOT( !) si nécessaire.

df.filter( (df.Pays  == "France") & (df.age  > 40) ).show(truncate=False)  

Vous pouvez également filtrer les lignes du DataFrame en utilisant les méthodes startswith(), endswith() et contains() de la classe Column.

df.filter(df.Pays.startswith("A")).show()
df.filter(df.Prenom.contains("M")).show()
.orderBy() ou .sort()

La méthode .sort() ou orderBy() de PySpark DataFrame trie un DataFrame par ordre croissant ou décroissant sur la base d’une ou plusieurs colonnes, vous pouvez également utiliser les fonctions de tri SQL de PySpark.

Reprenons toujours le DataFrame utilisé l’exemple précédent :

df.sort("Prenom").show(truncate=False)

Imaginons que nous souhaitons faire un tri par ordre décroissant des âges sur le DataFrame, nous pouvons utiliser la méthode desc de la classe Column.

df.sort(df.age.desc()).show(truncate=False)

Les opérations d’aggrégations

PySpark fournit des fonctions d’agrégation standard intégrées, définies dans l’API de DataFrame, qui s’avèrent pratiques lorsque nous devons effectuer des opérations d’agrégation sur les colonnes de DataFrame. Les fonctions d’agrégation opèrent sur un groupe de lignes et calculent une seule valeur de retour pour chaque groupe.

.avg()

La fonction avg() renvoie la moyenne des valeurs de la colonne d’entrée.

Considérons le DataFrame suivant :

import pyspark
spark = SparkSession.builder.appName('datatransitionnumerique.com').getOrCreate()

data = [("James", "Ventes", 3000),
    ("Michael", "Ventes", 4600),
    ("Robert", "Ventes", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Ventes", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Ventes", 4100)
  ]
schema = ["nom", "departement", "salaire"]
df = spark.createDataFrame(data=data, schema = schema)
df.show(truncate=False)
print("avg: " + str(df.select(avg("salaire")).collect()[0][0]))

La valeur de retour de cet exemple est donc :

avg: 3400.0

.count()

La fonction count() renvoie le nombre d’éléments dans une colonne.

Considérons le DataFrame précédent  et le code suivant :

from pyspark.sql.functions import  count
print("count: "+str(df.select(count("salaire")).collect()[0]))

Le code ci-dessus renvoie :

count: Row(count(salaire)=10)
.max()

La fonction max() renvoie la valeur maximale d’une colonne.

Reprenons toujours l’exemple précédent en incluant le code suivant :

from pyspark.sql.functions import  max
df.select(max("salaire")).show(truncate=False)

Cela renvoie :

Vous pouvez trouver l’ensemble des opérations possibles sur DataFrame ici.

Vous l’aurez remarqué, PySpark et Pandas ont des similitudes. Dans la prochaine section, nous verrons les différences entre ces deux outils.

PySpark vs Pandas

Pandas et Spark DataFrame sont conçus pour le traitement des données structurelles et semi-structurelles. Les deux partagent certaines propriétés similaires (structure, fonctions). Les quelques différences entre Pandas et un PySpark DataFrame sont les suivantes :

  •  Les opérations sur un DataFrame Pyspark s’exécutent en parallèle sur différents nœuds du cluster mais, dans le cas de Pandas, ce n’est pas possible.
  •  Les opérations dans un DataFrame PySpark sont paresseuses par nature (lazy evaluation) mais, dans le cas de pandas, nous obtenons le résultat dès que nous appliquons une opération.
  •  Dans un DataFrame PySpark, nous ne pouvons pas modifier le DataFrame en raison de sa propriété immuable, nous devons le transformer. Mais dans Pandas, ce n’est pas le cas.
  •  Les opérations complexes sont plus faciles à réaliser dans Pandas que dans un DataFrame PySpark.

Voilà ! Nous sommes arrivés au terme de ce tutoriel complet sur l’API Py Spark. Nous espérons que vous avez compris l’intérêt de ce module dans le marché et son intérêt dans votre carrière. Clairement PySpark et un outil à prendre en considération si vous souhaitez travailler dans le Big Data. C’est un outil qui s’apprivoise très rapidement. Si vous avez des questions par rapport à l’utilisation de ce module n’hésitez pas à la poser dans les commentaires. Pour aller plus loin dans vos compétences dans le Big Data, nous vous offrons cette formation sur l’écosystème Hadoop.


Juvénal JVC

Juvénal est spécialisé depuis 2011 dans la valorisation à large échelle des données. Son but est d'aider les professionnels de la data à développer les compétences indispensables pour réussir dans le Big Data. Il travaille actuellement comme Lead Data Engineer auprès des grands comptes. Lorsqu'il n'est pas en voyage, Juvénal rédige des livres ou est en train de préparer la sortie d'un de  ses livres. Vous pouvez télécharger un extrait de son dernier livre en date ici : https://www.data-transitionnumerique.com/extrait-ecosystme-hadoop/

  • MANIRAKIZA Idi Sosthène dit :

    Merci pour le tuto mais j’ai eu une erreur « RuntimeError: Java gateway process exited before sending its port number », comment résoudre. thanks

    • Juvénal JVC dit :

      Bonjour Sosthène,
      à quel niveau as-tu l’erreur ? Peux-tu nous envoyer une capture d’écran et ton code source ?

      Merci,

      Juvénal

  • Joseph dit :

    Top

  • >