Qu’est-ce que MapReduce et comment l’utiliser ?

Sur le marché actuel, qui est piloté par les données, les algorithmes et les applications collectent des données 24x7 sur les personnes, les processus, les systèmes et les entreprises, ce qui entraîne la génération de volumes de données considérables. Le défi consiste à définir la solution qui permettra de traiter ces volumes de données rapidement, efficacement et sans pertes de connaissance significatives.

C'est là qu'intervient le modèle de programmation MapReduce. Utilisé initialement par Google pour analyser ses résultats de recherche, MapReduce a gagné en popularité grâce à sa capacité à diviser et traiter plusieurs téra-octets de données en parallèle et à obtenir ainsi des résultats plus rapides.

MapReduce – Présentation générale

MapReduce est un modèle (ou structure) de programmation disponible dans les environnements Hadoop qui est utilisé pour accéder aux big data stockées dans le Hadoop File System (HDFS). MapReduce est un élément essentiel et fait partie intégrante du fonctionnement de l'environnement Hadoop.

MapReduce facilite les traitements concurrents en divisant les péta-octets de données en volumes plus petits et en les traitant en parallèle sur des serveurs standard dédiés à Hadoop. Pour résumer, MapReduce agrège les données de plusieurs serveurs et renvoie un résultat consolidé à l'application.

Pour plus de détails, consultez la page « Hadoop – Présentation générale » →

Par exemple, un cluster Hadoop de 20.000 serveurs (serveurs standard et peu coûteux) avec des blocs de données de 256 Mo peut traiter environ 5 To de données. Avec MapReduce, vous pouvez donc réduire le temps de traitement par rapport au traitement séquentiel d'un dataset aussi volumineux.

Avec MapReduce, plutôt que d'envoyer les données à l'endroit où se trouve l'application ou les algorithmes, les algorithmes sont exécutés sur le serveur où se trouvent déjà les données, ce qui a pour effet d'accélérer le traitement. L'accès aux données et le stockage des données se font sur disque – les entrées sont généralement stockées sous forme de fichiers contenant des données structurées, semi-structurées ou non structurées, et la sortie est également stockée dans des fichiers.

Autrefois, MapReduce était la seule méthode possible pour récupérer les données stockées dans le SFHD, mais ce n'est plus le cas aujourd'hui. Aujourd'hui, il existe d'autres systèmes tels que Hive et Pig qui permettent de récupérer les données du HDFS à l'aide de requêtes de type SQL (et ces tâches sont généralement exécutées en même temps que les tâches écrites à l'aide du modèle MapReduce afin de profiter des avantages uniques de MapReduce).

MapReduce – Principes de base

Au cœur de MapReduce se trouvent deux fonctions, Map et Reduce, qui sont séquencées l'une après l'autre.

  • La fonction Map transforme les entrées du disque en paires <key,value>, les traite et génère un autre ensemble de paires <key,value> intermédiaires en sortie.
  • La fonction Reduce transforme également les entrées en paires <key,value> et génère une des paires <key,value> en sortie.

Les types key et value varient en fonction du cas d'usage considéré. Toutes les entrées et sorties sont stockées dans le HDFS. La fonction Map est une étape obligatoire pour filtrer et trier les données initiales, mais la fonction Reduce est facultative.

<k1, v1> -> Map() -> list(<k2, v2>)
<k2, list(v2)> -> Reduce() -> list(<k3, v3>)

Les termes « Mapper » et « Reducer » désignent les serveurs Hadoop qui exécutent respectivement les fonctions Map et Reduce (ces serveurs peuvent avoir des spécifications identiques ou différentes).

Map

Les données d'entrée sont divisées en blocs plus petits. Chaque bloc est ensuite assigné à un Mapper pour traitement.

Par exemple, si un fichier contient 100 enregistrements à traiter, 100 Mappers peuvent s'exécuter ensemble et traiter un enregistrement chacun, ou 50 Mappers peuvent s'exécuter ensemble et traiter deux enregistrements chacun, et ainsi de suite. Le framework Hadoop décide du nombre de Mappers à utiliser en fonction du volume de données à traiter et de la taille des blocs de mémoire disponibles sur chaque serveur Mapper.

Reduce

Lorsque tous les Mappers ont terminé leur traitement, le framework mélange et trie les résultats avant de les transmettre aux Reducers (les Reducers ne peuvent pas démarrer avant que tous les Mappers aient terminé leur traitement). Les valeurs de sortie map affectées de la même valeur key sont assignées à un seul Reducer, qui agrège les valeurs map pour cette key.

