Annexe C. Exécution de plusieurs opérations en parallèle sur un flux

Exécution de plusieurs opérations en parallèle sur un flux

L’une des plus grandes limitations d’un flux Java 8 est que vous ne pouvez l’utiliser qu’une seule fois et en obtenir qu’un seul résultat. En effet, si vous essayez de traverser un flux une seconde fois, la seule chose que vous aurez est une exception comme celle-ci: java.lang.IllegalStateException: stream has already been operated upon or closed

Malgré cela, il y a des situations où vous aimeriez obtenir plusieurs résultats lors du traitement d’un seul flux. Par exemple, vous pouvez vouloir analyser un fichier de log dans un flux, comme nous l’avons fait dans la section 5.7.3, mais rassembler plusieurs statistiques en une seule étape. Ou alors, en conservant le modèle de données de menu utilisé pour expliquer les fonctions de Stream dans les chapitres 4-6, vous pouvez vouloir récupérer différentes informations tout en traversant le flux de plats.

En d’autres termes, vous souhaitez faire passer un flux à travers plus d’un lambda sur un seul passage, et pour ce faire, vous avez besoin d’un type de méthode fork et d’appliquer différentes fonctions à chaque flux forké. Encore mieux, ce serait bien si vous pouviez effectuer ces opérations en parallèle, en utilisant différents threads pour calculer les différents résultats requis.

Malheureusement, ces fonctionnalités ne sont actuellement pas disponibles sur l’implémentation de flux fournie dans Java 8, mais dans cette annexe, nous allons vous montrer comment utiliser un Spliterator et en particulier sa capacité de liaison tardive, avec BlockingQueues et Futures, pour implémenter cette fonctionnalité utile et la rendre disponible avec une API pratique

C.1. Forker une Stream

La première chose nécessaire pour exécuter plusieurs opérations en parallèle sur un flux consiste à créer un StreamForker qui enveloppe le flux d’origine, sur lequel vous pouvez définir les différentes opérations que vous souhaitez effectuer. Jetez un oeil à la figure suivante.

Figure C.1. Définition d’un StreamForker pour exécuter plusieurs opérations sur un flux

Ici, la méthode fork accepte deux arguments:

  • Une fonction, qui transforme le flux en un résultat de tout type représentant l’une de ces opérations
  • Une clé, qui vous permettra de récupérer le résultat de cette opération et accumule ces paires clé / fonction dans une Map interne

La méthode fork renvoie le StreamForker lui-même; par conséquent, vous pouvez construire un pipeline en forkant plusieurs opérations.

La Figure C.1 montre les principales idées derrière le StreamForker.

Ici, l’utilisateur définit trois opérations à effectuer sur un flux indexé par trois clés. Le StreamForker traverse ensuite le flux d’origine et les fork en trois autres flux. À ce stade, les trois opérations peuvent être appliquées en parallèle sur les flux bifurqués, et les résultats de ces applications de fonction, indexées avec leurs clés correspondantes, sont utilisés pour remplir la Map résultante.

L’exécution de toutes les opérations ajoutées via la méthode fork est déclenchée par l’appel de la méthode getResults, qui renvoie une implémentation de l’interface Results définie comme suit:

Cette interface a une seule méthode à laquelle vous pouvez passer l’une des clés utilisézs dans l’une des méthodes fork, et cette méthode renvoie le résultat de l’opération correspondant à cette clé.

C.1.1. Implémentation de l’interface Résultats avec ForkingStreamConsumer

La méthode getResults peut être implémentée comme suit:

ForkingStreamConsumer implémente à la fois l’interface Results définie précédemment et l’interface Consumer. Comme vous le verrez lorsque nous analysons son implémentation plus en détail, sa tâche principale est de consommer tous les éléments du flux et de les multiplexer vers un nombre de BlockingQueues égal au nombre d’opérations soumises via la méthode fork. Notez qu’il est garanti que le flux est séquentiel, car si la méthode forEach était exécutée sur un flux parallèle, ses éléments pourraient être déplacés vers les files d’attente dans le désordre. La méthode finish ajoute des éléments spéciaux à ces files d’attente pour indiquer qu’il n’y a plus d’éléments à traiter. La méthode de construction utilisée pour créer ForkingStreamConsumer est affichée dans la figure suivante.

