L’année 2020 marque le 10ème anniversaire de Spark en tant que projet Open Source. En dehors de Linux et MySQL, rare sont les projets open source dans la Data ayant connu autant d’engouement et de succès que Spark.
Le 10 Juin 2020, ce succès a été marqué par la sortie de la nouvelle version 3 d’Apache Spark. On peut dire aujourd’hui sans se tromper que Spark est le moteur de facto de traitement unifié de données à large échelle !
Dans cet article détaillé, nous allons voir ensemble les principales fonctionnalités de cette release, et nous expliquerons en quoi elles contribuent à asseoir la souveraineté de Spark en tant que moteur par défaut du traitement massivement parallèle de données.
Avant d’entrer dans le vif du sujet, il faut comprendre que cette nouvelle release a nécessité plus de 3400 tickets JIRA et 450 contributeurs. Ces tickets se sont répartis de la façon suivante : 46% des tickets étaient pour Spark SQL (ce qui en fait le composant majeur de la nouvelle release), seulement 16 % sur le Spark Core, 7% sur PySpark, 6% sur Spark MLib et 4% sur Spark Streaming.
Vous voyez qu’à ces 5 grands composants sur lesquels ont porté les modifications de la nouvelle release, on couvre plus de 80% des fonctionnalités de Spark 3.
Allons-y !
1 – Spark 3 tourne maintenant sur Hadoop 3.X.X
Spark 3 utilise maintenant Hadoop 3 (et non Hadoop 2 comme dans la release 2). Vous ne le savez peut-être pas, mais Spark a besoin des librairies d’Hadoop pour tourner. Quand bien même votre cluster Spark serait déployé sur Kubernetes, il vous faudra toujours lancer Hadoop pour utiliser Spark. D’ailleurs, les bibliothèques d’Hadoop sont déjà inclues dans les dépendances de Spark.
Spark utilise Hadoop principalement pour les opérations d’écriture (la sérialisation des objets) dans le cluster (par exemple lecture et écriture des fichiers parquets, orc, lecture/écriture dans des stockages objet comme S3 ou Azure BLOB) et l’interaction avec le système de fichiers distribué installé sur le cluster (notamment le HDFS).
Sans Hadoop, vous ne pourrez pas utiliser le système de fichier distribué de votre cluster, et cela est valable même si vous êtes dans le Cloud et que vous utilisez un système de stockage objet comme Amazon S3 ou Azure BLOB.
La release d’Hadoop 3 a emmené une performance significative sur le connecteur d’Amazon S3 et améliorer la compatibilité entre Hadoop et Spark.
Cette amélioration est dans le Spark Core, du coup vous n’aurez rien de spécial à faire pour l’utiliser. Elle est complètement transparente pour vous, en dehors de l’activation du « S3 Magic Committer« , qui se fait manuellement en rajoutant la config suivant lors de l’initialisation de la session Spark :
"spark.hadoop.fs.s3a.bucket..committer.magic.enabled": "true".
2 – Introduction d’un nouvel optimiseur SQL : l’AQE – Adaptative Query Execution
La plus grosse amélioration de Spark 3 est sans aucune équivalence, l’ajout d’un nouvel optimiseur SQL dans le moteur de Spark SQL (Spark Catalyst).
A ce jour, Spark SQL est le composant le plus important d’Apache Spark. C’est d’ailleurs grâce à lui que Spark s’est rapidement imposé comme le moteur de calcul de choix dans le Big Data.
D’abord, le Spark SQL donne aux développeurs la capacité d’utiliser et d’exécuter directement le SQL (compatible ANSI 2003) dans Spark. Ensuite, Il fournit une abstraction structurée sur le RDD appelée DataFrame/DataSet qui permet de manipuler les données simplement comme des tables. Grace à l’API DataFrame/DataSet, le développeur effectue les tâches de Data Engineering en appliquant simplement des transformations et actions sur une source de données abstraite en table. Et enfin, il fournit une panoplie de connecteurs JDBC qui permettent d’interagir avec plusieurs SGBD compatibles avec le protocole ODBC (notamment Oracle, SQL Server, MySQL, ElasticSearch, Cassandra, HBase, etc…).
Pour pouvoir exécuter le code SQL directement, Spark s’appuie sur un moteur d’exécution de requête SQL appelé Spark Catalyst. Sans entrer dans le détail de fonctionnement d’un moteur SQL, lorsque vous soumettez une requête SQL à Spark, Spark Catalyst génère un plan d’exécution optimisé de cette requête sous forme de bytescode intermédiaire, qui sera finalement envoyé à la JVM puis exécuté par le CPU.
Vous voyez donc que compte tenu du rôle de Spark SQL, la performance globale de Spark dépend principalement de l’optimisation réalisée par Spark Catalyst. C’est ce qui explique que pendant des années, les efforts d’amélioration de Spark se sont concentrés sur l’optimiseur SQL, dans le but de trouver le moyen de générer des plans d’exécution les plus courts possibles (c’est-à-dire les plans qui contiennent le moins d’étapes possibles, car plus il y’a d’étapes et plus le job prendra du temps pour s’exécuter). Ce n’est donc pas surprenant que 41% des améliorations de la nouvelle release 3 de Spark tourne sur son optimiseur SQL, en d’autres termes sur Spark Catalyst.
A titre de rappel, un moteur d’exécution de requête SQL utilise/s’appuie sur une ou plusieurs méthodes d’optimisation pour construire le plan d’exécution de la requête. La construction de ce plan en Spark implique la sélection de la méthode de jointure appropriée, la prise de décision concernant l’ordre d’exécution des tâches, et la prise de décision sur la stratégie de jointure appropriée (broadcast-hash vs sort-merge).
Jusqu’à la version 2.4 de Spark, l’amélioration la plus significative dans Spark Catalyst était l’ajout d’une méthode d’optimisation basée sur les coûts à la compilation (cost-based optimization). Cela signifie que de la version 2.0 jusqu’à la version 2.4, Spark Catalyst utilisait la méthode « cost-based optimization » pour déterminer le plan optimal d’exécution des requêtes SQL.
Jusqu’à présent, le fonctionnement de cette méthode repose sur des estimations calculés à la compilation (donc avant l’exécution) concernant des statistiques sur les données manipulées, comme par exemple le nombre de lignes de la source de données (row count), le nombre de valeurs distincts, le nombre de valeurs NULL, le min, le max, la taille en mémoire des données, etc. Sur la base de ces statistiques, la méthode permet à Spark de choisir parmi plusieurs plans, le plan le plus court (ou plan le plus optimal).
On est donc sur une méthode d’optimisation basée sur des règles d’association du type :
IF row_count(table1) < row_count(table2) THEN ACTION
IF table1.MEMORY < EXECUTOR.MEMORY THEN JOIN.METHOD = BROADCAST()
C’est cette méthode d’optimisation qui est utilisée par Spark Catalyst jusqu’à la version 2.4, et elle fonctionne très bien. Combiné aux capacités fonctionnelles de Scala, langage dans lequel Spark Catalyst a été développé, ses performances n’en sont qu’encore plus améliorées.
Malheureusement, quoique performante, la méthode d’optimisation basée sur les coûts souffre de deux inconvénients majeurs :
- Les statistiques sur lesquelles l’optimiseur se base pour construire le plan d’exécution des requêtes sont les statistiques des données avant l’étape de transformation nécessitant l’exécution de la requête SQL (ou de l’agrégation)
- Le plan d’exécution est généré une seule fois, à la compilation et c’est ce même plan qui est utilisé à tous les stages d’exécution de Spark.
Or justement, Spark n’est pas un moteur d’exécution traditionnel comme Teradata dans lequel le plan est exécuté une seule fois sur l’ensemble des données. Non ! Spark est un système distribué qui fonctionne sur un cluster tolérant aux pannes. A cause des contraintes du cluster, chaque étape d’un traitement est exécuté en stage (ou phase) séparé qui modifie potentiellement les données : c’est le paradigme MapReduce. Pour illustrer cela, supposons ce traitement Spark :
val session_s = Session_Spark(true)
val sc = session_s.sparkContext
val rdd_test : RDD[String] = sc.parallelize(List("Sue likes cats cats like cat food cats like to play"))
val rdd_compte = rdd_test
.flatMap(x => x.split(" "))
.map(m => (m, 1))
.reduceByKey((x, y) => x + y)
L’exécution de ce traitement dans le cluster va se faire en 3 stages, que nous pouvons globalement représenter dans le schéma suivant :
Vous remarquerez dans ce schéma qu’à chaque stage, la taille des données a changé, ainsi que le nombre de lignes, et c’est ça le problème de l’optimiseur basé sur les coûts. C’est le même plan qui est exécuté sur l’ensemble des étapes, or les statistiques des données initiales sur lesquelles il s’est basé pour générer le plan ne sont plus nécessairement les mêmes d’un stage après un autre. Cela crée ce que j’appelle des « vides d’optimisation » qu’on ne peut plus combler une fois que le plan a déjà été généré, et c’est là que l’AQE – Adaptative Query Execution intervient.
l’AQE – Adaptative Query Execution est une nouvelle méthode d’optimisation qui vient se rajouter à Spark Catalyst et se combiner aux deux autres méthodes d’optimisation déjà présentes (le Cost-based optimisation, et le Dynamic Partition Pruning) pour offrir la ré-optimisation au runtime (à l’exécution).
Qu’est ce que cela veut dire ?
Cela signifie que l’AQE améliore le workflow d’optimisation de Spark Catalyst en ajustant le plan d’exécution des requêtes au runtime, à chaque stage du job. Il re-calcule les statistiques des données à chaque stage du job et reconstruit si nécessaire le plan d’exécution des requêtes sur la base de ces statistiques. Ce qui est intéressant c’est que l’AQE effectue ces ré-optimisations en pleine exécution (runtime) du job Spark.
Vous imaginez donc à quel point l’AQE, combinée au Cost-based optimisation, et au Dynamic Partition Pruning, améliore la performance de Spark !
Selon le benchmark de TPC-DS (le standard de l’industrie en matière de Décisionnel) basé sur 3 To de données, l’utilisation de AQE dans Spark est 1,5 fois plus rapide sur 2 requêtes que l’utilisation de Spark sans AQE, et 1,1 fois sur 37 requêtes.
1,1 fois ce n’est pas très rapide pour une optimisation n’est ce pas ? C’est parce que l’AQE n’est pas systématiquement appliqué. Si le plan généré au début de la compilation est le meilleur plan sur l’ensemble des étapes du job Spark, alors l’optimisation AQE n’est plus nécessaire. En fait, l’optimisation AQE, est appliqué pour le moment par Spark Catalyst dans les 3 situations suivantes :
- L’optimisation dynamique du nombre de partitions après un Shuffle
- L’optimisation dynamique de la stratégie de jointure
- L’optimisation dynamique des partitions asymétriques
Voyons les implications profondes de chacun de ces cas dans la performance des jobs Spark à part.
L’optimisation dynamique du nombre de partitions après un Shuffle
Peut-être vous le savez déjà, mais le Shuffle est l’opération la plus coûteuse dans le traitement massivement parallèle. En effet, étant donné que les données sont partitionnées et réparties entre des nœuds complètement indépendants, lorsque vous voulez effectuer une opération qui nécessite l’utilisation de toutes les données et qui n’est pas commutative (par exemple le calcul d’une variance, les agrégations via le GROUP BY), le cluster est obligé de transférer à travers le réseau, toutes les données nécessaires par l’opérateur à une machine spécifique (ou à carrément redistribuer les données entre tous les nœuds comme dans le cas d’un GROUP BY).
C’est ce transfert de données inévitable, qui plombe la performance des jobs. Chaque fois qu’un stage « shuffle » est exécuté, il y’a modification du nombre de partitions. Celui-ci devient égal au nombre d’output du shuffle (comme vous pouvez le voir dans la figure 4), et c’est justement ce nombre de partitions qu’on ne peut pas forcément anticiper dès le début du job qui crée l’un des vides d’optimisation dès lors que le plan d’exécution est généré au début de la compilation.
Jusqu’à la version 2.4 de Spark, le seul moyen qu’on avait pour contrôler le nombre de partitions après un shuffle était via la propriété « shuffle.max.partitions« . Par défaut sa valeur est de 200. Il fallait fixer d’avance à la compilation, le nombre de partitions via la configuration « shuffle.max.partitions« . En utilisant la valeur par défaut de cette configuration à 200, Spark est toujours obligé de créer 200 partitions après un Shuffle, même si celles-ci sont trop petites et non nécessaires en fonction de la taille des données.
AQE est capable automatiquement de combiner, réduire, ou augmenter le nombre de partitions du shuffle à l’exécution, en fonction des besoins d’optimisation du job. En fonction de leur taille, il peut décider de combiner certaines partitions, conduisant ainsi à la réduction du nombre de tâches déclenchées par Spark pour les stages suivants. On n’a donc plus à s’inquiéter de ce problème là.
L’optimisation dynamique de la stratégie de jointure
Un autre problème qui plombe les performances dans un système distribué ce sont les jointures. Les jointures sont des problèmes de performances parce qu’elles entraînent automatiquement un shuffle.
Une manière d’éviter ce problème est d’utiliser un hash-Map Join (ou broadcast-join), qui consiste à charger une table en mémoire et l’utiliser en tant que table de hachage pour effectuer la jointure avec l’autre table qui est restée sur le disque dur. Ainsi, on évite les Shuffle.
Malheureusement, le broadcast-join n’est pas quelque chose de systématique, car elle dépend de la taille des données, qui doit être inférieur à la quantité de mémoire disponible dans l’exécuteur Spark. Or, même si on peut demander à ce que Spark force le broadcast-join par défaut via la configuration « spark.sql.autoBroadcastJoinThreshold« , on ne peut pas dès le début du Job, connaître la mémoire qui sera disponible ainsi que la taille des partitions à chaque stage du job. Du coup, cela devient un boulet, un autre vide d’optimisation qu’on traîne dans nos jobs Spark.
L’AQE supprime ce problème en recalculant à chaque stage du job, les statistiques sur les tailles des partitions et la quantité mémoire disponible, et en convertissant ainsi lorsque nécessaire une jointure en broadcast-join, et ce sur l’ensemble des stages du job.
L’optimisation dynamique des partitions asymétriques
Un autre problème général qu’on rencontre dans le traitement massivement parallèle c’est l’asymétrie des tailles de partition. En effet, étant donné que dans un cluster Shared-Nothing, qui est l’infrastructure de base des traitement Big Data, les données sont partitionnées et les traitements sont effectuées en tâches indépendantes sur ces partitions, une distribution anormale des données entre les partitions provoque automatiquement des retards. Certaines tâches vont s’achever bien plus tard que leur homologue pour le même job, entraînant un problème de performance général.
En fait, la stabilité des traitements dans un cluster dépend de l’uniformité de la taille des partitions. Si une seule partition est trop large par rapport aux autres, c’est le ralentissement assuré du job ! L’asymétrie ou non-uniformité dans la taille des partitions est un problème qui arrive fréquemment après un Shuffle, on se retrouve avec 200 partitions dans lesquelles 15 partitions (par exemple) contiennent 80% des données tandis que les 185 autres contiennent 15% des données.
Jusqu’à Spark 2.4, il n’y’avait quasiment rien qu’on pouvait faire même pas au compile-time pour régler ce problème, en dehors de fixer le nombre de partitions à un chiffre très bas. Mais maintenant, dans la version 3, étant donné que les statistiques sur les tailles de partitions sont recalculées à chaque stage, AQE peut automatiquement et dynamiquement détecter des partitions asymétriques, et les fractionner en partitions de taille plus petite. Dans l’autre sens, il peut identifier les partitions trop petites et les regrouper afin qu’elles aient une taille suffisamment importante à l’échelle du cluster.
Voilà les 3 optimisations majeures que AQE peut effectuer. Comme vous pouvez le constater, ces optimisations couvrent les situations qui sont à l’origine de plus de 80% des problèmes de performance dans un traitement exécuté sur cluster. En plus, la méthode d’optimisation est automatique. Ce qui signifie qu’elle nous débarrasse d’une partie du travail manuel et itératif qu’on devait effectuer pour optimiser la performance de nos jobs Spark.
L’AQE est désactivé par défaut dans la version Spark 3.0. Pour l’activer, il faut définir à « true« , la configuration « spark.sql.adaptive.enabled«
Attention !! A partir de Spark 3.2, Adaptive Query Execution est activé par défaut et vous n’avez plus besoin de la définir explicitement via la configuration.
3 – Intégration de Koala : l’implémentation PySpark de Panda Python
PySpark, l’API Python de Spark compte aujourd’hui plus de 5 millions de téléchargements sur PyPI. Cela montre à quel point PySpark est devenu important dans le marché du Big Data. Après AQE, les améliorations de PySpark font partie des améliorations les plus importantes de la release 3 d’Apache Spark.
Plus précisément, la release 3 de Spark apporte l’intégration de Koalas à Spark, et une meilleure interopérabilité entre la JVM et Python.
A titre de rappel, Koalas est une implémentation distribuée de la bibliothèque Pandas. Pandas est la bibliothèque de facto utilisée en Python pour les analyses de données. Elle est utilisée pour toutes les tâches de Data Engineering, notamment, le croisement et l’uniformisation de données, les jointures, la validation de données sur la base de contraintes sémantiques, ou encore la connexion aux bases de données distantes.
Par définition, toutes ces tâches requièrent l’utilisation de multiples bibliothèques : le SQL, le JDBC, une API de manipulation de JSON comme Jackson databind, etc… Panda Python regroupe dans une seule API l’analyse complexe de données. C’est une bibliothèque polyvalente basée sur NumPy qui fournit diverses structures de données et opérations pour le traitement de données numériques et des séries chronologiques. En plus de cela, les types et les structures de Panda sont souvent utilisées comme input pour les fonctions de plotting, l’analyse statistique en SciPy, et les algorithmes de machine learning en Scikit-learn. Ainsi, lorsqu’un utilisateur Python travaille avec PySpark, panda est toujours à côté.
Le problème de Panda lorsqu’on l’utilise dans un cluster Spark avec PySpark est double :
- c’est une bibliothèque par définition mono-thread qui s’exécute en seul processus.
- dû à la nature interprétée de Python, il interagit très mal avec la JVM (par exemple au niveau de la traduction des types en bytecode).
Koalas vient pour résoudre ces 2 problèmes en offrant une API Panda automatiquement distribuée en Spark et compatible avec les types de la JVM. Ainsi, lorsque vous utilisez Kaolas, vous profitez automatiquement de la puissance de Spark et de la flexibilité de Panda.
Pour réaliser cet exploit, Koalas s’appuie en arrière plan sur Apache Arrows, un structure de données In-Memory et orientée colonne spécialisée dans les analyses de données à large échelle. Il offre des fonctionnalités intéressantes comme la vectorisation des opérations (qui augmente drastiquement la performance de Pandas de l’ordre de 100), et l’échange de données entre la JVM et le driver/executers PySpark qui se fait quasiment sans sérialisation. C’est grâce à Apache Arrow que Panda que les types de Panda sont pris en charge facilement par la JVM, car celui-ci en effectue une conversion à postériori.
Avant la release 3 de Spark, notamment dans la version 2.3, version à partir de laquelle Koalas a été introduite, si vous vouliez switcher de Pandas à Spark ou de Koalas à Spark, il vous fallait appliquer des fonctions de conversion spécifiques pour obtenir un objet DataFrame approprié. En plus, il fallait systématiquement installer Koalas séparément. A partir de la release 3 de Spark, spécialement la 3.2, Koalas est intégré à Spark par défaut et vous n’avez plus besoin de l’installer en tant que bibliothèque additionnelle.
5 – les autres innovations intéressantes de Spark 3
AQE et PySpark Koalas sont clairement les innovations les plus importantes de la release 3 de Spark. Toutefois, il y’a également d’autres innovations qui à mon sens ne sont pas fondamentales, mais facilitent la vie dans le cadre d’un travail de Data Engineer ou Data Analyst :
- Intégration de RocksDB dans Spark Streaming : dans les versions antérieures, Spark Streaming persiste les états des traitements streaming en mémoire dans les RDD. Mais cela peut poser problème si la mémoire n’est pas suffisante suite à un chargement de données, provoquant la persistance des états sur le disque. Dans la nouvelle release, Spark offre désormais la possibilité d’utiliser RocksDB comme Kafka Streams pour la persistance des états. RocksDB est plus indiquée que les RDD pour la gestion des états. C’est vraiment cool ! Vous intégrez RocksDB dans votre application Spark Streaming via la configuration « spark.sql.streaming.stateStore.providerClass« : « org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider ».
- Nouvelle WebUI pour faciliter le monitoring et le Debuggage des jobs Spark : l’interface WebUI de Spark a été améliorée, favorisant une meilleure compréhension de l’exécution des jobs, et donc un meilleur debuggage. Spark Streaming bénéficie même d’une interface complète plus détaillée pour le monitoring des jobs streaming.
- Prise en compte des GPU dans le Deep Learning : dans les versions précédentes, Spark MLib est utilisé pour le Machine Learning, mais ne supporte pas les algorithmes de Deep Learning comme les algorithmes de reconnaissance faciale, le traitement d’images, ou encore les réseaux de neurone. Spark 3 supporte désormais le Deep Learning via un projet appelé Hydrogen (initiative de Databricks et non de l’open source). De plus, il permet d’accéler l’apprentissage des modèles en utilisant les GPU (Graphical Processing Unit). Pourquoi les GPU ? Parce que ceux-ci sont généralement utilisés pour les jeux vidéos et le traitement d’image. En combinant la puissance des GPU et des CPU, on obtient une puissance de calcul qui est assez importante pour réaliser des travaux de Deep Learning. A ce jour, de nombreuses cartes graphiques sont supportées, notamment les cartes graphiques de NVidia, AMD ou Intel. NVIDIA a particulièrement travaillé sur l’intégration de Spark avec ses GPU. Vous pouvez retrouver toutes ces fonctionnalités dans le lien afférent ci-bas.
- meilleure intégration de Kubernetes et support des GPU sur YARN : Spark supportait déjà Kubernetes dans les versions 2, mais son intégration était faible et les déploiements de Spark en production sur Kubernetes ne produisaient pas vraiment de performances acceptables, en comparaison avec les déploiements sur YARN. Dans cette nouvelle release, Spark introduit un nouveau Scheduler sur Kubernetes qui permet d’affecter dynamiquement les ressources aux jobs Spark. Aussi, Spark 3.X supporte l’utilisation des GPU dan les déploiements Kubernetes. Il en est de même également pour YARN, qui supporte désormais avec la version 3 de Spark, la planification et l’exécution des tâches sur noeuds avec GPU.
- Une meilleure intégration avec le standard ANSI de SQL : la compatibilité de Spark avec la norme ANSI SQL a été améliorée. Vous pouvez désormais aciver cette compatibilité via la configuration « spark.sql.parser.ansi.enabled« , et ainsi forcer Spark à évaluer les requêtes SQL selon toutes les normes définies dans le standard ANSI 92 ou 2003 de SQL. On note aussi dans le même registre, le rajout de l’instruction « REPARTITION » dans les expressions SQL pour définir dans la requête SQL, le nombre de partitions des résultats de la requête. Par exemple :
df.createOrReplaceTempView("TBL_CLIENTS")
val df_db = spark.sql("SELECT /*+ REPARTITION(40) */ * FROM TBL_CLIENTS")
println("Nombre de partitions après l'exécution de la requête :" + df_db.rdd.getNumPartitions)
Le nombre de partitions après l’exécution de cette requête sera de 40.
Toujours dans le même registre de l’intégration de SQL à Spark, on note le rajout d’une centaine de fonctions SQL supplémentaires dans la bibliothèque de Spark SQL. Par exemple : extract(), forall(), from_csv(), make_date(), make_interval(), make_timestamp(), map_entries(). Cela se rajoute à la centaine déjà présente dans la bibliothèque.
Voilà ! Nous avons couvert les principales features de la release 3 de Spark. Comme vous avez pu le voir, elles sont très intéressantes, et ouvrent le champs vers de nouvelles possibilités aussi bien dans le Data Engineering que dans la Data Science. Cette release montre aussi que la discipline de l’ingénieurie de données a encore de très longues années excitantes devant elle et que le meilleur reste à venir. Nous sommes une maison d’édition spécialisée dans le Big Data et nous formons au Big Data. Si vous souhaitez devenir Data Engineer ou vous réorienter vers ce métier, n’hésitez pas à nous contacter, vous êtes au bon endroit.
Liens intéressants relatifs aux features de Spark 3 :
- https://spark.apache.org/releases/spark-release-3-0-0.html
- https://issues.apache.org/jira/browse/SPARK-31412
- https://databricks.com/fr/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
- https://databricks.com/fr/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html