Combine et Partition

Il existe deux étapes intermédiaires entre Map et Reduce : Combine et Partition.

Combine est un processus facultatif. Un Combiner est en fait un Reducer complémentaire qui fonctionne séparément sur chaque serveur Mapper. Le Reducer poursuit la réduction des données de chaque Mapper sous une forme simplifiée avant de les transmettre en aval – ce qui facilite et accélère les opérations de mélange et le tri, dans la mesure où le volume de données à traiter a été réduit. Dans bien des cas, en raison des actions cumulatives et associatives exécutées par la fonction Reduce, la classe du Combiner est celle du Reducer (si nécessaire, le Combiner peut être associé à une classe distincte).

Partition est le processus qui traduit les paires <key, value> générées par les Mappers en d'autres paires <key, value> qui sont injectées dans le Reducer. Partition définit le mode de présentation des données au Reducer et assigne ce mode à un Reducer donné.

Le Partitioner par défaut détermine la valeur de hachage de clé transmis par le Mapper et assigne une partition en fonction de cette valeur. Il y a autant de partitions que de Reducers. Lorsque le partitionnement est terminé, les données de chaque partition sont transmises à un Reducer donné.

Exemple d'exécution de MapReduce

Prenons l'exemple d'un système d'e-commerce qui reçoit chaque jour un million de demandes de traitement (paiements). Un tel volume de demandes peut contenir plusieurs exceptions de type « paiement refusé par une passerelle de paiement », « article non disponible en stock », « adresse incorrecte », etc. Un développeur décide d'analyser les logs des quatre derniers jours pour déterminer le nombre d'occurrences des différentes exceptions.

Exemple de cas d'usage

L'objectif consiste à isoler les cas d'usage qui déclenchent le plus grand nombre d'exceptions et de prendre des mesures de correction en conséquence. Par exemple, si la même passerelle de paiement affiche fréquemment une exception, est-ce à cause d'un service peu fiable ou d'une interface mal écrite ? Si l'exception « article non disponible en stock » est souvent déclenchée, cela signifie-t-il que le service de calcul des stocks doit être amélioré ou que les stocks doivent être augmentés pour certains articles ? Ou les deux ?

Le développeur peut poser des questions pertinentes, puis déterminer la marche à suivre en fonction des données reçues en réponse. MapReduce est un modèle de programmation parfaitement adapté à ce type d'analyse sur des logs volumineux (plusieurs millions d'enregistrements). Plusieurs Mappers peuvent traiter ces logs simultanément. Par exemple, un Mapper peut traiter le log d'un jour donné (ou même un sous-ensemble de ce log en fonction de la taille du log et du bloc de mémoire disponible sur le serveur de ce Mapper).

Map

Pour simplifier, supposons que le framework Hadoop utilise seulement quatre Mappers : Mapper 1, Mapper 2, Mapper 3 et Mapper 4.

La valeur entrée dans le Mapper est un enregistrement du fichier log. key peut être une chaîne de caractères telle que « nom du fichier log + numéro de ligne dans ce fichier ». Le Mapper traite chaque enregistrement du fichier log et génère des paires de valeurs key. Dans cet exemple, nous utiliserons simplement la valeur de remplissage « 1 » pour ces valeurs. La sortie des Mappers se présente par exemple comme suit :

Mapper 1 -> , , , ,
Mapper 2 -> , , ,
Mapper 3 -> , , , ,
Mapper 4 -> , , ,

En supposant qu'un Combiner tourne sur chaque Mapper, soit Combiner 1 .... Combiner 4 et qui calcule le nombre d'occurrences de chaque exception (soit la même fonction que le Reducer), l'entrée de Combiner 1 sera :

, , , ,

Combine

La sortie du Combiner 1 sera :

, ,

La sortie des autres Combiners sera :

Combiner 2 :
  Combiner 3 :
  Combiner 4 :

Partition

Le Partitioner répartit ensuite les données des Combiners entre les Reducers. Par ailleurs, les données sont triées pour le Reducer.

L'entrée des Reducers sera la suivante :

Reducer 1 : {3,2,3,1}
  Reducer 2 : {1,2,1,1}
  Reducer 3 : {1,1,2}

Si aucun Combiner n'était impliqué, l'entrée des Reducers serait la suivante :

