Apache Spark pour les nuls

By 2 avril 2020 mai 4th, 2020 VeoNews

En général, lorsque vous pensez à un « ordinateur », vous pensez à une machine posée sur votre bureau à la maison ou au travail. Cette machine fonctionne parfaitement bien pour regarder des films ou travailler avec un tableur. Cependant, comme de nombreux utilisateurs le constateront probablement à un moment donné, il y a certaines choses que votre ordinateur n’est pas assez puissant pour effectuer. Un domaine particulièrement difficile est le traitement des données. Les machines individuelles n’ont pas assez de puissance et de ressources pour effectuer des calculs sur d’énormes quantités d’informations (ou l’utilisateur peut ne pas avoir le temps d’attendre que le calcul soit terminé).

Un cluster, ou groupe de machines, met en commun les ressources de nombreuses machines, ce qui nous permet d’utiliser toutes les ressources cumulées comme si elles ne faisaient qu’une. Or, un groupe de machines seul n’est pas puissant, vous avez besoin d’un cadre pour coordonner le travail entre elles. Spark est un outil qui permet de gérer et de coordonner l’exécution de tâches sur des données à travers un groupe d’ordinateurs.

Spark (ou Apache Spark) est un framework open source de calcul distribué in-memory pour le traitement et l’analyse de données massives. Il s’agit d’un ensemble d’outils structurés selon une architecture définie. Ces composants font de Spark une plate-forme unificatrice riche en fonctionnalités : elle peut être utilisée pour de nombreuses tâches qui devaient auparavant être accomplies avec plusieurs frameworks différents.

Schéma des modules Spark

Le groupe de machines que Spark utilisera pour exécuter des tâches sera géré par un gestionnaire de groupe comme le gestionnaire de groupe autonome de Spark, YARN, ou Mesos. Les demandes Spark sont ensuite soumises à ces gestionnaires de clusters qui accorderont des ressources à la demande afin que celle-ci puisse être effectuer.

Installer Spark localement

Si vous souhaitez télécharger et exécuter Spark localement, la première étape consiste à vous assurer que vous disposez de Java installé sur votre machine, ainsi qu’une version Python si vous souhaitez utiliser
Python. Ensuite, visitez la page de téléchargement officielle du projet, sélectionnez le type de paquet « Pre-built for
Hadoop 2.7 and later », et cliquez sur « Download Spark ». Il permet de télécharger un fichier TAR compressé que vous devrez ensuite extraire.

Télécharger Spark pour un cluster Hadoop

Spark peut fonctionner localement sans aucun système de stockage distribué, tel qu’Apache Hadoop. Toutefois, si vous souhaitez connecter la version Spark de votre ordinateur portable à un cluster Hadoop, assurez-vous télécharger la bonne version de Spark pour cette version de Hadoop. Mais à ce stade, il suffit de lancer Spark sur votre ordinateur portable pour commencer.

PySpark

Il est possible d’installer Spark pour Python simplement avec pip install pyspark

Cependant son utilisation nécessite bien les JARs Spark. Ce package seul n’est pas destiné à remplacer tous les autres cas d’utilisation et n’est adaptée qu’à l’interaction avec un cluster existant (qu’il s’agisse de Spark autonome, de YARN ou de Mesos), c’est-à-dire qu’il ne contient pas les outils nécessaires à la mise en place de son propre cluster Spark autonome, ce que permet la version complète de Spark téléchargé précédemment.

Lancement des consoles interactives de Spark

Vous pouvez lancer un shell interactif dans Spark pour plusieurs langages de programmation différents. Voici quelques exemples en Python et Scala.

Notez que, lorsque vous démarrez Spark dans ce mode interactif, vous créez implicitement une SparkSession qui gère l’application Spark. Lorsque vous le lancez par le biais d’une application autonome, vous devez créer explicitement la SparkSession  dans votre code.

Lancement de la console Python

Vous aurez besoin de Python 2 ou 3 installé pour lancer la console Python. Depuis le dossier d’installation de Spark, exécutez le code suivant :
./bin/pyspark

Après avoir fait cela, tapez spark et appuyez sur Entrée. La SparkSession devrait alors être affiché, dont nous allons parlé plus tôt dans cet article.

Lancement de la console Scala