Figure C.2. La méthode de construction utilisée pour créer ForkingStreamConsumer

Dans la figure C.2, vous créez d’abord la liste des BlockingQueues mentionnée précédemment. Ensuite, vous créez une Map, ayant comme clés les mêmes clés utilisées pour identifier les différentes opérations à exécuter sur le flux, et ayant comme valeurs les futurs qui contiendront les résultats correspondants de ces opérations. La liste des BlockingQueues et la Map de Futures sont ensuite passées au constructeur du ForkingStreamConsumer. Chaque futur est créé avec cette méthode getOperationResult, comme indiqué dans la liste suivante.

Listing C.3. Futures créés avec la méthode getOperationResult

La méthode getOperationResult crée une nouvelle BlockingQueue et l’ajoute à la liste des files d’attente. Cette file d’attente est transmise à un nouveau BlockingQueueSpliterator, qui est un Spliterator de liaison tardive, lisant l’élément à traverser de la file d’attente; nous allons examiner comment il est fait sous peu.

Vous créez ensuite un flux séquentiel traversant ce Spliterator, et enfin vous créez un Future pour calculer le résultat de l’application de la fonction représentant l’une des opérations que vous souhaitez effectuer sur ce flux. Cet avenir est créé à l’aide d’une méthode d’usine statique de la classe CompletableFuture qui implémente l’interface Future. C’est une autre nouvelle classe introduite dans Java 8, et nous l’avons étudiée en détail au chapitre 11.

C.1.2. Développer le ForkingStreamConsumer et le BlockingQueueSpliterator

Les deux dernières parties que vous devez développer sont les classes ForkingStreamConsumer et BlockingQueueSpliterator que nous avons introduites précédemment. Le premier peut être implémenté comme suit.

Figure C.4. Un ForkingStreamConsumer pour ajouter des éléments de flux à plusieurs files d’attente

Cette classe implémente les interfaces Consumer et Results et contient une référence à List of BlockingQueues et à une Map de Futures exécutant les différentes opérations sur le flux.

L’interface Consumer nécessite une implémentation pour la méthode accept. Ici, chaque fois que ForkingStreamConsumer accepte un élément du flux, il ajoute cet élément à tous les BlockingQueues. En outre, une fois que tous les éléments du flux d’origine ont été ajoutés à toutes les files d’attente, la méthode finish entraîne l’ajout d’un dernier élément à toutes les files d’attente. Cet élément, lorsqu’il est rencontré par BlockingQueueSpliterators, fera comprendre aux queues qu’il n’y a plus d’éléments à traiter.

L’interface Results nécessite une implémentation pour la méthode get. Ici, elle récupère la Futur qui est indexé dans la Map avec la clé d’argument et déballe son résultat ou attend jusqu’à ce qu’un résultat soit disponible.

Enfin, il y aura un BlockingQueueSpliterator pour chaque opération à effectuer sur le flux. Chaque BlockingQueueSpliterator aura une référence à l’un des BlockingQueues rempli par ForkingStreamConsumer, et il peut être implémenté comme indiqué dans la liste suivante.

Figure C.5. Un Spliterator lisant les éléments qu’il traverse à partir d’un BlockingQueue

Dans cette figure, un Spliterator est implémenté, non pour définir la politique de division d’un flux mais uniquement pour utiliser sa capacité de liaison tardive. Pour cette raison, la méthode trySplit n’est pas implémentée.

En outre, il est impossible de renvoyer une valeur significative à la méthode estimatedSize car vous ne pouvez pas prévoir le nombre d’éléments pouvant encore être extraits de la file d’attente. De plus, étant donné que vous n’essayez pas de scinder, cette estimation sera inutile. Cette implémentation ne possède aucune des caractéristiques de Spliterator listées dans le tableau 7.2, donc la méthode de characteristic retourne 0.

