Quelles nouveautés pour Apache Spark 2.3 ?

Plongeons nous dans les nouveautés proposées par la nouvelle distribution 2.3 d’Apache Spark.

Cette article est composé de recherches et d’informations issues des présentations suivantes du DataWorks Summit 2018 :

  • Apache Spark 2.3 boosts advanced analytics & deep learning par Yanbo Liang, Staff Software Engineer @ Hortonworks
  • ORC Improvement in Apache Spark 2.3 par Dongjoon Hyun, Principal Software Engineer @ Hortonworks Data Science Team

Sommaire

Spark 2.0 a introduit énormément de changement majeurs qui ont améliorés de plus de dix fois les performanxces.

Spark 2.3 est la dernières release en date de la branche 2.X, apportant les fonctionnalités suivantes :

  • Support des UDFs Pandas / Vectorizés dans PySpark
  • Support de représentation et lecture d’images dans les APIs DataFrame et Dataset
  • Optimisation de modèles Machine Learning en parallèle
  • Nouveau mode de streaming “Continuous processing” (expérimental)
  • Ajout des jointures stream-stream
  • Nouvelle API Datasource V2 (beta)
  • Support ORC natif
  • Spark dans Kubernetes (expérimental)

UDFs d’analyse avancés

La communauté Spark a énormément travaillé sur les UDFs Python.

Depuis Spark 0.7, les UDFs Python appliquaient les transformations de données ligne par ligne, ce qui entrainait beaucoup de sérialization et de l’overhead d’invocation. Bien sûr les jobs PySpark en souffraient en terme de performance et impactait les méthodes de développement de job des Data Scientists (utiliser Scala et apprendre un nouveau langage ou se faire accompagner par un développeur, ou encore accepter de mauvaises performances).

Spark 2.3 permet maintenant d’encapsuler et manipuler un DataFrame avec la librairie Pandas dans un UDF PySpark afin de profiter de vrai performances. Ces UDFs sont appelés Vectorized UDFs (UDFs Vectorisés, ou UDFs  Pandas). Ils sont le résultats de deux efforts complémentaires :

Les DataFrames sont maintenant sérializés avec Apache Arrow qui permet une bien meilleur conversion entre les formats Spark & Pandas (entre 10 et plusieurs centaines de fois plus rapide).

Les UDFs Pandas résultants sont réparties en deux catégories :

  • scalaires: applique une transformation à toutes les valeurs de la colonne, ex: v => v + 1.
  • map groupée: applique une logique “split – apply – combine” sur un DataFrame précédemment groupé, ex:  v => v - mean(v)  (pour chaque valeur de chaque groupe, soustraire la moyenne des valeurs du groupe).

Pour activer les UDFs Pandas, il faut indiquer la propriété Spark suivante :

Plus d’informations ici.

Apprentissage

Le Deep Learning peut être défini comme un “ensemble de techniques de Machine Learning capable d’apprendre des représentations utiles de fonctionnalités à partir d’images, de texte et de son”

MLlib est une API simple et concise permettant de créer des pipelines de Machine Learning exploitant la capacité de scaling et de traitement massif de données de Spark.

Support de représentation et lecture d’images dans les APIs DataFrame et Dataset

MLlib est une librairie de Machine Learning, et en version 2.3 elle peut également servir à traiter des graph TensorFlow ou des modèles Keras comme Transformer ou même comme fonction SparkSQL, afin d’être utilisée pour du Deep Learning.

Databricks a ouvert en open source un paquet Spark (spark-deep-learning) permettant justement de le faire et a récemment publié la version 0.3.0.

Cependant, pour pouvoir traiter des images en Deep Learning il faut être capable de les lire pour les interprêter, ce qui est exactement le sujet de SPARK-21866. En version 2.3 Spark est capable de lire une image à partir d’un chemin et de la parser en DataFrame avec le schéma suivant :

Le DataFrame résultant contient une Structure avec les métadonnées de l’image et les données binaires, le tout basé sur les conventions OpenCV, sur lesquels on peut appliqués des modèles entrainés.

