Traitement des données en parallèle et performance

Chapitre 7 : Traitement des données en parallèle et performance

Ce chapitre couvre

  • Traitement des données avec des flux parallèles
  • Analyse de performance des flux parallèles
  • Le framework fork/join
  • Division d’un flux de données à l’aide d’un Spliterator

Dans les trois derniers chapitres, vous avez vu comment la nouvelle interface Stream vous permet de manipuler des collections de données de manière déclarative. Nous avons également expliqué que le passage de l’itération externe à l’itération interne permet à la bibliothèque Java native de prendre le contrôle du traitement des éléments d’un flux. Cette approche évite aux développeurs Java d’implémenter explicitement les optimisations nécessaires pour accélérer le traitement des collections de données. De loin le bénéfice le plus important est la possibilité d’exécuter sur ces collections un pipeline d’opérations qui utilise automatiquement les architectures multicœurs des ordinateurs actuels.

Par exemple, avant Java 7, le traitement d’une collection de données en parallèle était extrêmement lourd. Tout d’abord, vous deviez diviser explicitement la structure de données contenant vos données en sous-parties. Deuxièmement, vous deviez affecter chacune de ces sous-parties à un Thread différent. Troisièmement, vous deviez les synchroniser pour éviter les effets liés à un thread trop rapide, attendre l’achèvement de tous les threads, et enfin combiner les résultats partiels. Java 7 a introduit un framework appelé fork/join pour effectuer ces opérations de manière plus cohérente et de manière moins sujette aux erreurs. Nous explorons ce cadre dans la section 7.2.

Dans ce chapitre, vous découvrirez comment l’interface Stream vous donne la possibilité d’exécuter des opérations en parallèle sur une collection de données sans trop d’efforts. Il vous permet de transformer un flux séquentiel en un flux parallèle. De plus, vous verrez comment Java peut produire cette magie ou, plus concrètement, comment les flux parallèles fonctionnent sous le capot en utilisant le framework fork/join introduit en Java 7. Vous découvrirez également qu’il est important de savoir comment les flux parallèles travaillent en interne, car si vous ignorez cet aspect, vous pourriez obtenir des résultats inattendus (et très probablement erronés) en les utilisant à mauvais escient.

En particulier, nous démontrerons que la façon dont un flux parallèle est divisé en segments, avant de traiter les différents blocs en parallèle, peut dans certains cas être à l’origine de ces résultats incorrects et apparemment inexplicables. Pour cette raison, vous apprendrez comment prendre le contrôle de ce processus de séparation en implémentant et en utilisant votre propre Spliterator.

7.1. Flux parallèles

Dans le chapitre 4, nous avons brièvement mentionné que l’interface Stream vous permet de traiter ses éléments en parallèle de manière très pratique : il est possible de transformer une collection en un flux parallèle en invoquant la méthode parallelStream sur la source de la collection. Un flux parallèle est un flux qui divise ses éléments en plusieurs segments, en traitant chaque segment avec un thread différent. Ainsi, vous pouvez partitionner automatiquement la charge de travail d’une opération donnée sur tous les cœurs de votre processeur multicœur et les garder tous occupés. Voyons un exemple simple.

Supposons que vous deviez écrire une méthode acceptant un nombre n comme argument et renvoyant la somme de tous les nombres de 1 à l’argument donné. Une approche simple (peut-être naïve) consiste à générer un flux infini de nombres, en le limitant au nombre passé, puis à réduire le flux résultant avec un BinaryOperator qui ne fait que sommer deux nombres, comme ceci :

 

Dans les termes Java plus traditionnels, ce code est équivalent à son équivalent itératif:

Cette opération semble être un bon candidat pour tirer parti de la parallélisation, en particulier pour les grandes valeurs de n. Mais par où commencer ? Est-ce que vous synchronisez sur la variable résultat ? Combien de threads utilisez-vous ? Qui fait la génération de nombres ? Qui les ajoute ?

Ne vous inquiteé pas pour tout cela. C’est un problème beaucoup plus simple à résoudre si vous adoptez des flux parallèles.

7.1.1. Transformer un flux séquentiel en un flux parallèle

Vous pouvez faire en sorte que l’ancien processus de réduction fonctionnelle (c’est-à-dire, sommation) se déroule en parallèle en transformant le flux en un processus parallèle. En appellant la méthode parallel() sur le flux séquentiel :

Dans le code précédent, le processus de réduction utilisé pour additionner tous les nombres dans le flux fonctionne d’une manière similaire à ce qui est décrit dans la section 5.4.1. La différence est que le flux est divisé en plusieurs parties. Il en résulte que l’opération de réduction peut fonctionner sur les différents segments indépendamment et en parallèle, comme le montre la figure 7.1. Enfin, la même opération de réduction combine les valeurs résultant des réductions partielles de chaque sous-flux, produisant ainsi le résultat du processus de réduction sur l’ensemble du flux initial.

Notez qu’en réalité, appeler la méthode parallel sur un flux séquentiel n’implique aucune transformation concrète sur le flux lui-même. En interne, un indicateur booléen est défini pour signaler que vous voulez exécuter en parallèle toutes les opérations qui suivent l’invocation à parallèle. De même, vous pouvez transformer un flux parallèle en un flux séquentiel en invoquant simplement la méthode sequential sur celui-ci. Notez que vous pourriez penser que vous avez un contrôle total sur les opérations que vous voulez effectuer en parallèle et celles qui se déroulent séquentiellement tout en traversant le flux en combinant ces deux méthodes. Par exemple, vous pourriez faire quelque chose comme :

Mais le dernier appel à parallel ou sequential gagne et affecte le pipeline globalement. Dans cet exemple, le pipeline sera exécuté en parallèle car c’est le dernier appel du pipeline.