Reducer 1 : {1,1,1,1,1,1,1,1,1}
  Reducer 2 : {1,1,1,1,1}
  Reducer 3 : {1,1,1,1}

Nous avons ici un exemple très simple, mais lorsqu'il est question de traiter plusieurs téra-octets de données, l'amélioration de la bande passante du processus des Combiners est significative.

Reduce

Ensuite, chaque Reducer calcule le nombre total des exceptions, par exemple :

Reducer 1 :
  Reducer 2 :
  Reducer 3 :

Les données générées montrent que l'exception A est déclenchée plus souvent que les autres et donc qu'elle justifie un examen plus approfondi. Si l'analyse porte sur plusieurs semaines ou plusieurs mois de données, MapReduce montre alors toutes ses qualités.

Comment implémenter MapReduce

Les programmes MapReduce ne se limitent pas à Java : ils peuvent également être écrits en C, C+++, Python, Ruby, Perl, etc. Voici un exemple de la fonction principale d'un job MapReduce :

public static void main(String[] args) throws Exception {

JobConf conf = new JobConf(ExceptionCount.class);
conf.setJobName("exceptioncount");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setCombinerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));

JobClient.runJob(conf);

}

Les différents paramètres (nom de la classe MapReduce, classes Map, Reduce et Combiner, types d'entrée et de sortie, chemins des fichiers d'entrée et de sortie) sont définis dans la fonction main. La classe Mapper étend MapReduceBase et implémente l'interface Mapper. La classe Reducer étend MapReduceBase et implémente l'interface des Reducers.

Pour examiner un exemple de code plus détaillé, consultez les didacticiels Hadoop.

Didacticiels MapReduce proposés par Talend

Bien que MapReduce soit une approche agile et résiliente pour résoudre les problèmes des big data, sa complexité inhérente signifie qu'il faut un certain temps aux développeurs pour acquérir de l'expertise. Pour pouvoir appliquer MapReduce à des big data, les entreprises doivent bénéficier de personnel qualifié et d'une infrastructure performante.

C'est là qu'intervient la solution d'intégration de données de Talend. Cette solution définit un cadre/framework idéal pour regrouper les différents outils utilisés dans l'écosystème Hadoop : Hive, Pig, Flume, Kafka, HBase, etc. Talend Studio définit un environnement doté d'une interface utilisateur qui permet aux utilisateurs de charger des données dans le HDFS et de les extraire du HDFS.

Regardez cette vidéo qui présente Talend Studio. →

Plus particulièrement, avec MapReduce, Talend Studio facilite de nombreuses opération, dont : création de jobs qui peuvent s'exécuter dans le cluster Hadoop, définition de paramètres tels que les classes Mapper et Reducer, formats d'entrée et de sortie, etc.

Dès que vous créez un job Talend MapReduce (opération différente de la définition d'un job dans Apache Hadoop), celui-ci peut être déployé en tant que service, exécutable ou job autonome qui tourne en mode natif sur le cluster des big data. Le job génère un ou plusieurs jobs Hadoop MapReduce qui, à leur tour, exécutent l'algorithme MapReduce.

Avant d'exécuter un job MapReduce, la connexion Hadoop doit être configurée. Pour plus de détails sur l'utilisation des solutions Talend qui permettent de configurer les jobs MapReduce, consultez ces didacticiels.

Utiliser MapReduce pour résoudre les problèmes de traitement des big data

Le paradigme de programmation MapReduce peut être utilisé avec tout problème complexe qui peut être résolu par parallélisation.

Un site de médias sociaux pourrait, par exemple, utiliser MapReduce pour déterminer le nombre de nouvelles inscriptions reçues le mois dernier en provenance de différents pays, afin de constater l'évolution de sa popularité dans différentes régions géographiques. Autre exemple : un trader pourrait effectuer ses rapprochements par lots plus rapidement et déterminer également quels scénarios entraînent le plus souvent la rupture des opérations. De leur côté, les moteurs de recherche pourraient utiliser MapReduce pour déterminer le nombre de pages vues, et les spécialistes du marketing pour effectuer des analyses d'opinion.

Pour plus de détails sur MapReduce et découvrir des cas d'usage semblables à ceux présentés ci-dessus, n'hésitez pas à télécharger la version d'évaluation gratuite de Talend Studio.

Prêt à faire vos premiers pas avec Talend ?