Optimisation de modèles Machine Learning en parallèle

Optimiser un modèle de Machine Learning consiste avant la 2.3 à entrainer séquentiellement le même modèle plusieurs fois avec différents paramètres d’entrée en utilisant les algorithmes CrossValidator ou TrainValidationSplit pour déterminer les paramètres donnant les meilleurs performances. Chaque exécution profite du parallélisme de Spark mais les jobs en eux même sont séquentiels. SPARK-19357 introduit le paramètre parallelism aux algorithmes qui permet de définir le nombre de jobs à exécuter en parallèle à un instant donné et donc de mieux exploiter les ressources du cluster Spark.

Plus d’informations sur ce sujet ici et ici.

Streaming

Spark 2.3 introduit plusieurs nouveauté côté Streaming qui améliorent de beaucoup ses capacités et permet de le rapprocher un peu plus de ce que propose Apache Flink.

Continuous processing (expérimental)

Jusqu’à maintenant Spark Streaming consistait en une série de micro-batch à faible latence (~100ms) avec des garanties de résistance à la panne et de réception “exactly-once”. Le nouveau mode de déclenchement expérimental introduit en version 2.3, Continuous Processing (ou Traitement Continu), permet d’avoir des temps de latences encore plus faible (~1ms) avec des garenties de résistance à la panne et de réception “at-least-once”.

Étant donnée que le Continuous Processing est un mode de déclenchement du moteur actuel et non un nouveau moteur à part entière, ses checkpoints sont compatibles avec le mode micro-batch.

Un stream Continuous Processing supporte toute fonction SparkSQL (hors aggrégation) mais seulement les opérations “map” (select, where, map, filter, …).

Quelques points à noter :

  • Une requête Continuous Processing lancera en parallèle autant de tâches Spark qu’il y a de nombre de partitions à lire, il faut donc fournir suffisement de coeurs sur le cluster (e.g 10 partitions Kafka = 10 coeurs).
  • Arrêter un stream en Continuous Processing produira peut être des warnings de fin de tâche à ignorer.
  • Il n’y a pas de ré-essai automatique des tâches échouées, ce qui résulte en une requête échouée qui doit être relancée manuellement depuis un checkpoint.

Jointures Stream-stream

Spark 2.0 a introduit la notion de jointure Stream-static, permettant de joindre un stream avec un DataFrame/DataSet statique (table de référence par ex.). Spark 2.3 permet maintenant de jointre deux streams entre eux.

Pour s’assurer que la jointure se fait bien à n’importe quel moment, les états passés de chaque stream sont bufferisés. Ainsi n’importe quelle ligne peut être matchée avec les lignes futures de l’autre stream. Un watermark de délai doit être défini sur les deux entrées afin que le moteur sache combien de temps un input peut être délayé et une contrainte de temps doit être indiquée lors de la requête de jointure (une condition de fenêtrage temporel ou un lien de fenêtre du stream). Ainsi la mémoire buffer de Spark ne risque pas d’être saturée.

La contrainte de temps et le délai sont optionels pour les requête INNER JOIN mais obligatoire pour les LEFT et RIGHT OUTER JOIN. Les requêtes de type FULL OUTER JOIN ne sont pas supportées.

Le point faible des jointures externes est que les lignes non associées (résultat NULL) seront traitées à expiration du délai watermark et, si le mode micro-batch est utilisé, les données du batch courant dont le délai a expiré seront uniquement traité au batch suivant (e.g. quand de nouvelles données arrivent). Ainsi une donnée avec un délai expiré peut être en attente plus longtemps que la durée du délai.

Plus d’informations sur le Continuous Processing et les jointures Stream-stream dans le guide de programmation Spark Structured Streaming

API Datasource V2 (beta)

L’API Datasource de Spark lui permet de s’intégrer avec de nombreuses sources de données (Hive, Avro, CSV, Parquet, …) avec des interactions plus fines qu’un simple processus de lecture et chargement en mémoire.