Configuration du pool de threads utilisé par les flux parallèles

En regardant la méthode parallel du flux, vous pouvez vous demander d’où viennent les threads utilisés par le flux parallèle, combien il y en a, et comment vous pouvez personnaliser le processus.

Les flux parallèles utilisent en interne le ForkJoinPool par défaut (vous en apprendrez plus sur le framework fork/join dans la section 7.2), qui a par défaut autant de threads que de processeurs, renvoyés par Runtime.getRuntime().AvailableProcessors().

Mais vous pouvez changer la taille de ce pool en utilisant la propriété système java.util.concurrent.ForkJoinPool.common.parallelism, comme dans l’exemple suivant:

C’est un paramètre global, donc cela affectera tous les flux parallèles dans votre code. Inversement, il n’est actuellement pas possible de spécifier cette valeur pour un seul flux parallèle.



Revenant à l’exercice de sommation des nombres, nous avons dit que vous pouvez vous attendre à une amélioration significative des performances dans sa version parallèle en l’exécutant sur un processeur multicœur. Vous avez maintenant trois méthodes qui exécutent exactement la même opération de trois façons différentes (style itératif, réduction séquentielle et réduction parallèle), alors voyons quelle est la plus rapide.

7.1.2. Mesurer les performances du flux

Nous avons prétendu que la méthode de sommation parallélisée devrait fonctionner mieux que les méthodes séquentielles et itératives. Néanmoins, en génie logiciel, deviner n’est jamais une bonne idée. En particulier lorsque vous optimisez les performances, vous devez toujours suivre trois règles d’or : mesurer, mesurer, mesurer. À cette fin, vous pouvez développer une méthode très similaire au harnais de base que vous avez utilisé dans la section 6.6.2 pour comparer les performances du partitionnement des nombres premiers et non premiers, comme indiqué dans la figure suivante.

Ici, cette méthode prend comme arguments une fonction et un long. Elle applique la fonction 10 fois sur le long passé à la méthode, enregistre le temps pris par chaque exécution en millisecondes, et renvoie la durée la plus petite (l’exécution la plus rapide). En supposant que vous regroupiez toutes les méthodes que vous avez développées précédemment dans une classe nommée ParallelStreams, vous pouvez utiliser ce harnais pour vérifier combien de temps la fonction d’addition séquentielle prend pour additionner les 10 premiers millions de nombres naturels :

Notez que les résultats doivent être pris avec un grain de sel. De nombreux facteurs influenceront le temps d’exécution, comme le nombre de cœurs supportés par votre machine. Vous pouvez essayer ceci sur votre propre machine en exécutant le code disponible sur le dépôt du tuto. En l’exécutant sur un MacBook Pro Intel i7 2,3 GHz quad-core, il imprime les éléments suivants :

Vous devriez vous attendre à ce que la version itérative utilisant une boucle for traditionnelle s’exécute beaucoup plus rapidement car elle fonctionne à un niveau beaucoup plus bas et, plus important, n’a pas besoin d’effectuer une boxing ou unboxing des valeurs primitives. Si vous essayez de mesurer sa performance avec

vous obtiendrez

Faisons de même avec la version parallèle de cette fonction

et voici ce qui se passe:

C’est assez décevant : la version parallèle de la méthode de sommation est beaucoup plus lente que la version séquentielle. Comment pouvez-vous expliquer ce résultat inattendu ? Il y a en fait deux problèmes mélangés :

  • Iterate génère des objets wrappés, qui doivent être unboxed en nombres avant de pouvoir être ajoutés.
  • Iterate est difficile à diviser en morceaux indépendants à exécuter en parallèle.

Le second problème est particulièrement intéressant, car vous devez garder un modèle mental selon lequel certaines opérations de flux sont plus parallélisables que d’autres. En particulier, l’opération iterate est difficile à diviser en blocs pouvant être exécutés indépendamment car l’entrée d’une application de fonction dépend toujours du résultat de l’application précédente, comme illustré à la figure 7.2.

Cela signifie que dans ce cas précis, le processus de réduction ne se déroule pas comme décrit dans la figure 7.1: la liste complète des nombres n’est pas disponible au début du processus de réduction, rendant impossible la partition efficace du flux en segments à traiter en parallèle. En signalant le flux comme parallèle, vous ajoutez simplement au traitement séquentiel la tache supplémentaire d’allouer chaque opération de somme sur un thread différent.

Cela démontre à quel point la programmation parallèle peut être délicate et parfois contre-productive. En cas d’utilisation abusive (par exemple, utilisation d’une opération non parallèle, comme iterate), cela peut réellement aggraver les performances globales de vos programmes. Il est donc nécessaire de comprendre ce qui se passe en coulisses lorsque vous invoquez cette méthode parallel apparemment magique.

En utilisant des méthodes plus spécialisées

Alors, comment pouvez-vous tirer parti de vos processeurs multicœurs et utiliser le flux pour effectuer une somme parallèle de manière efficace ? Nous avons discuté d’une méthode appelée LongStream.rangeClosed au chapitre 5. Cette méthode a deux avantages par rapport à iterate :

  • LongStream.rangeClosed fonctionne directement sur les nombres longs primitifs, donc il n’y a pas de frais de boxe et de déballage.
  • LongStream.rangeClosed produit des plages de nombres, qui peuvent être facilement divisées en blocs indépendants. Par exemple, la plage 1-20 peut être divisée en 1-5, 6-10, 11-15 et 16-20.

Voyons d’abord comment il fonctionne sur un flux séquentiel pour voir si le surcoût associé à unboxing est pertinent :

Cette fois, la sortie est:

Le flux numérique est beaucoup plus rapide que la version séquentielle précédente, générée avec la méthode iterate, car le flux numérique évite tous les coûts liés aux opérations inutiles d’autoboxing et de boxing effectué par le flux non spécialisé. C’est une preuve que le choix des bonnes structures de données est souvent plus important que la parallélisation de l’algorithme qui les utilise. Mais que se passe-t-il si vous essayez d’utiliser un flux parallèle dans cette nouvelle version ?

Maintenant, en passant cette fonction à votre méthode de test:

vous obtenez:

Enfin, vous obtenez une réduction parallèle plus rapide que sa contrepartie séquentielle, car cette fois l’opération de réduction peut effectivement être exécutée comme le montre la figure 7.1. Cela démontre également que l’utilisation de la bonne structure de données et son fonctionnement en parallèle garantissent les meilleures performances.

Néanmoins, gardez à l’esprit que la parallélisation ne vient pas gratuitement. Le processus de parallélisation vous oblige lui-même à partitionner récursivement le flux, à affecter l’opération de réduction de chaque sous-flux à un thread différent, puis à combiner les résultats de ces opérations en une seule valeur. Mais le déplacement de données entre plusieurs cœurs est également plus onéreux que prévu, il est donc important que le travail à effectuer en parallèle sur un autre cœur prenne plus de temps que le temps nécessaire pour transférer les données d’un cœur à un autre. En général, il existe de nombreux cas où il n’est pas possible ou pratique d’utiliser la parallélisation. Mais avant d’utiliser un flux en parallèle pour rendre votre code plus rapide, vous devez vous assurer que vous l’utilisez correctement. Il est inutil de produire un résultat en moins de temps si le résultat est faux. Regardons un piège assez populaire.

7.1.3. Utilisation correcte des flux parallèles

La principale cause des erreurs générées par une mauvaise utilisation des flux parallèles est l’utilisation d’algorithmes qui mutent un état partagé. Voici une autre façon d’implémenter la somme des n premiers nombres naturels mais en faisant muter un accumulateur partagé :

Il est assez commun d’écrire ce genre de code, en particulier pour les développeurs qui sont familiers avec les paradigmes de programmation impérative. Ce code ressemble beaucoup à ce que vous avez l’habitude de faire quand vous itérez de façon impérative une liste de nombres : vous initialisez un accumulateur et parcourez les éléments de la liste un par un, en les ajoutant à l’accumulateur.

Quel est le problème avec ce code ? Malheureusement, c’est irrémédiablement codé de la mauvaise façon, parce que c’est fondamentalement séquentiel. Vous avez une course de données sur chaque accès à la varible total. Et si vous essayez de résoudre ce problème avec la synchronisation, vous perdrez tout le bénéfice de votre parallélisme. Pour comprendre cela, essayons de transformer le Stream en ParallelStream.

Essayez d’exécuter cette dernière méthode avec le harnais de la liste 7.1, en imprimant aussi le résultat de chaque exécution:

Vous pourriez obtenir quelque chose comme:

Cette fois la performance de votre méthode n’est pas importante : la seule chose pertinente est que chaque exécution renvoie un résultat différent, tous très éloigné de la valeur correcte de 50000005000000. Ceci est dû au fait que plusieurs threads accèdent simultanément à l’accumulateur et en particulier l’exécution de total+=valeur, qui, malgré son apparence, n’est pas une opération atomique. L’origine du problème est que la méthode invoquée dans le bloc forEach a pour effet secondaire de changer l’état mutable d’un objet partagé entre plusieurs threads. Il est obligatoire d’éviter ce genre de situations si vous voulez utiliser des flux parallèles sans encourir des surprises similaires.

Vous savez maintenant que l’état mutable partagé ne colle pas bien avec les flux parallèles et avec les calculs parallèles en général. Nous reviendrons sur cette idée d’éviter la mutation dans les chapitres 13 et 14 lorsque nous discuterons de la programmation fonctionnelle plus en détail. Pour l’instant, gardez à l’esprit qu’éviter l’état mutable partagé garantit que votre flux parallèle produira le bon résultat. Ensuite, nous allons examiner quelques conseils pratiques que vous pouvez utiliser pour déterminer quand il est approprié d’utiliser des flux parallèles pour gagner en performance.

 

7.1.4. Utilisation efficace des flux parallèles

