Présentation du partitionnement d'Apache Spark

Ce que vous devez savoir

Les « datasets distribués résilients » (Resilient Distributed Datasets, RDD) d'Apache Spark sont des ensembles de données diverses dont la taille est si grande qu'ils ne tiennent pas dans un seul nœud et doivent être partagés entre plusieurs nœuds. Apache Spark divise automatiquement les RDD et répartit les partitions entre différents nœuds. Ils sont évalués tardivement (c.-à-d. que l'exécution ne débutera pas avant qu'une action ne soit déclenchée, ce qui facilite leur gestion, réduit le nombre de calculs et ainsi améliore l'optimisation et la rapidité) et les transformations sont sauvegardées sous forme de graphes orientés acycliques (Directed Acyclic Graphs, DAG). Ainsi, avec chaque action réalisée sur les RDD, Apache Spark recalcule les DAG.

Il est important de comprendre les caractéristiques des partitions dans Apache Spark afin de pouvoir améliorer les performances, le débogage et le traitement des erreurs.

Voici quelques principes fondamentaux régissant le partitionnement :

  • Tous les nœuds d'un cluster Spark contiennent au moins une partition.
  • Le nombre de partitions utilisées dans Spark peut être configuré et il n'est pas souhaitable qu'il soit trop faible (ce qui réduit la concurrence et entraîne une asymétrie des données et une mauvaise utilisation des ressources) ou trop important (car la planification des tâches prendra alors plus de temps que leur exécution). Par défaut, il est défini sur le nombre total de cœurs sur tous les nœuds de l'exécuteur.
  • Dans Spark, les partitions ne regroupent pas plusieurs machines.
  • Les tuples se trouvant dans la même partition sont assurés d'être sur la même machine.
  • Spark attribue une tâche à chaque partition et chaque collaborateur ne peut traiter qu'une seule tâche à la fois.

Partitionnement par hachage ou partitionnement par intervalles dans Apache Spark

Apache Spark prend en charge deux types de partitionnement : le « partitionnement par hachage » et le « partitionnement par intervalles ». Le choix du partitionnement dépend de la façon dont les clés dans vos données sont réparties ou séquencées, ainsi que de l'action à effectuer sur vos données. De nombreux facteurs peuvent peser sur votre décision, notamment :

  • Les ressources disponibles — Le nombre de cœurs sur lesquels la tâche peut être exécutée.
  • Les sources de données externes — La taille des ensembles locaux, de la table Cassandra ou du fichier HDFS déterminent le nombre de partitions.
  • Les transformations utilisées pour déduire un RDD — Un certain nombre de règles régissent le nombre de partitions lorsqu'un RDD est déduit à partir d'un autre RDD.

Comme vous pouvez le voir, il y a de nombreux facteurs à prendre en compte lors de l'utilisation d'Apache Spark. Dans cet article de blog, je tiens à souligner l'importance d'une connaissance parfaite de vos données métiers, de leurs clés et leurs ressources physiques et surtout du réseau, du processeur et de la mémoire, pour le traitement dans Spark.

Étudions maintenant certains des écueils couramment liés à l'utilisation du partitionnement d'Apache Spark :

Données asymétriques et blocs de shuffle

Le traitement des données avec le partitionnement par défaut d'Apache Spark risque de se traduire par des données asymétriques, ce qui pourrait entraîner des problèmes de shuffle pendant les opérations d'agrégation ou un manque de mémoire dans un exécuteur unique.


Exemple de données asymétriques

Nous voyons ici que la clé « key-a » comporte un plus grand nombre de données dans la partition. Par conséquent, les tâches sur l'exécuteur Exec-5 prendront beaucoup plus de temps que celles des cinq autres clés. Autre chose importante à ne pas oublier : les blocs de shuffle Spark ne doivent pas faire plus de 2 Go (en interne car la taille MAX_SIZE de l'abstraction ByteBuffer est définie sur 2 Go). Par exemple, si une opération telle qu'une agrégation, une jointure ou une opération de cache est exécutée, un shuffle Spark surviendra. Or un petit nombre de partitions ou une asymétrie des données peut entraîner une taille de blocs de shuffle trop élevée. Ainsi, si vous avez constaté une erreur liée à un dépassement des limites MAX_SIZE en raison d'un shuffle, vous savez qu'elle peut être associée à des données asymétriques.

Partitionnez à bon escient