Pour lancer la console Scala, exécutez la commande suivante :
./bin/spark-shell

Après avoir fait cela, tapez spark et appuyez sur Entrée. Comme en Python, vous verrez la SparkSession s’afficher.

Fonctionnement de Spark

 

Les applications Spark se composent d’un pilote (« driver process ») et de plusieurs exécuteurs (« executor processes »). Il peut être configuré pour être lui-même l’exécuteur (local mode) ou en utiliser autant que nécessaire pour traiter l’application, Spark prenant en charge la mise à l’échelle automatique par une configuration d’un nombre minimum et maximum d’exécuteurs.

Schéma du fonctionnement logique de Spark

Le driver (parfois appelé « Spark Session ») distribue et planifie les tâches entre les différents exécuteurs qui les exécutent et permettent un traitement réparti. Il est le responsable de l’exécution du code sur les différentes machines.

Chaque exécuteur est un processus Java Virtual Machine (JVM) distinct dont il est possible de configurer le nombre de CPU et la quantité de mémoire qui lui est alloué. Une seule tâche peut traiter un fractionnement de données à la fois.

Lors de l’exécution de l’application, Spark crée des jobs, des stages et des tasks. Sans aller trop loin dans le détail, les jobs se composent de stages, et les stages se composent de tâches (voir le schéma ci-dessous). Les stages sont généralement exécutés séquentiellement, tandis que les tâches peuvent être exécutées en parallèle dans le cadre d’un seul stage.

Schéma de l'architecture logique d'exécution

Schéma de l’architecture logique d’exécution

Afin que Spark puisse distribuer les calculs, il lui faut donc des exécuteurs (c’est-à-dire des machines de calcul), ce que proposent des plateformes cloud comme Google Cloud Plateform (GCP) ou Amazon Web Services (AWS).

Deux points clés à comprendre sur les applications Spark à ce stade sont :

  • Spark emploie un gestionnaire de groupe (cluster manager) qui assure le suivi des ressources disponibles.
  • Le processus de pilotage (driver process) est responsable de l’exécution le programme à travers les exécuteurs pour accomplir une tâche donnée.

Les exécuteurs, pour la plupart, exécuteront toujours du code Spark. Cependant, le conducteur peut être « piloté » à partir d’un certain nombre de langues différentes par l’intermédiaire des API de différents langages.

Spark est implémenté via des API dans plusieurs langages de programmation, à savoir Python, Java, Scala , SQL et R. Les API Spark permettent d’exécuter le code Spark. Pour faire simple, Spark présente des « concepts » fondamentaux dans chaque langage; ces concepts sont ensuite traduits en code Spark exécuté sur des cluster de machines.

Le framework étant écrit en Scala, il s’agit du langage privilégié pour les nouvelles fonctionnalités et qui est le plus efficace à utiliser pour travailler avec Spark.

PySpark est l’implémentation de Spark pour Python contenant les différents composants de Spark.

Concepts principaux

SparkSession

L’application Spark est contrôlé grâce à un processus de pilotage (driver process) appelé SparkSession. Une instance de SparkSession est la façon dont Spark exécute les fonctions définis par l’utilisateur dans l’ensemble du cluster. Une SparkSession correspond toujours à une application Spark. En Scala et Python, la variable est disponible sous spark lorsque vous démarrez la
console.

En Scala, taper spark donne ça :

res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...

Et en Phyton cela ressemble à ça :

<pyspark.sql.session.SparkSession at 0x7efda4c1ccd0>

Dataframes

Un DataFrame représente simplement un tableau de données avec des lignes et des colonnes. La liste qui définit les colonnes et les types à l’intérieur de ces colonnes est appelée le schéma.

Il est possible de considérer DataFrame comme une feuille de calcul avec des colonnes nommées. Cependant une feuille de calcul se trouve sur un ordinateur à un endroit précis, alors qu’un DataFrame Spark peut s’étaler sur des milliers d’ordinateurs. La raison pour laquelle les données sont placées sur plus d’un l’ordinateur est assez simple : soit les données sont trop volumineuses pour tenir sur une seule machine, soit il serait simplement
trop long d’effectuer des calculs sur ces données sur une seule machine.

Attention donc à ne pas confondre les DataFrames Spark et ceux de Python (avec pandas) et R qui, bien que le même concept et facilement convertible de l’un à l’autre, ne sont stockés que sur une machine et non plusieurs.