En général, il est impossible (et inutile) d’essayer de donner un indice quantitatif sur l’utilisation d’un flux parallèle car toute suggestion comme « utiliser un flux parallèle uniquement si vous avez au moins un millier (ou un million ou tout ce que vous voulez) « Pourrait être correct pour une opération spécifique s’exécutant sur une machine spécifique, mais être complètement faux dans un contexte différent. Néanmoins, il est au moins possible de fournir des conseils qualitatifs qui pourraient être utiles pour décider s’il est logique d’utiliser un flux parallèle dans une situation donnée :

  • En cas de doute, mesurez. Transformer un flux séquentiel en un flux parallèle est trivial, mais pas toujours la bonne chose à faire. Comme nous l’avons déjà démontré dans cette section, un flux parallèle n’est pas toujours plus rapide que la version séquentielle correspondante. De plus, les flux parallèles peuvent parfois fonctionner de manière contre-productive, de sorte que la première et la plus importante des suggestions pour choisir entre les flux séquentiels et parallèles est de toujours vérifier leurs performances avec un benchmark approprié.
  • Méfiez-vous du boxing. Les opérations automatiques de boxing et unboxing peuvent considérablement nuire aux performances. Java 8 inclut des flux de primitives (IntStream, LongStream et DoubleStream) pour éviter de telles opérations, utilisez-les si possible.
  • Certaines opérations se comportent naturellement moins bien sur un flux parallèle que sur un flux séquentiel. En particulier, les opérations telles que limit et findFirst qui reposent sur l’ordre des éléments sont coûteuses dans un flux parallèle. Par exemple, findAny fonctionnera mieux que findFirst car il n’est pas contraint de maintenir l’ordre rencontré. Vous pouvez toujours transformer un flux ordonné en un flux non ordonné en invoquant la méthode unordored. Par exemple, si vous avez besoin de N éléments de votre flux et que vous n’êtes pas nécessairement intéressé par les N premiers, l’appel de la méthode limit() sur un flux parallèle non ordonné peut s’exécuter plus efficacement que sur un flux avec un ordre définit (par exemple la source est une liste).
  • Considérez le coût de calcul total du pipeline d’opérations effectué par le flux. Avec N étant le nombre d’éléments à traiter et Q le coût approximatif de traitement de l’un de ces éléments à travers le pipeline de flux, le produit de N * Q donne une estimation qualitative approximative de ce coût. Une valeur plus élevée pour Q implique une meilleure chance d’avoir de bonnes performances lors de l’utilisation d’un flux parallèle.
  • Pour une petite quantité de données, choisir un flux parallèle n’est presque jamais une décision gagnante. Les avantages de traiter en parallèle seulement quelques éléments ne suffisent pas à compenser le coût supplémentaire introduit par le processus de parallélisation.
  • Tenez compte de la qualité de la décomposition de la structure de données sous-jacente au flux. Par exemple, une ArrayList peut être divisée beaucoup plus efficacement qu’une LinkedList, parce que la première peut être divisée de façon égale sans la traverser, contrairement à la seconde. En outre, les flux primitifs créés avec la méthode rangeClosed peuvent être décomposés rapidement. Enfin, comme vous l’apprendrez dans la section 7.3, vous pouvez obtenir le contrôle total de ce processus de décomposition en mettant en œuvre votre propre Spliterator.
  • Les caractéristiques d’un flux et la manière dont les opérations intermédiaires à travers le pipeline les modifient peuvent modifier les performances du processus de décomposition. Par exemple, un flux dimensionné peut être divisé en deux parties égales, puis chaque partie peut être traitée en parallèle plus efficacement, mais une opération de filtrage peut jeter un nombre imprévisible d’éléments, rendant la taille du flux elle-même inconnue. Du coup s’il est nécessaire de connaître la taille de la liste à la fin. Cela peut couter cher en termes de performance.
  • Déterminez si une opération terminale a une étape de fusion bon marché ou coûteuse (par exemple, la méthode combiner dans un collecteur). Si cela est coûteux, le coût engendré par la combinaison des résultats partiels générés par chaque sous-flux peut dépasser les avantages de performance d’un flux parallèle.

Le tableau 7.1 donne un résumé de la convivialité de certaines sources de flux paralleles en termes de décomposabilité.

Enfin, nous devons souligner que l’infrastructure utilisée en coulisses par les flux parallèles pour exécuter des opérations en parallèle est le framework fork/join introduit en Java 7. L’exemple de sommation parallèle a prouvé qu’il est essentiel de bien comprendre le fonctionnement interne des flux parallèles afin de les utiliser correctement, nous allons donc étudier en détail le framework fork/join dans la section suivante.

7.2. Le framework fork/join

Le framework fork/join a été conçu pour scinder récursivement une tâche parallélisable en tâches plus petites, puis combiner les résultats de chaque sous-tâche pour produire le résultat global. C’est une implémentation de l’interface ExecutorService, qui distribue ces sous-tâches aux threads disponibles dans un pool de threads, appelé ForkJoinPool. Commençons par explorer comment définir une tâche et des sous-tâches.

7.2.1. Travailler avec RecursiveTask

Pour soumettre des tâches à ce pool, vous devez créer une sous-classe de RecursiveTask <R>, où R est le type du résultat produit par la tâche parallélisée (et chacune de ses sous-tâches) ou de RecursiveAction si la tâche ne renvoie aucun résultat(pourrait mettre à jour d’autres structures non locales). Pour définir RecursiveTasks, vous devez seulement implémenter sa méthode abstraite unique,

Cette méthode définit à la fois la logique de division de la tâche en sous-tâches et l’algorithme pour produire le résultat d’une seule sous-tâche lorsqu’il n’est plus possible ou pratique de la diviser davantage. Pour cette raison, une implémentation de cette méthode ressemble souvent au pseudocode suivant :

En général, il n’y a pas de critères précis pour décider si une tâche donnée doit être divisée ou non, mais vous pouvez suivre différentes heuristiques pour vous aider à prendre cette décision. Nous les clarifions plus en détail à la section 7.2.1. Le processus de division des tâches récursif est synthétisé visuellement par la figure 7.3.

Comme vous l’avez peut-être remarqué, ce n’est rien de plus que la version parallèle de l’algorithme divide-and-conquer bien connu. Pour illustrer un exemple pratique d’utilisation de la structure fork/join en utilisant nos exemples précédents, essayons de calculer la somme d’une plage de nombres (représentée ici par un tableau de nombres long []) en utilisant ce framework. Comme expliqué, vous devez d’abord fournir une implémentation pour la classe RecursiveTask, comme indiqué par ForkJoinSumCalculator dans la figure suivante.

 

L’écriture d’une méthode effectuant une somme parallèle des n premiers nombres naturels est maintenant assez simple. Vous avez juste besoin de passer le tableau de nombres désiré au constructeur de ForkJoinSumCalculator :

