Programmation "au fil de l’eau" (streams)

(((stream programing))

images/blackbelt.png

A partir de Java 8 il a été introduit un paradigme de programmation basée sur le notion de stream (traduite ici par "programmation au fil de l’eau").

Ce principe existe depuis longtemps en particulier dans des langages de script (shells UNIX, powerShell,…).

L’idée fondamentale est de former un flot de données (un pipeline) et de positionner sur ce flot des codes qui d’un coté consomment les données qui arrivent et de l’autre font passer ces données aux codes suivants.

Un premier point important en ce qui concerne les pipelines Java est que les objets ne circulent qu'à la demande: on peut ainsi créer des producteurs qui n’envoient des données qu’au fur et à mesure qu’elles seront consommées. Un exemple déjà connu de ce type de mécanisme est l’itérateur: on peut éventuellement ne pas avoir tous les objets concernés en mémoire (comme dans une ArrayList ou un tableau) mais les fabriquer à la demande (exemples: lecture progressive dans un cache de base de données, générateurs de données). Un autre exemple serait celui des principes associés au java.util.concurrent.Flow que nous verrons ultérieurement.

Le package java.util.stream définit essentiellement des interfaces qui décrivent les comportements des codes qui vont opérer sur les Streams.

Il va exister deux types principaux de codes:

En principe la mise en place d’un tel dispositif devrait permettre de réaliser des pipelines opérant en parrallèle. Le code "fournisseur" prend en charge la mise en place des Threads qui vont faire fonctionner les pipelines et l’agent collecteur devra centraliser les résultats provenant de plusieurs tâches.

Les codes intermédiaires doivent donc obéir à des contraintes strictes: ils ne doivent pas agir sur la source (une fois que le flot a commencé à "couler"), et, dans la plupart de cas, il ne gèrent pas d'état (leur production ne doit pas être basée sur des critères qui évolueraient au cours de l’exécution).

Un stream n’est utilisable qu’une seule fois! (normal : quand on a consommé on ne peut pas s’attendre à une régénération spontanée des données).

[Note]Avertissement important

Si le paradigme des streams a été proposé pour mettre en place des codes profitant pleinement du parallélisme il n’est pas garanti que les gains de performance seront au rendez-vous!

En effet les performances réélles dépendent étroitement des conditions de mise en place du parallélisme au niveau de la source (nombre de Threads, caractéristiques du découpage des blocs de données sur lesquelles opérer, etc…)

L’expérience accumulée avec l’utilisation des codes de map/reduce ForkJoinTask dans java.util.concurrent montre qu’il faut être très attentif aux réglages et que ceci est un travail de spécialiste.

Une application de traitement lourd basée sur des stream devra être soigneusement calibrée pour être efficace - voir ci-dessous la note sur les SplitIterator -

Exemple (complexe) de codes :

L’objectif ici est de dire pour chaque capteur le nombre de valeurs anormales (inférieures à un minimum, supérieures à un maximum)

Décomposition:

Exemple de résultat d’exécution de la méthode (ici le nom des capteurs montre comment l’optimiseur de parallélisme a découpé les Threads):

{capteur_21=97, capteur_32=53, capteur_43=22, capteur_54=11, capteur_1=17}
nombre d'éléments traités
{capteur_21=500, capteur_32=250, capteur_43=125, capteur_54=62, capteur_1=62}
[Note]calibrage des SplitIterators

La plupart du temps on est amené à utiliser des Streams générés par les librairies standard de Java. Toutefois, dans certains cas, on peut être amené à écrire son propre générateur de Stream, en réalisant un SplitIterator particulier. Ceci permet d’optimiser les stratégies de partitionnement des traitements en parallèle mais demande des réglages soigneux.

Globalement pour la réalisation d’un SplitIterator on lira avec soin la documentation javadoc . Les méthodes suivantes sont les plus importantes:

  • boolean tryAdvance(Consumer<? super T> action) : c’est la méthode qui permet de générer un élément qui sera envoyé dans le Stream … pas de difficulté particulère de ce coté
  • par contre ces méthodes sont importantes pour le calibrage:

    • Spliterator<T> trySplit() : permet de générer un sous-iterateur dans un thread différent. Quand le code ne souhaite plus de partitionnement supplémentaire il faut rendre null. En première approximation une stratégie possible est de découper en N threads équilibrés N étant le nombre de processeurs obtenus par Runtime.getRuntime().availableProcessors()
    • int characteristics() : renvoie un entier qui est une combinaison des "caractéristiques" de l’iterateur courant. Exemple de code :

          int res = CONCURRENT | DISTINCT | IMMUTABLE | NONNULL;
              if(isGenerated){
              // le booleén est ici une implantation spécifique à cette réalisation
              //(il n'est pas standard)
                  return res | SIZED | SUBSIZED ;
              }
              return res ;

      (bien regarder la documentation des constantes correspondantes)

    • long estimateSize() renvoie une estimation du nombre d'éléments qui reste à consommer (ou Long.MAX_VALUE s’il est impossible de donner cette estimation).
[Note]les streams sont AutoCloseable

La classe qui implante le service de SplitIterator peut avoir besoin de clore les opérations (par exemple en étant doté d’une méthode close()). La mise en place de l’appel automatique de cette cloture implique que l’opération soit notifiée au Stream:

 public Map<String, Integer> stats(Spliterator<RapportCapteur> source,
                                     Runnable closeHandler, °°°) {
        // autoCloseable
        try (Stream<RapportCapteur> stream = StreamSupport.stream(source, true)) {
            stream.onClose(closeHandler) ;
            Stream<RapportCapteur> pipeline = stream ;
            pipeline = pipeline.filter(rapport -> {
                return °°°
            });
       °°°
   }
 }

--exercice--