Comment éviter des données asymétriques et des blocs de shuffle ? En procédant à un partitionnement judicieux. Il est essentiel de procéder à un partitionnement judicieux afin de contrôler la sollicitation de la mémoire et de garantir une utilisation optimale des ressources sur les nœuds de l'exécuteur. Vous devez toujours connaître vos données (taille, type et répartition). Quelques bonnes pratiques à garder à l’esprit :

  • Sachez choisir les bons opérateurs pour des actions telles que reduceByKey ou aggregateByKey afin que votre pilote ne soit pas trop sollicité et que les tâches soient correctement exécutées sur les exécuteurs.
  • Si vos données arrivent dans un petit nombre de gros fichiers non fractionnables, le partitionnement dicté par InputFormat pourrait placer un grand nombre d'enregistrements dans chaque partition, tout en ne générant pas assez de partitions pour utiliser tous les cœurs disponibles. Dans ce cas, ayez recours à une répartition avec un nombre élevé de partitions après le chargement des données pour que les opérations qui suivent utilisent davantage le processeur du cluster.
  • De même, si les données sont asymétriques, il est recommandé de procéder à un repartitionnement à l'aide d'une clé adéquate permettant de répartir la charge uniformément.

Talend offre un composant tPartition permettant d'effectuer ce repartitionnement en fonction des clés que vous choisissez.

Comment parvenir au bon nombre de partitions ?

Apache Spark ne peut exécuter qu'une seule tâche simultanée pour toutes les partitions d'un RDD, dans la limite du nombre de cœurs dans votre cluster (voire 2 à 3 fois ce nombre). C’est pourquoi le « bon » nombre de partitions correspondra généralement au moins au nombre d'exécuteurs afin de garantir le parallélisme. Il est possible d'obtenir cette valeur calculée en appelant sc.defaultParallelism. La taille maximale d'une partition est limitée en fin de compte par la mémoire disponible d'un exécuteur.

Dans certains cas, il n'est pas possible de savoir quelle clé de repartitionnement doit être utilisée pour garantir une répartition uniforme des données. Ainsi, des méthodes telles que le « salting » qui permet d'ajouter une nouvelle clé « fictive » et de l'utiliser parallèlement à la clé actuelle pour obtenir une meilleure répartition des données peuvent être utilisées. Exemple :

  • Ajoutez un élément aléatoire à un gros RDD et créez une nouvelle clé de jointure avec lui, par exemple « clé de salting = clé de jointure réelle + clé aléatoire fictive, la clé fictive ayant une valeur comprise entre 1 et N et N étant le niveau de répartition ».
  • Ajoutez un élément aléatoire à un petit RDD à l'aide d'un produit cartésien (1-N), pour augmenter le nombre d'entrées et créer une nouvelle clé de jointure.
  • Joignez les RDD sur une nouvelle clé de jointure. Ils seront mieux répartis du fait d'un essaimage aléatoire.
  • Ôtez la clé fictive aléatoire de la clé de jointure pour obtenir le résultat définitif de la jointure.

Dans l'exemple ci-dessus, la clé fictive dans le dataset de recherche sera un produit cartésien (1-N) et, pour le dataset principal, elle sera une clé aléatoire (1-N) pour l’ensemble de données sources se trouvant sur chaque ligne, N étant le niveau de répartition.

Talend et Apache Spark

Talend Studio inclut des assistants et des outils graphiques qui génèrent un code natif ; vous pouvez ainsi être immédiatement opérationnel avec Apache Spark et Spark Streaming et même partitionner vos données correctement. Les techniques décrites ci-dessus peuvent être appliquées à l'aide du composant tMap de Talend. Vous pouvez également répondre aux besoins en repartitionnement à l'aide du composant tPartition de Talend si vous avez une bonne connaissance des données, et de notre composant tMap pour utiliser des techniques de salting et de génération de nombres aléatoires, au besoin, si vous avez peur que certaines données aient des clés asymétriques ou des valeurs nulles.

J'espère que ce bref article de blog vous aura appris quelque chose de nouveau sur les principes fondamentaux du partitionnement et du traitement dans Apache Spark. Pour savoir comment utiliser Talend et Apache Spark ensemble afin d'accélérer et d'adapter le traitement de vos big data, vous pouvez consulter la page consacrée à nos solutions. 

Références :

https://issues.apache.org/jira/browse/SPARK-6235

https://0x0fff.com/spark-architecture

https://www.youtube.com/watch?v=WyfHUNnMutg

http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/

https://stackoverflow.com/questions/40373577/skewed-dataset-join-in-spark

Prêt à faire vos premiers pas avec Talend ?