Ici, vous générez un tableau contenant les n premiers nombres naturels en utilisant une LongStream. Ensuite, vous créez un ForkJoinTask (la superclasse de RecursiveTask), en passant ce tableau au constructeur public de ForkJoinSumCalculator montré dans la figure 7.2. Enfin, vous créez un nouveau ForkJoinPool et transmettez cette tâche à sa méthode invoke. La valeur renvoyée par cette dernière méthode est le résultat de la tâche définie par la classe Fork-JoinSumCalculator lorsqu’elle est exécutée dans le ForkJoinPool.

Notez que dans une application du monde réel, cela n’a pas de sens d’utiliser plus d’un ForkJoinPool. Pour cette raison, ce que vous devriez faire est de ne l’instancier qu’une seule fois et de garder cette instance dans un champ statique, ce qui en fait un singleton, donc il pourrait être facilement réutilisé par n’importe quelle partie de votre logiciel. Ici, pour le créer, vous utilisez son constructeur par défaut sans argument, ce qui signifie que vous voulez autoriser le pool à utiliser tous les processeurs disponibles pour la JVM. Plus précisément, ce constructeur utilisera la valeur renvoyée par Runtime.availableProcessors pour déterminer le nombre de threads utilisés par le pool. Notez que la méthode availableProcessors, malgré son nom, renvoie en réalité le nombre de cœurs disponibles, y compris les cœurs virtuels dus à l’hyperthreading.

Exécution du ForkJoinSumCalculator

Lorsque vous transmettez la tâche ForkJoinSumCalculator au ForkJoinPool, cette tâche est exécutée par un thread du pool qui à son tour appelle la méthode de calcul de la tâche. Cette méthode vérifie si la tâche est suffisamment petite pour être exécutée de manière séquentielle. Sinon, il divise le tableau de nombres à sommer en deux moitiés et les affecte à deux nouveaux ForkJoinSumCalculators qui sont programmés pour être exécutés par le ForkJoinPool. Par conséquent, ce processus peut être répété de manière récursive, ce qui permet de diviser la tâche d’origine en tâches plus petites, jusqu’à ce que la condition utilisée pour vérifier s’il n’est plus possible de la diviser soit vérifiée (dans ce cas, le nombre d’éléments à additionner est inférieur ou égal à 10 000). À ce stade, le résultat de chaque sous-tâche est calculé séquentiellement, et l’arbre binaire (implicite) des tâches créées par le processus de forking est renvoyé vers sa racine. Le résultat de la tâche est ensuite calculé, en combinant les résultats partiels de chaque sous-tâche. Ce processus est illustré à la figure 7.4.

Une fois encore, vous pouvez vérifier explicitement les performances de la méthode de sommation en utilisant le framework fork/join avec le harnais développé au début de ce chapitre :

Dans ce cas, il produit la sortie suivante:

Ici, la performance est pire que la version utilisant le flux parallèle, mais seulement parce que vous êtes obligé de mettre tout le flux de nombres dans un long[] avant de pouvoir l’utiliser dans la tâche ForkJoinSumCalculator.

7.2.2. Meilleures pratiques pour l’utilisation du framework fork/join

Même si le framework fork/join est relativement facile à utiliser, il est malheureusement facile d’en abuser. Voici quelques bonnes pratiques pour l’utiliser efficacement :

  • L’appel de la méthode join sur une tâche bloque l’appelant jusqu’à ce que le résultat produit par cette tâche soit prêt. Pour cette raison, il est nécessaire de l’appeler après le début du calcul des deux sous-tâches. Sinon, vous obtiendrez une version plus lente et plus complexe de votre algorithme séquentiel d’origine car chaque sous-tâche devra attendre que l’autre soit terminée avant de commencer.
  • La méthode invoke d’un ForkJoinPool ne doit pas être utilisée depuis une RecursiveTask. Au lieu de cela, vous devez toujours appeler les méthodes compute ou fork directement ; seul le code séquentiel devrait utiliser invoke pour commencer le calcul parallèle.
  • L’appel de la méthode fork sur une sous-tâche est le moyen de la planifier sur le ForkJoinPool. Il peut sembler naturel de l’invoquer à la fois sur les sous-tâches gauche et droite, mais cela est moins efficace que d’appeler directement compute sur l’une d’entre elles. Cela vous permet de réutiliser le même thread pour l’une des deux sous-tâches et d’éviter le surcoût dû à l’allocation inutile d’une autre tâche sur le pool.
  • Déboguer un calcul parallèle en utilisant le framework fork/join peut être délicat. En particulier, il est généralement courant de parcourir une stack trace dans votre IDE pour découvrir la cause d’un problème, mais cela ne fonctionne pas avec un traitement fait avec fork/join car l’appel à compute se produit dans un thread différent de l’appelant, qui est le code qui a appelé la méthode fork.
  • Comme vous l’avez découvert avec des flux parallèles, vous ne devez jamais tenir pour acquis qu’un calcul utilisant le framework fork/join sur un processeur multicœur est plus rapide que le logiciel séquentiel. Nous avons déjà dit qu’une tâche devrait être décomposable en plusieurs sous-tâches indépendantes afin d’être parallélisable avec un gain de performance pertinent. Toutes ces sous-tâches devraient prendre plus de temps à s’exécuter que de forker une nouvelle tâche ; un idiome consiste à placer les E / S dans une sous-tâche et le calcul dans une autre, ce qui permet de superposer les calculs avec les E / S. De plus, vous devriez considérer d’autres choses lorsque vous comparez les performances des versions séquentielle et parallèle du même algorithme. Comme tout autre code Java, le framework fork/join doit être « réchauffé, warm up », ou exécuté, plusieurs fois avant d’être optimisé par le compilateur JIT. C’est pourquoi il est toujours important d’exécuter le programme plusieurs fois avant de mesurer sa performance, comme nous l’avons fait dans notre harnais. Sachez également que les optimisations intégrées au compilateur peuvent injustement donner un avantage à la version séquentielle (par exemple, en effectuant une analyse de code mort – en supprimant un calcul qui n’est jamais utilisé).

