(((stream programing))
A partir de Java 8 il a été introduit un paradigme de programmation basée sur le notion de stream
(traduite ici par "programmation au fil de l’eau").
Ce principe existe depuis longtemps en particulier dans des langages de script (shells UNIX, powerShell,…).
L’idée fondamentale est de former un flot de données (un pipeline) et de positionner sur ce flot des codes qui d’un coté consomment les données qui arrivent et de l’autre font passer ces données aux codes suivants.
Un premier point important en ce qui concerne les pipelines Java est que les objets ne circulent qu'à la demande: on peut ainsi créer des
producteurs qui n’envoient des données qu’au fur et à mesure qu’elles seront consommées.
Un exemple déjà connu de ce type de mécanisme est l’itérateur: on peut éventuellement ne pas avoir tous les objets concernés en mémoire (comme
dans une ArrayList
ou un tableau) mais les fabriquer à la demande (exemples: lecture progressive dans un cache de base de données, générateurs de données). Un autre exemple serait celui des principes associés au java.util.concurrent.Flow
que nous verrons ultérieurement.
Le package java.util.stream
définit essentiellement des interfaces qui décrivent les comportements des codes qui vont opérer sur les Streams
.
Il va exister deux types principaux de codes:
des opérations intermédiaires :
En principe la mise en place d’un tel dispositif devrait permettre de réaliser des pipelines opérant en parrallèle.
Le code "fournisseur" prend en charge la mise en place des Threads
qui vont faire fonctionner les pipelines et l’agent collecteur
devra centraliser les résultats provenant de plusieurs tâches.
Les codes intermédiaires doivent donc obéir à des contraintes strictes: ils ne doivent pas agir sur la source (une fois que le flot a commencé à "couler"), et, dans la plupart de cas, il ne gèrent pas d'état (leur production ne doit pas être basée sur des critères qui évolueraient au cours de l’exécution).
Un stream n’est utilisable qu’une seule fois! (normal : quand on a consommé on ne peut pas s’attendre à une régénération spontanée des données).
![]() | Avertissement important |
---|---|
Si le paradigme des streams a été proposé pour mettre en place des codes profitant pleinement du parallélisme il n’est pas garanti que les gains de performance seront au rendez-vous! En effet les performances réélles dépendent étroitement des conditions de mise en place du parallélisme au niveau de la source (nombre de Threads, caractéristiques du découpage des blocs de données sur lesquelles opérer, etc…) L’expérience accumulée avec l’utilisation des codes de map/reduce Une application de traitement lourd basée sur des stream devra être soigneusement calibrée pour être efficace - voir ci-dessous la note sur les |
Exemple (complexe) de codes :
soit une classe :
public class RapportCapteur { private final String capteur ; private final int valeur ; .... // constructeur + accesseurs }
et un code qui exploite un générateur de ces rapports:
public Map<String, Integer> stats(Spliterator<RapportCapteur> source, boolean parallel, int min, int max) { // generation d'un pipeline Stream<RapportCapteur> stream = StreamSupport.stream(source, parallel); // mise en place des opérations intermédiaires stream = stream.filter(rapport -> { int val = rapport.getValeur(); return (val < min) || (val > max); }); // mise en place d'un collecteur Map<String, Integer> map = stream.collect( Collectors.toConcurrentMap(RapportCapteur::getCapteur, rapport -> 1, (valeurPrécedente, nombre1) -> valeurPrécedente + nombre1 )); return map; }
L’objectif ici est de dire pour chaque capteur le nombre de valeurs anormales (inférieures à un minimum, supérieures à un maximum)
Décomposition:
Le premier objectif est de générer un pipeline : il existe de nombreuses méthodes (souvent statiques) dans l’interface Stream
, la classe StreamSupport
et d’autres (voir dans la javadoc de Stream
la page générée par Use
!)
Ici le Stream est construit à partir d’un objet implantant l’interface SplitIterator
: ces objets rendent des services de consommation de données (analogues aux Iterator
)
mais permettent aux codes qui gèrenet le parallélisme de prendre des décisions de séparation en plusieurs tâches ( split)
Stream
) - ici il y a juste un filtre -
Stream
et Collector
et la classe Collectors
). Ici le Collector
génére une ConcurrentMap
et prend en paramètre: une fonction de creation de la clef, une fonction de création de la valeur initiale et une fonction qui met à jour la valeur quand deux
paires ont potentiellement la même clef (le premier argument est la valeur contenue dans la table, le second est la valeur de l’entrée candidate et le résultat
est la nouvelle valeur qui sera affectée à l’entrée dans la table).
Exemple de résultat d’exécution de la méthode (ici le nom des capteurs montre comment l’optimiseur de parallélisme a découpé les Threads
):
{capteur_21=97, capteur_32=53, capteur_43=22, capteur_54=11, capteur_1=17} nombre d'éléments traités {capteur_21=500, capteur_32=250, capteur_43=125, capteur_54=62, capteur_1=62}
![]() | calibrage des SplitIterators |
---|---|
La plupart du temps on est amené à utiliser des Globalement pour la réalisation d’un
|
![]() | les streams sont AutoCloseable |
---|---|
La classe qui implante le service de public Map<String, Integer> stats(Spliterator<RapportCapteur> source, Runnable closeHandler, °°°) { // autoCloseable try (Stream<RapportCapteur> stream = StreamSupport.stream(source, true)) { stream.onClose(closeHandler) ; Stream<RapportCapteur> pipeline = stream ; pipeline = pipeline.filter(rapport -> { return °°° }); °°° } } |