Cependant la V1 a quelques faiblesses : l’API a quelques dépendances dans les APIs haut niveau (DataFrame et SQLContext), il n’y a aucune garantie transactionelle à l’écriture, et elle est compliquée à étendre s’il faut des optimisations. Ces différents points ont motivé l’équipe de Databricks à designer une API V2 avec les objectifs suivants en tête :

  • Écrite en Java.
  • Aucune dépendance sur les APIs haut-niveau (DataFrame, RDD, …).
  • Facile à implémenter, permet d’ajouter de nouvelles optimisations tout en gardant la rétro-compatibilité.
  • Permet de remonter des informations physiques (taille, partitionnement, …).
  • Support des sources / puits streaming.
  • API d’écriture puissante, transactionelle et flexible.
  • Aucun changment pour l’utilisateur final.

Pour la sortie de Spark 2.3 les fonctionalitées suivantes sont disponibles et testables (l’API est toujours en développement et plus de fonctionnalitée seront disponibles en 2.4) :

  • Support de scan colonnaire et par ligne.
  • “Column pruning and filter push-down”.
  • Permet de remonter des statistiques basiques et du partitionneleent de données.
  • API d’écriture transactionelle.
  • Support des sources / puit streaming pour les modes micro-batch et continu.

Une analyse p lus profonde de l’API Datasource V2 est disponible ici.

Support ORC natif

Avant que soit lancé le projet Apache ORC, Spark (depuis sa version 1.4) utilisait du legacy code basé sur Hive 1.2.1 pour lire et écrite les fichiers ORC. Lorsque le projet est arrivé le module a été amélioré mais gardait toujours une dépendance avec Hive et des erreures différenciant l’ORC produit par Spark et les spécifications (charactères unicodes / points dans les noms de colonnes, évolutions de schéma, fichiers ORC vides lorsque’ la partition du DataFrame est vide, …).

Spark 2.3 corrige ces points en ajoutant un nouveau reader ORC vectorisé dit native (ou natif) dans le module sql/core qui est entièrement indépendant de Hive. Le nouveau OrcFileFormat (voir SPARK-20682) améliore les performances par un facteur 2 à 5.

Pour pouvoir l’utiliser, il faut soit indiquer le paramètre spark.sql.orc.impl=native  et spécifier le format source / cible comme orc, ou bien indiquer le format org.apache.spark.sql.execution.datasources.orc (voir SPARK-20728). En attendant Spark 2.4, l’implémentation ORC par défaut rester Hive pour maintenir la compatibilité avec les anciennes données.

Spark est également capable de convertir des sources de données en utilisant le reader vectorisé, il suffit d’indiquer le paramètre  spark.sql.hive.convertMetastoreOrc=true  (défaut = false, nécessite le reader natif).

Enfin le module supporte l’évolution de schéma. Plutôt que de supprimer et recréer une table comme jusqu’à maintenant, on peut ajouter une nouvelle colonne à la fin, cacher une colonne ou en changer le type et la position.

Également, le module ne supporte pas de transaction ACID (pas d’activité ou de besoin exprimé à ce sujet au sein de la communauté Spark) ou le bucketting (Spark a divergé de l’implémentation de Hive et il y a des difficultés pour merger les deux, peu d’évolutions sur le sujet).

Spark dans Kubernetes (expérimental)

Il y a un nouveau scheduler expérimental permettant de soumettre des jobs Spark à un cluster managé par Kubernetes au lieu de YARN, Mesos ou Spark Standalone. Le driver et les exécuteurs sont déployés dans des Pods avec une image Docker spécifiée via le paramètre de configuration spark.kubernetes.container.image=/path/to/img  (un Dockerfile example est fourni avec la release de Spark 2.3). Application dependencies should be either added to a custom-build Docker image or referenced remotely.

Plus d’informations dans SPARK-18278 et la documentation.

Sources


Also published on Medium.

À propos de l'auteur :

Consultant Big Data @ Adaltas depuis 2015, j'aime découvrir de nouvelles choses et expérimenter avec les nouvelles technologies en plus de mon métier de tous les jours

Laisser un commentaire