La stratégie de division fork/join mérite une dernière remarque : vous devez choisir les critères utilisés pour décider si une sous-tâche donnée doit être divisée ou est suffisamment petite pour être exécutée séquentiellement. Nous donnons quelques conseils à ce sujet dans la section suivante.

7.2.3. Work stealing

Dans notre exemple ForkJoinSumCalculator, nous avons décidé d’arrêter de créer plus de sous-tâches lorsque le tableau de nombres à sommer contenait au plus 10 000 éléments. C’est un choix arbitraire, mais dans la plupart des cas, il est difficile de trouver une bonne heuristique, sinon d’essayer de l’optimiser en faisant plusieurs tentatives avec des entrées différentes. Dans notre cas de test, nous avons démarré avec un tableau de 10 millions d’éléments, ce qui signifie que ForkJoinSumCalculator va débiter au moins 1 000 sous-tâches. Cela peut sembler une perte de ressources car nous l’avons fait fonctionner sur une machine qui n’a que quatre cœurs. Dans ce cas précis, c’est probablement vrai, car toutes les tâches sont liées au processeur et devraient prendre le même temps.

Mais forker un assez grand nombre de tâches est en général un choix gagnant. En effet, idéalement, vous souhaitez partitionner la charge de travail d’une tâche parallélisée de telle sorte que chaque sous-tâche prenne exactement le même laps de temps, en conservant tous les cœurs de votre CPU également occupés. Malheureusement, surtout dans les cas plus proches des scénarios réels que l’exemple simple présenté ici, le temps passé par chaque sous-tâche peut varier considérablement en raison de l’utilisation d’une stratégie de partition inefficace ou de causes imprévisibles comme un accès lent au disque ou le besoin de coordonner l’exécution avec des services externes.

Le framework fork/join contourne ce problème avec une technique appelée work stealing. En pratique, cela signifie que les tâches sont plus ou moins réparties sur tous les threads du ForkJoinPool. Chacun de ces threads contient une file d’attente doublement liée des tâches qui lui sont assignées, et dès qu’il termine une tâche, il en extrait une autre de la tête de la file et commence à l’exécuter. Pour les raisons que nous avons énumérées précédemment, un thread peut accomplir toutes les tâches qui lui sont assignées beaucoup plus rapidement que les autres, ce qui signifie que sa file d’attente deviendra vide alors que les autres threads sont encore assez occupés. Dans ce cas, au lieu de devenir inactif, le thread choisit aléatoirement une file d’attente d’un thread différent et « vole » une tâche, en la retirant de la queue de la file d’attente. Ce processus continue jusqu’à ce que toutes les tâches soient exécutées, puis toutes les files d’attente deviennent vides. C’est pourquoi avoir beaucoup de tâches plus petites, au lieu de seulement quelques plus grandes, peut aider à mieux équilibrer la charge de travail entre les threads de travail.

Plus généralement, cet algorithme de vol de travail est utilisé pour redistribuer et équilibrer les tâches entre les threads présents dans le pool. La Figure 7.5 montre comment ce processus se produit. Lorsqu’une tâche dans la file d’attente d’un thread est divisée en deux sous-tâches, l’une des deux sous-tâches est volée par un autre thread inactif. Comme décrit précédemment, ce processus peut continuer récursivement jusqu’à ce que la condition utilisée pour définir qu’une sous-tâche donnée doit être exécutée de manière séquentielle devient vraie.

La façon dont un flux peut utiliser le framework fork/join pour traiter ses éléments en parallèle devrait maintenant être clair, mais il manque encore un ingrédient. Dans cette section, nous avons analysé un exemple où vous avez explicitement développé la logique pour diviser un tableau de nombres en plusieurs tâches. Néanmoins, vous n’avez pas eu à faire quelque chose de similaire lorsque vous avez utilisé les flux parallèles au début de ce chapitre, et cela signifie qu’il doit y avoir un mécanisme automatique qui scinde le flux pour vous. Ce nouveau mécanisme automatique s’appelle Spliterator, et nous l’explorons dans la section suivante.

7.3. Spliterator

Le Spliterator est une autre nouvelle interface ajoutée à Java 8. Son nom signifie « itérateur splitable » . Comme les itérateurs, les Spliterators sont utilisés pour parcourir les éléments d’une source, mais ils sont également conçus pour le faire en parallèle. Bien que vous n’ayez pas à développer votre propre Spliterator en pratique, comprendre comment le faire vous permettra de mieux comprendre le fonctionnement des flux parallèles. Java 8 fournit déjà une implémentation de Spliterator par défaut pour toutes les structures de données incluses dans son Framework Collections. Collections implémente maintenant l’interface Spliterator, qui fournit une méthode spliterator. Cette interface définit plusieurs méthodes, comme indiqué dans la liste suivante.

Comme d’habitude, T est le type des éléments traversés par le Spliterator. La méthode tryAdvance se comporte de la même manière qu’un Iterator normal dans ce sens qu’elle est aussi utilisée pour consommer séquentiellement les éléments du Spliterator, retournant true s’il y a encore d’autres éléments à parcourir. Mais la méthode trySplit est plus spécifique à l’interface Spliterator car elle est utilisée pour partitionner certains de ses éléments en un second Spliterator (celui retourné par la méthode), permettant de les traiter en parallèle. Un Spliterator peut également fournir une estimation du nombre d’éléments restant à parcourir via sa méthode estimateSize, car même une valeur inexacte mais rapide à calculer peut être utile pour diviser la structure plus ou moins uniformément.