La seule méthode implémentée ici est tryAdvance, qui attend de sa BlockingQueue les éléments du flux original ajouté par ForkingStreamConsumer. Elle envoie ces éléments à un consommateur qui (basé sur la manière dont ce Spliterator a été créé dans la méthode getOperationResult) est la source d’un autre flux (sur lequel la fonction correspondante, passée à l’une des invocations de méthode fork, doit être appliquée). La méthode tryAdvance renvoie true, pour notifier son invocateur qu’il existe d’autres éléments à consommer, jusqu’à ce qu’il trouve dans la file d’attente l’objet spécial ajouté par ForkingStreamConsumer pour indiquer qu’il n’y a plus d’éléments à retirer de la file d’attente.

La Figure C.2 montre une vue d’ensemble du StreamForker et de ses blocs de construction.

Sur la figure, le StreamForker en haut à gauche a une carte, où chaque opération à effectuer sur le flux, définie par une fonction, est indexée par une clé. Le ForkingStreamConsumer sur la droite contient une file d’attente pour chacune de ces opérations et consomme tous les éléments du flux d’origine, en les multiplexant à toutes les files d’attente.

Au bas de la figure, chaque file d’attente a un BlockingQueueSpliterator tirant ses éléments et agissant comme une source pour un flux différent. Enfin, chacun de ces flux, bifurqué par l’original, est passé en argument à l’une des fonctions, exécutant ainsi l’une des opérations à effectuer. Vous avez maintenant tous les composants de votre StreamForker, il est donc prêt à l’emploi.

C.1.3. Mettre le StreamForker au travail

Mettons le StreamForker au travail sur le modèle de données de menu que nous avons défini au chapitre 4, en forkant le flux original de plats à effectuer  en quatre opérations différentes en parallèle, comme indiqué dans la liste suivante. En particulier, vous voulez générer une liste séparée par des virgules des noms de tous les plats disponibles, calculer le total des calories du menu, trouver le plat avec le plus de calories, et regrouper tous les plats par leur type.

Figure C.6. Mettre le StreamForker au travail

Le StreamForker fournit une API pratique et fluide permettant de classer un flux et d’assigner une opération différente à chaque flux forké. Ces opérations sont exprimées en termes de fonctions appliquées sur le flux et peuvent être identifiées par n’importe quel objet arbitraire; Dans ce cas, nous avons choisi d’utiliser des Strings. Lorsque vous n’avez plus de fork à ajouter, vous pouvez appeler getResults sur StreamForker pour déclencher l’exécution de toutes les opérations définies et obtenir StreamForker.Results. Étant donné que ces opérations sont exécutées de manière interne de manière asynchrone, la méthode getResults retourne immédiatement, sans attendre que tous les résultats soient disponibles.

Vous pouvez obtenir le résultat d’une opération spécifique en transmettant la clé utilisée pour l’identifier dans l’interface StreamForker.Results. Si entre-temps le calcul de cette opération se termine, la méthode get retournera le résultat correspondant; sinon, il bloquera jusqu’à ce qu’un tel résultat ne soit pas disponible.

Comme prévu, ce morceau de code génère la sortie suivante:

C.2. Considérations de performance

Pour des raisons de performances, vous ne devez pas tenir pour acquis que cette approche est plus efficace que de traverser plusieurs fois le flux. La surcharge causée par l’utilisation des files d’attente de blocage peut facilement l’emporter sur les avantages de l’exécution des différentes opérations en parallèle lorsque le flux est constitué de données qui sont toutes en mémoire.

Inversement, l’accès au flux une seule fois peut être un choix gagnant lorsque cela implique des opérations d’E / S coûteuses, comme lorsque la source du flux est un fichier volumineux; donc (comme d’habitude) la seule règle significative lorsque l’optimisation de la performance de votre application est de « Just measure it! »

Cet exemple montre comment il est possible d’exécuter plusieurs opérations sur le même flux en une fois. Plus important encore, nous pensons que cela prouve que même si une fonctionnalité spécifique n’est pas fournie par l’API Java native, la flexibilité des expressions lambda et un peu de créativité pour réutiliser et combiner ce qui est déjà disponible peut vous permettre d’implémenter la fonctionnalité manquante. .