De nombreux data scientists, data analysts, data engineers et utilisateurs de Business Intelligence s’appuient sur des requêtes SQL interactives pour explorer les données. Spark SQL est un module d’Apache Spark pour le traitement de données structurées.
Spark SQL fournit une abstraction de programmation appelée DataFrame et peut agir comme un moteur de requête SQL distribué. Ce module permet également aux requêtes Hadoop Hive non modifiées de s’exécuter 100 fois plus rapidement sur les déploiements et les données existants. En plus, il offre une forte intégration avec le reste de l’écosystème Spark (par exemple, en intégrant le traitement des requêtes SQL à l’apprentissage automatique).
Dans ce tutoriel exhaustif, nous allons vous apprendre tout ce que vous devez savoir pour exploiter les données massives et les bases de données à l’aide de Spark. A la fin de cette chronique, vous saurez l’utiliser dans vos projets Big Data.
Qu’est-ce que Spark SQL et quelles sont ses principales caractéristiques ?
Spark SQL est l’un des modules de Spark pour le traitement des données structurées. Contrairement à l’API de base spark RDD, l’interface offerte par spark SQL fournit à spark plus d’informations sur les structures de données et les calculs en cours, qu’il va utiliser pour effectuer des optimisations supplémentaires. Il existe de nombreuses façons d’utiliser spark SQL, notamment l’API SQL, l’API dataframe et l’API dataset. Il convient de noter que le moteur d’exécution est le même quelque soit la manière et le langage utilisés. Grâce à cette unification, les développeurs peuvent facilement passer d’une API à l’autre, ce qui rend le traitement des données plus souple. Concrètement, Spark SQL permettra aux développeurs de :
- Importer des données relationnelles à partir de fichiers Parquet et de tables Hive ;
- Exécuter des requêtes SQL sur les données importées et les RDDs existants ;
- Écrire facilement des RDDs vers des tables Hive ou des fichiers Parquet.
Spark SQL comprend également un optimiseur basé sur les coûts ( Cost-based Optimizer – appelé Catalyst), le stockage en colonnes et la génération de code pour accélérer les requêtes. En même temps, il peut évoluer vers des milliers de nœuds et des requêtes de plusieurs heures en utilisant le moteur Spark, qui offre une tolérance totale aux pannes en milieu de requête, sans avoir à se soucier d’utiliser un moteur différent pour les données historiques.
Les principales caractéristiques qui font de Spark SQL la librairie de traitement de données de Python la plus puissante sont les suivantes :
Intégration avec Spark
Les requêtes Spark SQL sont intégrées aux programmes Spark. Spark SQL nous permet d’interroger des données structurées à l’intérieur des programmes Spark, en utilisant SQL ou une API DataFrame. Elle peut être utilisée en Java, Scala, Python et R. D’ailleurs nous possédons une mini formation gratuite pour maitriser Spark avec Scala qui pourra sans doute vous aider.
Pour exécuter le calcul en continu, les développeurs écrivent simplement un calcul par lot contre l’API DataFrame / Dataset, et Spark incrémente automatiquement le calcul pour l’exécuter en continu. Grâce à cette conception puissante, les développeurs n’ont pas à gérer manuellement l’état, les défaillances ou la synchronisation de l’application avec les travaux par lots. Au lieu de cela, le travail en continu donne toujours la même réponse qu’un travail par lots sur les mêmes données.
Accès uniforme aux données
DataFrames et SQL prennent en charge un moyen commun d’accéder à diverses sources de données, comme Hive, Avro, Parquet, ORC, JSON et JDBC. Cela permet de joindre les données de ces sources. Cela est très utile pour accueillir tous les utilisateurs existants dans Spark SQL.
Compatibilité avec Hive
Spark SQL exécute des requêtes Hive non modifiées sur les données actuelles. Il réécrit le front-end Hive et le meta store, ce qui permet une compatibilité totale avec les données, les requêtes et les UDF Hive actuels.
Connectivité standard
La connexion se fait via JDBC ou ODBC. JDBC et ODBC sont les normes industrielles de connectivité pour les outils de business intelligence.
Performance et évolutivité
Spark SQL intègre un optimiseur basé sur les coûts, la génération de code et le stockage en colonnes pour rendre les requêtes agiles tout en calculant des milliers de nœuds à l’aide du moteur Spark, qui offre une tolérance de panne complète à mi-requête. Les interfaces fournies par Spark SQL donnent à Spark plus d’informations sur la structure des données et du calcul effectué. En interne, Spark SQL utilise ces informations supplémentaires pour effectuer une optimisation supplémentaire. Spark SQL peut lire directement à partir de plusieurs sources (fichiers, HDFS, fichiers JSON/Parquet, RDDs existants, Hive, etc.) Il assure l’exécution rapide des requêtes Hive existantes.
Pourquoi utilise-t-on Spark SQL ?
Spark SQL a été conçu à l’origine comme Apache Hive pour fonctionner au-dessus de Spark et est maintenant intégré à la pile Spark. Apache Hive avait certaines limites.
- Hive lance des travaux MapReduce en interne pour exécuter les requêtes ad-hoc. MapReduce est moins performant lorsqu’il s’agit d’analyser des ensembles de données de taille moyenne ( par exemple 200 Go).
- Hive n’a pas de capacité de reprise. Cela signifie que si le traitement s’arrête au milieu d’un flux de travail, vous ne pouvez pas reprendre là où il s’est arrêté.
- Hive ne peut pas déposer des bases de données cryptées en cascade lorsque la corbeille est activée, ce qui entraîne une erreur d’exécution. Pour surmonter ce problème, les utilisateurs doivent utiliser l’option Purge pour ignorer la corbeille au lieu de déposer.
Ces inconvénients ont donné lieu à la naissance de Spark SQL qui permet de surmonter ces inconvénients et remplacer Apache Hive. Mais la question qui se pose encore dans la plupart de nos esprits est la suivante : Spark SQL est-il une base de données ?
Spark SQL n’est pas une base de données mais un module utilisé pour le traitement des données structurées. Il fonctionne principalement sur les DataFrames qui sont l’abstraction de programmation et agissent généralement comme un moteur de requête SQL distribué.
Comment fonctionne Spark SQL ?
Voyons ce que Spark SQL a à offrir. Spark SQL estompe la frontière entre RDD et table relationnelle. Il offre une intégration beaucoup plus étroite entre le traitement relationnel et procédural, grâce à des API déclaratives DataFrame qui s’intègrent au code Spark. Elle permet également une optimisation plus poussée. L’API DataFrame et l’API Datasets sont les moyens d’interagir avec Spark SQL.
Avec Spark SQL, Apache Spark est accessible à un plus grand nombre d’utilisateurs et améliore l’optimisation pour les utilisateurs. Spark SQL fournit des API DataFrame qui effectuent des opérations relationnelles à la fois sur des sources de données externes et sur les collections distribuées intégrées de Spark. Il introduit un optimiseur extensible appelé Catalyst, qui permet de prendre en charge un large éventail de sources de données et d’algorithmes dans le domaine des Big-Data.
Spark fonctionne à la fois sur des systèmes Windows et des systèmes de type UNIX (par exemple, Linux, Microsoft, Mac OS). Il est facile de l’exécuter localement sur une machine – il suffit que java soit installé dans le PATH de votre système, ou que la variable d’environnement JAVA_HOME pointe vers une installation Java.
L’architecture de Spark SQL comprend les composants suivants :
1. Data Source API (Application Programming Interface)
Il s’agit d’une API universelle pour le chargement et le stockage de données structurées.
- Elle a un support intégré pour Hive, Avro, JSON, JDBC, Parquet, etc.
- Prends en charge l’intégration de tiers par le biais de paquets Spark
- Prise en charge des sources intelligentes.
2. L’API DataFrame
Un Dataframe représente une collection de données distribuées immuables. Son objectif principal est de permettre aux développeurs de ne se préoccuper que de ce qu’ils doivent faire lorsqu’ils sont confrontés au traitement des données, et non de la manière de le faire, et de laisser une partie du travail d’optimisation au framework spark lui-même. Le dataframe contient des informations de schéma, c’est-à-dire qu’il peut être considéré comme une donnée avec un nom et un type de champ, ce qui est similaire à une table dans une base de données relationnelle, mais la couche inférieure a fait beaucoup d’optimisation. Une fois le dataframe créé, vous pouvez utiliser SQL pour le traitement des données.
Les utilisateurs peuvent construire un dataframe à partir de diverses sources de données, telles que des fichiers de données structurés, des tables dans le répertoire de stockage, des bases de données externes ou des RDD existants. L’API dataframe prend en charge Scala, Java, Python et R.
3. L’API Dataset
Dataset est une nouvelle interface ajoutée dans spark 3.0 et une extension de dataframe. Elle présente les avantages du RDD (entrée de type forte, prise en charge de puissantes fonctions lambda) et les avantages du moteur d’exécution optimisé de spark SQL. Vous pouvez construire un dataset à partir d’un objet JVM, puis utiliser une transformation de fonction (map),flatMap,filter). Il convient de noter que l’API dataset est disponible en scala et en Java, et que python ne prend pas en charge l’API dataset.
En outre, l’API dataset peut réduire l’utilisation de la mémoire. Comme le framework spark connaît la structure de données du dataset, il peut économiser beaucoup d’espace mémoire lors de la persistance du dataset.
4. L’optimiseur Catalyst
L’optimiseur Catalyst utilise deux types de plans :
Plan logique : définit le calcul sur le jeu de données, mais n’a pas défini la manière d’exécuter le calcul. Chaque plan logique définit une série de propriétés (champs de requête) et de contraintes (conditions d’occurrence) requises par le code utilisateur, mais ne définit pas la manière d’exécuter le calcul.
Plan physique : un plan physique est généré à partir d’un plan logique, qui définit la manière d’effectuer les calculs et est exécutable. Par exemple, une jointure dans un plan logique est convertie en une jointure de fusion de tri dans un plan physique. Il est à noter que Spark génère plusieurs plans physiques et sélectionne ensuite le plan physique ayant le coût le plus bas.
Dans spark SQL, toutes les opérations des opérateurs sont converties en ast (abstract syntax tree) et transmises à l’optimiseur de catalyseur. L’optimiseur est construit sur la base de la programmation fonctionnelle de Scala. Catalyst supporte les stratégies d’optimisation basées sur les règles et sur les coûts.
Le plan de requête de spark SQL comprend quatre étapes (voir la figure ci-dessous) :
1. Analyse
2. Optimisation logique
3. Plan physique
4. Génération du code et compilation de la partie requête en bytecode Java.
Attention : Dans la phase de planification physique, Catalyst génère plusieurs plans, calcule le coût de chaque plan, puis compare le coût de ces plans, c’est la stratégie basée sur le coût. Dans les autres phases, il s’agit d’une stratégie d’optimisation basée sur les règles.
Analyse
Unresolved Logical Plan -> Logical Plan. Le plan de requête de spark SQL commence par l’ast (abstract syntax tree) retourné par le parseur SQL ou l’objet dataframe construit par l’API. Dans les deux cas, il y aura des références d’attributs non traitées (un champ de requête peut ne pas exister, ou le type de données est erroné). Lorsque le type d’un champ de propriété ne peut pas être déterminé ou ne peut pas être mis en correspondance avec la table d’entrée, il est appelé Untreated. Spark SQL utilise les règles du catalyst et l’objet catalogue (qui peut accéder aux informations de la table de la source de données) pour traiter ces propriétés. La première étape consiste à construire un Unresolved Logical PlanTree, puis à appliquer une série de règles, et enfin à générer un Logical Plan.
Optimisation logique
Logical Plan ->Optimized Logical Plan. Dans l’étape d’optimisation logique, des stratégies d’optimisation basées sur des règles sont utilisées, telles que la poussée des prédicats, l’écrêtage des projections, etc. Après une certaine optimisation des colonnes, le plan logique optimisé est généré.
Plan physique
Optimized Logical Plan ->Physical Plan. Dans la phase de plan physique, spark SQL va générer plusieurs plans d’exécution physiques à partir du plan logique optimisé, puis utiliser le modèle de coût pour calculer le coût de chaque plan physique, et enfin sélectionner un plan physique. A ce stade, spark SQL utilise la jointure de diffusion s’il est déterminé qu’une table est très petite (qui peut être persistée en mémoire).
Il convient de noter que le planificateur physique utilise également des stratégies d’optimisation basées sur des règles, telles que le pipelining des opérations de projection et de filtrage dans un opérateur spark map. En outre, les opérations de la phase de planification logique sont poussées du côté de la source de données (le prédicat push-down et la projection push-down sont pris en charge).
Génération de code
L’étape finale de l’optimisation des requêtes consiste à générer le bytecode Java et à utiliser les guillemets Quasi pour terminer le travail.
Après l’analyse ci-dessus, nous avons une compréhension préliminaire de l’optimiseur Catalyst. Comment les autres composants de spark interagissent-ils avec l’optimiseur de catalyseur ? Les détails sont présentés dans la figure suivante :
Comme le montre la figure ci-dessus : Les pipelines ML, le streaming structuré et les graphframes utilisent tous des API de type dataframe / dataset. Et ils bénéficient tous de l’optimiseur de catalyseur.
Démarrer avec Spark SQL
Pour une meilleure maîtrise de cette librairie vous devez avoir une connaissance de base sur le langage de programmation Python. Notre article sur la programmation Python pour la data vous donnera toutes les bases nécessaires pour mieux appréhender cet article.
Création d’une session Spark
Sparksession est l’entrée de programmation de l’API de dataset et de dataframe. Elle est supportée à partir de spark 2.0. Elle est utilisée pour unifier le hivecontext et le sqlcontext. Grâce à une entrée de session spark, l’utilisabilité de spark est améliorée. Le code suivant montre comment créer une sparksession :
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Création d’un dataframe
Après avoir créé une session Spark, vous pouvez l’utiliser pour créer un dataframe à partir d’un RDD existant, d’une table hive ou d’une autre source de données. L’exemple suivant permet de créer un dataframe à partir d’une source de données de type fichier JSON :
df = spark.read.option("multiline","true").json("student.json")
# Afiifchage du contenu du dataframe
df.show()
Cet exemple affichera donc :
Opérations de base sur les dataframes
Après avoir créé un dataframe, vous pouvez effectuer quelques opérations sur les colonnes, comme le montre le code suivant :
# Selection de la colonne "firstname"
df.select("firstname").show()
# Sélection de tout le dataframe, mais en incrémentant tout les ages de 1
df.select(df['firstname'], df['age'] + 1).show()
# Sélection des étudiants de plus de 29 ans
df.filter(df['age'] > 29).show()
# Comptage des étudiants par groupe d'age
df.groupBy("age").count().show()
Utilisation d’une requête SQL dans un programme
L’opération ci-dessus useDSL (langage spécifique au domaine). Vous pouvez également utiliser SQL pour exploiter directement le cadre de données, comme indiqué ci-dessous :
# Enregistrement du DataFrame comme une vue temporaire
df.createOrReplaceTempView("student")
sqlDF = spark.sql("SELECT * FROM student WHERE firstname = 'Sylvie'")
sqlDF.show()
Interopérer avec les RDD
Spark SQL prend en charge deux méthodes différentes pour convertir des RDD existants en dataframe. La première méthode utilise la réflexion pour déduire le schéma d’un RDD qui contient des types d’objets spécifiques. Cette approche basée sur la réflexion conduit à un code plus concis. Elle fonctionne bien lorsque vous connaissez déjà le schéma lors de l’écriture de votre application Spark.
La deuxième méthode pour créer un dataframe est une interface programmatique. Elle vous permet de construire un schéma et de l’appliquer à un RDD existant. Bien que cette méthode soit plus verbeuse, elle vous permet de construire des ensembles de données lorsque les colonnes et leurs types ne sont pas connus avant l’exécution.
Voyons le fichier people.txt suivant :
Tom, 29
Bob, 30
Sylvie, 19
Déduction du schéma à l’aide de la réflexion
Spark SQL peut convertir un RDD d’objets Row en un DataFrame, en déduisant les types de données. Les rangs sont construits en passant une liste de paires clé/valeur comme kwargs à la classe Row. Les clés de cette liste définissent les noms des colonnes de la table, et les types sont déduits par échantillonnage de l’ensemble des données, de manière similaire à l’inférence effectuée sur les fichiers JSON.
from pyspark.sql import Row
sc = spark.sparkContext
# Chargement du fichier texte et conversion de chaque ligne en Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Déduire le schéma et enregistrer le DataFrame comme une table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# Exécution d'une requête
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# Les résultats des requêtes SQL sont des objets Dataframe.
# rdd retourne le contenu sous la forme d'un :class:`pyspark.RDD` de :class:`Row`.teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
print(name)
# Name: Sylvie
Spécification programmatique du schéma
Lorsqu’un dictionnaire de kwargs ne peut pas être défini à l’avance (par exemple, la structure des enregistrements est codée dans une chaîne de caractères, ou un ensemble de données textuelles sera analysé et les champs seront projetés différemment pour différents utilisateurs), un DataFrame peut être créé par programme en trois étapes.
- Créer un RDD de tuples ou de listes à partir du RDD original ;
- Créer le schéma représenté par un StructType correspondant à la structure des tuples ou des listes dans le RDD créé à l’étape 1.
- Appliquer le schéma au RDD via la méthode createDataFrame fournie par SparkSession.
Prenons cet exemple :
# Import data types
from pyspark.sql.types import StringType, StructType, StructField
sc = spark.sparkContext
# Chargement du fichier texte et conversion de chaque ligne en Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
# Chaque est convertit en tuple
people = parts.map(lambda p: (p[0], p[1].strip()))
# schema is encodé en string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Appllication du schema au RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Création d'une vue temporaire
schemaPeople.createOrReplaceTempView("people")
# SQL peut être exécuté sur des DataFrames qui ont été enregistrés comme une table.
results = spark.sql("SELECT name FROM people")
results.show()
Les Source de données de spark SQL
Spark SQL prend en charge l’exploitation de diverses sources de données par le biais de l’interface dataframe. Il peut utiliser la transformation relationnelle et la vue temporaire pour opérer sur le dataframe. Les sources de données courantes sont les suivantes :
Source de données fichier
Les sources de données fichier peuvent être des fichiers :
- Parquet ;
- JSON ;
- CSV ;
- Orc ;
#Code pour lire un fichier parquet
parquetFile = spark.read.parquet("student.parquet")
#Code pour lire un fichier JSON
peopleDF = spark.read.json("student.parquet")
#Code pour lire un fichier CSV
df = spark.read.format("csv")
.load("student.csv")
Source de données hive
Spark SQL prend également en charge la lecture et l’écriture de données stockées dans Apache Hive. Cependant, comme Hive a un grand nombre de dépendances, ces dépendances ne sont pas incluses dans la distribution Spark par défaut. Si les dépendances Hive peuvent être trouvées sur le classpath, Spark les chargera automatiquement. Notez que ces dépendances Hive doivent également être présentes sur tous les nœuds de travail. Ils devront avoir accès aux bibliothèques de sérialisation et de désérialisation Hive (SerDes) afin d’accéder aux données stockées dans Hive.
La configuration de Hive se fait en plaçant votre fichier hive-site.xml, core-site.xml (pour la configuration de la sécurité) et hdfs-site.xml (pour la configuration HDFS) dans conf/.
Lorsque l’on travaille avec Hive, il faut instancier SparkSession avec le support Hive. Cela comprend aussi la connectivité à un métastore Hive persistant, le support des serdes Hive et les fonctions Hive définies par l’utilisateur. Les utilisateurs qui n’ont pas de déploiement Hive existant peuvent toujours activer le support Hive. Lorsqu’il n’est pas configuré par le hive-site.xml, le contexte crée automatiquement metastore_db dans le répertoire courant et crée un répertoire configuré par spark.sql.warehouse.dir, qui est par défaut le répertoire spark-warehouse dans le répertoire courant où l’application Spark est démarrée. Notez que la propriété hive.metastore.warehouse.dir dans hive-site.xml est dépréciée depuis Spark 2.0.0. Utilisez plutôt spark.sql.warehouse.dir pour spécifier l’emplacement par défaut de la base de données dans l’entrepôt. Vous devrez peut-être accorder le privilège d’écriture à l’utilisateur qui démarre l’application Spark.
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location indique l'emplacement par défaut des bases de données et des tables gérées.
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Les requêtes sont exprimées en HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Les requêtes d'agrégation sont également prises en charge.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# Les résultats des requêtes SQL sont eux-mêmes des DataFrames et supportent toutes les fonctions normales.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
#Les éléments des DataFrames sont de type Row, ce qui permet d'accéder à chaque colonne par ordre ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# Vous pouvez également utiliser les DataFrames pour créer des vues temporaires dans un SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Les requêtes peuvent alors joindre les données de DataFrame aux données stockées dans Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...
Source de données JDBC
Spark SQL comprend également une source de données qui peut utiliser JDBC pour lire les données d’autres bases de données. Cette fonctionnalité devrait avoir la priorité sur l’utilisation de JdbcRDD. En effet, les résultats sont renvoyés sous forme de dataframe, facilement traitable dans spark SQL ou connectés à d’autres sources de données. Les sources de données JDBC sont également plus faciles à utiliser avec Java ou Python. En effet, elles ne nécessitent pas que les utilisateurs fournissent des classtags.
Vous pouvez utiliser l’API des sources de données pour charger les tables d’une base de données distante sous forme de dataframe ou de vue temporaire spark SQL. Les utilisateurs peuvent spécifier les propriétés de connexion JDBC dans les options de la source de données. L’utilisateur et le mot de passe sont généralement fournis en tant que propriété pour se connecter à la source de données. En plus des propriétés de connexion, spark prend en charge les options insensibles à la casse suivantes :
Nom des propriétés | Explications |
---|---|
url | URL JDBC à laquelle se connecter. |
dbtable | Table JDBC lue ou écrite. |
query | Le contenu de la requête. |
driver | Le nom de la classe du pilote jdbc utilisé pour se connecter à l’URL. |
partitionColumn, lowerBound, upperBound | Si vous spécifiez ces options, vous devez les spécifier toutes. De plus, numPartitions doit être spécifié. |
numPartitions | Le nombre maximum de partitions dans une table en lecture/écriture qui peut être utilisée pour le traitement parallèle. Cela détermine également le nombre maximal de connexions JDBC simultanées. Si le nombre de partitions à écrire dépasse cette limite, on peut utiliser l’appel coalesce(numPartitions) avant l’écriture pour le réduire à cette limite. |
queryTimeout | La valeur par défaut est 0, le délai d’interrogation. |
fetchsize | La taille d’extraction de JDBC, qui détermine le nombre de lignes à extraire à la fois. Cela peut contribuer à améliorer les performances des pilotes JDBC. |
batchsize | La valeur par défaut est 1000, la taille du lot JDBC, qui peut contribuer à améliorer les performances des pilotes JDBC. |
isolationLevel | Niveau d’isolation de la transaction pour la connexion actuelle. Il peut s’agir de NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ ou SERIALISABLE, ce qui correspond à la définition de l’objet de connexion de JDBC, et la valeur par défaut est le niveau d’isolation de transaction standard READ_UNCOMMITED. Cette option est uniquement disponible pour l’écriture. |
sessionInitStatement | Après l’ouverture de chaque session de base de données vers une base de données distante, cette option exécute une instruction SQL personnalisée pour mettre en œuvre le code d’initialisation de la session avant de commencer à lire les données. |
pushDownPredicate | Option pour activer ou désactiver la poussée des prédicats vars les sources de données JDBC. La valeur par défaut est true. Dans ce cas, spark pousse le filtre vers la source de données JDBC autant que possible. |
# Remarque : le chargement et la sauvegarde JDBC peuvent être réalisés via les méthodes load/save ou jdbc.
# Chargement de données à partir d'une source JDBC
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Spécifier les types de données des colonnes du dataframe en lecture
jdbcDF3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
.load()
# Sauvegarder les données dans une source JDBC
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Spécifier les types de données des colonnes de la table de création en écriture
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
Le serveur Thrift et le spark SQL CLI
Vous pouvez utiliser JDBC / ODBC ou la ligne de commande pour accéder à spark SQL. De cette façon, les utilisateurs peuvent directement utiliser SQL pour exécuter des requêtes sans écrire de code.
Serveur Thrift JDBC/ODBC
Le serveur Thrift JDBC / ODBC correspond au hiveserver2 de hive. Vous pouvez utiliser beeline pour accéder au serveur JDBC. Le script Start existe dans le répertoire SBIN de spark- thriftserver.sh, utilisez ce script pour démarrer le serveur JDBC / ODBC :
./sbin/start-thriftserver.sh
Lorsque vous utilisez beeline pour accéder au serveur JDBC / ODBC, beeline vous demandera le nom d’utilisateur et le mot de passe. En mode non sécurisé, il suffit de saisir le nom d’utilisateur et le mot de passe vierge.
beeline> !connect jdbc:hive2://localhost:10000
Spark SQL CLI
Spark SQL cli est un outil pratique pour exécuter le service d’historique Metastore en mode local. Il permet également d’exécuter des requêtes saisies à partir de la ligne de commande. Notez que spark SQL cli ne peut pas communiquer avec le serveur JDBC du thrift.
Pour lancer spark SQL cli, il suffit d’exécuter la commande suivante dans le répertoire bin de Spark :
./spark-sql
Cet article décrit principalement spark SQL, y compris l’introduction de spark SQL. Il permet également d’appréhender l’utilisation de base de l’API dataframe & dataset, le principe de base de l’optimiseur catalyst, la programmation spark SQL, la source de données spark SQL et l’intégration avec hive, thrift server et spark SQL cli.
Nous espérons que vous avez compris l’intérêt de Spark SQL dans le marché et son intérêt dans votre carrière. Clairement SparkSQL est un outil à prendre en considération si vous souhaitez travailler dans le Big Data. C’est un outil qui s’apprivoise très rapidement. Si vous avez des questions par rapport à l’utilisation de ce module n’hésitez pas à la poser dans les commentaires. Pour aller plus loin dans vos compétences dans le Big Data, nous vous offrons cette formation sur l’écosystème Hadoop.