Il est important de comprendre comment ce processus de division est effectué en interne afin de prendre le contrôle de celui-ci lorsque cela est nécessaire. Par conséquent, nous l’analysons plus en détail dans la section suivante.

7.3.1. Le processus de fractionnement

L’algorithme qui divise un flux en plusieurs parties est un processus récursif et se déroule comme indiqué dans la figure 7.6. Dans la première étape, trySplit est invoqué sur le premier Spliterator et génère un second. Ensuite, à l’étape 2, il est appellé à nouveau sur ces deux Spliterators, ce qui donne un total de quatre. Le framework continue à appeler la méthode trySplit sur un Spliterator jusqu’à ce qu’il renvoie null pour signaler que la structure de données qu’il traite n’est plus divisible, comme indiqué à l’étape 3. Enfin, ce processus de scission récursif se termine à l’étape 4 lorsque tous les Spliterators retournent null à une invocation trySplit.

Ce processus de division peut également être influencé par les caractéristiques du Spliterator lui-même, qui sont déclarées via la méthode characteristics.

Les caractéristiques du Spliterator

La dernière méthode abstraite déclarée par l’interface Spliterator est characteristics, qui retourne un int encodant l’ensemble des caractéristiques du Spliterator lui-même. Les clients Spliterator peuvent utiliser ces caractéristiques pour mieux contrôler et optimiser son utilisation. Le tableau 7.2 les résume. (Malheureusement, bien que ces concepts chevauchent conceptuellement les caractéristiques d’un collecteur, ils sont codés différemment.)

 

Maintenant que vous avez vu l’interface de Spliterator et les méthodes qu’elle définit, vous pouvez essayer de développer votre propre implémentation d’un Spliterator.

7.3.2. Implémenter votre propre Spliterator

Regardons un exemple pratique dont vous pourriez avoir besoin pour implémenter votre propre Spliterator. Nous allons développer une méthode simple qui compte le nombre de mots dans une chaîne. Une version itérative de cette méthode pourrait être écrite comme indiqué dans la figure suivante.

Essayons cette méthode sur la première phrase de Dante’s Inferno:

Notez que nous avons ajouté quelques espaces aléatoires supplémentaires dans la phrase pour démontrer que l’implémentation itérative fonctionne correctement même en présence de plusieurs espaces entre deux mots. Comme prévu, ce code affiche les éléments suivants :

Idéalement, vous souhaiterez obtenir le même résultat dans un style plus fonctionnel car vous pourriez, comme indiqué précédemment, paralléliser ce processus en utilisant un flux parallèle sans devoir gérer explicitement les threads et leur synchronisation.

Réécriture du WordCounter dans un style fonctionnel

D’abord, vous devez convertir la chaîne en un flux. Malheureusement, il y a des flux primitifs uniquement pour int, long et double, donc vous devrez utiliser un Stream<Character> :

Vous pouvez calculer le nombre de mots en effectuant une réduction sur ce flux. Tout en réduisant le flux, vous devrez conserver un état composé de deux variables : un int représentant le nombre de mots trouvés jusqu’à présent et un booléen à retenir si le dernier caractère rencontré était un espace ou non. Parce que Java n’a pas de tuples (une construction pour représenter une liste ordonnée d’éléments hétérogènes sans avoir besoin d’un objet wrapper), vous devrez créer une nouvelle classe, WordCounter, qui encapsulera cet état comme indiqué dans la figure suivante.

Dans cette figure, la méthode accumulate définit comment changer l’état du WordCounter, ou plus précisément avec quel état créer un nouveau WordCounter car c’est une classe immuable. La méthode accumulate est appelée chaque fois qu’un nouveau caractère du flux est traversé. En particulier, comme vous l’avez fait dans la méthode countWordsIteratively dans la figure 7.4, le compteur est incrémenté quand un nouvel « non-espace » est rencontré et que le dernier caractère rencontré est un espace. La figure 7.7 montre les transitions d’état du WordCounter lorsqu’un nouveau caractère est traversé par la méthode accumulate.

La seconde méthode, combine, est invoquée pour agréger les résultats partiels de deux WordCounters fonctionnant sur deux sous-parties différentes du flux de caractères, de sorte qu’il combine deux WordCounters en additionnant leurs compteurs internes.

Maintenant que vous avez codé la logique de la façon d’accumuler des caractères sur un WordCounter et comment les combiner dans le WordCounter lui-même, écrire une méthode qui réduira le flux de caractères est plutôt simple :

Vous pouvez maintenant essayer cette méthode avec le flux créé à partir de la chaîne contenant la première phrase de Dante’s Inferno:

Vous pouvez vérifier que sa sortie correspond à celle générée par la version itérative:

Jusqu’à présent, tout va bien, mais nous avons dit que l’une des principales raisons de la mise en œuvre du WordCounter en termes fonctionnels était de pouvoir facilement paralléliser cette opération, alors voyons comment s’y prendre.

Faire fonctionner le WordCounter en parallèle

Vous pouvez essayer d’accélérer l’opération de comptage de mots en utilisant un flux parallèle, comme ceci :

Malheureusement, cette fois, la sortie est

Évidemment, quelque chose a mal tourné, mais quoi ? Le problème n’est pas difficile à découvrir. Parce que la chaîne originale est divisée à des positions arbitraires, parfois un mot est divisé en deux, puis compté deux fois. En général, cela démontre que passer d’un flux séquentiel à un flux parallèle peut conduire à un mauvais résultat si ce résultat peut être affecté par la position où le flux est divisé.