Par exemple, pour créer un simple DataFrame avec une colonne number contenant 1 000 lignes avec des valeurs de 0 à 999. Cette plage de nombres représente une collection distribuée.
// Scala
val myRange = spark.range(1000).toDF("number")

# Python
myRange = spark.range(1000).toDF("number")

Partitions

Pour permettre à chaque exécuteurs de travailler en parallèle, Spark décompose les données en morceaux appelés des partitions. Une partition est un ensemble de rangées qui se trouvent sur une machine physique du cluster.
Les partitions de DataFrame représentent la manière dont les données sont physiquement réparties dans le groupe de machines pendant l’exécution.

Si il n’y a qu’une seule partition, Spark aura un parallélisme de un, même si le cluster est composés de milliers d’exécuteurs. Si il y a plusieurs partitions mais un seul exécuteur, Spark aura toujours un parallélisme de un car il n’y a qu’une seule ressource de calcul.

Une chose importante à noter est qu’avec les DataFrames, les partitions s ne sont pas manipulés (pour la plupart) manuellement ou individuellement. Seul des transformations de haut niveau des données sont spécifiés et Spark détermine comment ce travail sera réellement exécuté sur le cluster. Ils existent cependant des API plus bas niveau tel que les Resilient Distributed Datasets (RDD) qui permettent cela.

Transformations

Dans Spark, les structures de données de base sont immuables, ce qui signifie qu’elles ne peuvent pas être modifiées après avoir été créé. Pour « changer » un DataFrame, il faut indiquer à Spark comment le modifier. Ce sont les transformations.

Par exemple, récupérons tous les nombres paires du DataFrame créé précédemment :

// Scala
val divisBy2 = myRange.where("number % 2 = 0")

# Python
divisBy2 = myRange.where("number % 2 = 0")

Notez que ces opérations ne renvoient aucun résultat. C’est parce que nous n’avons spécifié qu’une transformation abstraite, et Spark n’agira pas sur les transformations tant que nous n’aurons pas appelé une action (décrit plus bas). Il y a deux types de transformations : celles qui spécifient des dépendances étroites, et celles qui spécifient des dépendances larges.

Les transformations consistant en des dépendances étroites (appellées également transformations étroites ou narrow transformations) sont celles pour lesquelles chaque partition d’entrée ne contribuera qu’à une seule partition de sortie. Dans le code précédent, le where spécifie une dépendance étroite, où une seule partition contribue à au maximum une partition de sortie.

A l’inverse, une dépendance large (ou transformation large) aura des partitions d’entrée qui contribuent à de nombreuses partitions de sortie.

Spark effectue donc une lazy evalutation qui signifie que Spark attendra le tout dernier moment pour exécuter le graphique des instructions de calcul. Dans Spark, au lieu de modifier les données immédiatement lorsque une opération est définie, un plan de transformations à appliquer aux données sources est élaboré. En attendant le dernier moment pour exécuter le code (c’est-à-dire la prochaine action), Spark compile ce plan à partir du DataFrame brut à un plan physique qui fonctionnera aussi efficacement que possible dans l’ensemble du cluster.

Actions

Les transformations permettent de construire un plan de transformation logique. Pour déclencher le calcul, il faut utiliser une action. Une action demande à Spark de calculer un résultat à partir d’une série de transformations.

L’action la plus simple est le count, qui nous donne le nombre total d’enregistrements dans un DataFrame. Par exemple,
divisBy2.count()

La sortie du code précédent doit être 500. Bien sûr, le comptage n’est pas la seule action. Il y a trois types d’actions :

  • Actions de visualisation des données
  • Actions visant à collecter des données sur les objets natifs dans le langage de programmation respectif
  • Actions d’écriture sur les sources de données de sortie

En précisant cette action, nous avons lancé une tâche Spark qui gère la transformation de filtrage (une transformation étroite), puis une agrégation (une transformation large) qui effectue les comptages sur chaque partition, puis une collecte, qui renvoie notre résultat à un objet natif au langage correspondant.

Sources

The Data Scientist’s Guide to Apache Spark™, Databricks, 2017

Spark – The Definitive Guide – Big Data processing made simple, Bill Chambers and Matei Zaharia, 2018