Comment pouvez-vous résoudre ce problème ? La solution consiste à s’assurer que la chaîne n’est pas scindée à une position aléatoire mais seulement à la fin d’un mot. Pour ce faire, vous devez implémenter un Spliterator de Character qui sépare un String uniquement entre deux mots, comme indiqué dans la figure suivante, puis crée le flux parallèle à partir de celui-ci.

Ce Spliterator est créé à partir de la chaîne à analyser et itère sur ses caractères en maintenant l’index de celui qui est actuellement traversé. Revoyons rapidement les méthodes du WordCounterSpliterator implémentant l’interface Spliterator :

  • La méthode tryAdvance alimente le consumer avec le caractère dans la chaîne à la position d’index actuelle et incrémente cette position. Le Consumer passé en argument est une classe Java interne transmettant le Caractère consommé à l’ensemble des fonctions qui doivent lui être appliquées lors de la traversée du flux, qui dans ce cas est seulement une fonction réductrice, à savoir la méthode accumulate de la classe WordCounter. La méthode tryAdvance renvoie true si la nouvelle position du curseur est inférieure à la longueur totale de la chaîne et si d’autres caractères doivent être itérés.
  • La méthode trySplit est la plus importante dans un Spliterator car c’est celle qui définit la logique utilisée pour diviser la structure de données à itérer. Comme vous l’avez fait dans la méthode de calcul de la RecursiveTask implémentée dans la liste 7.1 (sur comment utiliser le framework fork/join), la première chose que vous devez faire ici est de définir une limite sous laquelle vous ne voulez pas effectuer de splits supplémentaires. Ici, vous utilisez une limite très basse de 10 caractères seulement pour vous assurer que votre programme effectuera tout de même quelques divisions avec la chaîne relativement courte que vous êtes en train d’analyser, mais dans les applications réelles, vous devrez utiliser une limite plus élevée, comme vous avez fait dans l’exemple fork/join, pour éviter de créer trop de tâches. Si le nombre de caractères restants à parcourir est inférieur à cette limite, vous renvoyez null pour indiquer qu’aucune autre division n’est nécessaire. Inversement, si vous devez effectuer une scission, vous définissez la position de scission du candidat sur la moitié du tronçon de String restant à analyser. Mais vous n’utilisez pas cette position de partage (l’index de split) directement parce que vous voulez éviter de diviser au milieu d’un mot, ainsi vous avancez jusqu’à ce que vous trouviez un caractère vide. Une fois que vous avez trouvé une position de séparation opportune, vous créez un Spliterator qui traversera le tronçon de la sous-chaîne allant de la position actuelle à l’index de division ; vous définissez la position actuelle de celle-ci à la division, car la partie avant elle sera gérée par le nouveau Spliterator, puis vous le retournez.
  • La taille estimée des éléments restant à parcourir est la différence entre la longueur totale de la chaîne analysée par ce Spliterator et la position actuellement itérée.
  • Enfin, la méthode characteristic signale au framework que ce Spliterator est ORDERED (l’ordre est juste la séquence des Caractères dans la String), SIZED (la valeur retournée par la méthode estimatedSize est exacte), SUBSIZED (les autres Spliterators créés par la méthode trySplit ont aussi une taille exacte), NONNULL (il ne peut y avoir aucun caractère nul dans la chaîne), et IMMUTABLE (aucun autre caractère ne peut être ajouté pendant l’analyse de la chaîne car la chaîne elle-même est une classe immuable).

Mettre le WordCounterSpliterator en pratique

Vous pouvez maintenant utiliser un flux parallèle avec ce nouveau WordCounterSpliterator comme ceci:

Le deuxième argument booléen transmis à la méthode d’usine StreamSupport.stream signifie que vous souhaitez créer un flux parallèle. Le passage de ce flux parallèle à la méthode countWords :

produit la sortie correcte, comme prévu:

Vous avez vu comment un Spliterator peut vous permettre de prendre le contrôle de la stratégie utilisée pour diviser une structure de données. Une dernière caractéristique notable de Spliterators est la possibilité de lier la source des éléments à traverser au premier point de la traversée, au premier split, ou à la première requête d’estimation de taille, plutôt qu’au moment de sa création. Lorsque cela se produit, on parle d’un Spliterator de liaison tardive (late-binding Spliterator). Nous avons dédié l’annexe C pour montrer comment vous pouvez développer une classe d’utilitaires capable d’effectuer plusieurs opérations sur le même flux en parallèle en utilisant cette fonctionnalité.

7.4. Résumé

Dans ce chapitre, vous avez appris ce qui suit :

  • L’itération interne vous permet de traiter un flux en parallèle sans avoir besoin d’utiliser explicitement et de coordonner différents threads dans votre code.
  • Même si le traitement d’un flux en parallèle est si facile, rien ne garantit que cela accélèrera vos programmes en toutes circonstances. Le comportement et la performance des programmes parallèles peuvent parfois être contre-productifs, et pour cette raison, il est toujours nécessaire de les mesurer et de s’assurer que vous ne ralentissez pas plutôt vos programmes.
  • L’exécution parallèle d’une opération sur un ensemble de données, comme le fait un flux parallèle, peut fournir une amélioration des performances, en particulier lorsque le nombre d’éléments à traiter est énorme ou que le traitement de chaque élément est particulièrement long.
  • Du point de vue des performances, l’utilisation de la bonne structure de données, par exemple, en utilisant des flux primitifs au lieu des flux non spécialisés autant que possible, est presque toujours plus important que d’essayer de paralléliser certaines opérations.
  • La structure fork/join vous permet de fractionner récursivement une tâche parallélisable en tâches plus petites, de les exécuter sur des threads différents, puis de combiner les résultats de chaque sous-tâche afin de produire le résultat global.
  • Les Spliterators définissent comment un flux parallèle peut diviser les données qu’il traverse.