Programmation parallèle et accès concurrents

images/blackbelt.png

L’amélioration des performances de l’exécution multi-tâches, en particulier dans des environnements multi-processeurs, a été un souci constant des fournisseurs de J.V.M.

Du point de vue des spécifications du langage de nombreuses précisions ont été apportées à partir de java 1.5 pour rechercher le meilleur compromis possible entre des comportements reproductibles entre plate-formes et une meilleure prise en compte des threads natifs du système local (voir le chapitre des nouvelles spécifications qui traite du modèle mémoire).

JSE 5 a introduit également un ensemble d’utilitaires de niveau intermédiaire pour la gestion des concurrences d’accès: le package java.util.concurrent et ses sous-packages.

Informations de pile

L’objet java.lang.StackWalker permet d’obtenir des informations sur l'état de la pile d’exécution courante (objets StackWalker.StackFrame).

A un moment donné de l’exécution on pourra savoir sur le Thread courant quelles sont les invocations qui ont mené au point courant.

Les packages java.util.concurrent

Les services de bases de Java concernant les concurrences d’accès (blocs synchronized, méthodes wait() et notify()) sont parfois de trop bas niveau pour des applications sophistiquées. On est amené à construire des utilitaires dont la conception est très délicate: le package java.util.concurrent et ses sous-packages offrent des services généralistes de plus haut niveau pour répondre à des besoins comme par exemple: comment acquérir un verrou dans une méthode et le relacher dans une autre? Comment créer des sections de codes qui supportent des "lectures" en parallèle avec un mode exclusif réservé aux modifications? etc.

Ces packages offrent un ensemble de services dédiés aux verrous, aux files d’attente, à la gestion de tâches.

Sémaphores

Un objet Semaphore représente un ensemble de "permis". La méthode acquire bloque le thread appelant jusqu'à ce qu’un permis soit disponible (un permis est rendu par release). On peut donc utiliser un sémaphore pour restreindre le nombre de threads qui peut accéder à une ressource.

L’option booléenne fair permet de garantir que les threads bloqués seront réactivés dans l’ordre des demandes bloquées (dans toutes ces librairies cette option stratégique pénalise les performances mais peut s’avérer essentielle).

import static java.util.concurrent.TimeUnit.* ;

public abstract class ServiceRestreint {// goulot d'étranglement
   protected Semaphore controleurEntrées;
   long attenteAdmissible ;
   °°°
   public ServiceRestreint(int maxU, long maxAttente,°°°) {
      controleurEntrées = new Semaphore(maxU, true) ;
      attenteAdmissible = maxAttente ;
      °°°
   }

   public void doIt() throws Exception {
      if(! controleurEntrées.tryAcquire(
         attenteAdmissible, SECONDS)) {
         throw new ExceptionAttente(°°°) ;
      }
      try {service() ;}
      finally { controleurEntrées.release();}
   }

   protected abstract void service() throws Exception ;
}

On notera ici l’usage du try/finally: on utilisera également cette précaution avec les objets de verrouillage qui implantent l’interface java.util.concurrent.Lock

    Lock verrou = °°°° ;
    verrou.lock() ;
    try { /*code sous protection du verrou*/ °°°}
    finally { verrou.unLock() ; }

Voir ci-après un exemple de Lock.

Verrous

Un java.util.concurrent.ReentrantLock est un verrou d’exclusion mutuelle qui reproduit d’assez près les services d’un bloc synchronized.

Un tel verrou est acquis par un thread qui arrive à exécuter la méthode lock(). Si le thread courant est déjà propriétaire du verrou cet appel est sans effet. Si par contre un autre thread a acquis le verrou le thread courant reste bloqué.

Comme pour tous les Locks on peut associer à cet objet un ou plusieurs Conditions qui sont des objets qui factorisent les services de moniteur (wait, notify, notifyall). L’appel sur un objet Condition d’une méthode comme await() bloque le thread appelant et lui fait relacher le Lock associé. Avant de reprendre l’exécution le thread courant doit re-acquérir le verrou.

La sortie du blocage survient lorsque :

  • il y a appel de signal() ou signalAll() sur l’objet condition
  • il y a éventuellement interruption du thread
  • il y a éventuellement une fin de temporisation (timeout)

La spécification précise que certaines sorties intempestives sont possibles du fait de contraintes liées au système sous-jacent (spurious wakeup). Il est donc essentiel de disposer d’une condition de mise en latence qui soit retestée par un while.

[Note]Illustration

Dans l’exemple ci-après on a une Console qui permet de visualiser des messages présentés par des threads différents. Chaque message doit être affiché entièrement et donc le thread qui affiche acquiert un verrou.

L’utilisateur a le choix de bloquer différents type de messages (ici un CheckBox lui permet d’autoriser un type de message: les deux types "Système" et "Utilisateur" sont éventuellement permis en même temps).

Un thread qui cherche à afficher un type de message doit donc non seulement acquérir le verrou mais tester une condition associée. Si le type de message correspondant n’est pas autorisé le thread bloque.

On a donc deux files liées aux blocages d’attente (les deux objets Condition permettent de rendre ce service). Etudier soigneusement l’utilisation des booléens associés ici à ces objets.

Un ReentrantReadWriteLock permet de gérer deux verrous associés spécialisés l’un dans les opérations de consultation l’autre dans les opérations de modification. Un verrou obtenu en lecture n’est pas opposable à une demande de verrou en lecture, par contre l’obtention d’un verrou en écriture est exclusive. Un thread qui a obtenu un verrou en écriture peut le reacquérir en lecture (l’inverse n’est pas vrai) et on peut aussi convertir (downgrade) le verrou en écriture en verrou de lecture.

public class ConsoleAdmin extends Panel {
  private TextArea affichage = new TextArea(20,60) ;
  private Checkbox checkUt = new Checkbox("Utilisateur", false) ;
  private Checkbox checkSys = new Checkbox("Système", false) ;
  private ReentrantLock accès = new ReentrantLock(); // pas "fair"
  private volatile boolean utPermis = false ; //voir AtomicBoolean
  private Condition condUt = accès.newCondition() ;
  private volatile boolean sysPermis = false ;
  private Condition condSys = accès.newCondition() ;
  public ConsoleAdmin() {
   °°°°
   checkSys.addItemListener(new ItemListener() {
      public void itemStateChanged(ItemEvent evt) {
         if(SELECTED == evt.getStateChange()) {
            sysPermis= true ; // fixé en dehors du verrouillage ....
            accès.lock() ;
            condSys.signalAll() ;
            accès.unlock() ;
         } else {sysPermis=false ;}
   }}) ;
   checkUt.addItemListener(new ItemListener() { °°°° // idem
   }});
   °°°°°
  }

  private void afficher(Iterable<String> message) {
   °°°°// met le message intégralement sur affichage
  }

  public void afficherMessageSystème(Iterable<String> mess){
   accès.lock() ;
   try {
      while(!sysPermis) { condSys.await() ;}//problème ...
      afficher(mess) ;
   } catch(Exception exc) {
      °°°° // Log
   } finally {accès.unlock() ;}
  }

  public void afficherMessageUtilisateur(Iterable<String> mess){
   °°°° // idem
  }

}

Un StampedLock permet des lectures optimistes (voir les exemples dans la documentation de l’API).

Rendez-vous, passages de temoin

CyclicBarrier permet à un nombre N de threads de s’attendre les uns les autres jusqu'à ce que tous aient invoqué await() sur l’objet. (voir également Phaser un mécanisme plus complexe avec des phases d’enregistrement/désenregistrement)

CountDownLatch : un point de synchronisation sur lesquels des threads qui exécutent l’appel await() vont bloquer jusqu'à ce que N appels de countDown() aient été exécutés sur l’instance.

SynchronousQueue<T> est un dispositif de passage de donnée entre deux threads: les méthodes put(T témoin) et T take() sont bloquantes en attente l’une de l’autre.

Exchanger<T>: un point de synchronisation où deux threads peuvent s'échanger une donnée chacun via T exchange(T donné) .

Trames d’exécution et promesses

Très classiquement les serveurs optimisent leur utilisation des threads en créant des réserves (pool) de threads réutilisables. On crée un ensemble de threads, on les stocke dans une structure de donnée, et lorsqu’on dispose d’une tâche à exécuter on réveille le thread qui appelle la méthode run() du Runnable fourni; quand cette méthode est exécutée le thread est remis en réserve en attente d’une autre exécution. On a grosso-modo des codes de thread qui ressemblent à :

public class ThreadReutilisable {
   Runnable tâcheCourante ;
   °°°°
   public void execute( Runnable tâche) {
      tâcheCourante = tâche ;
      //déblocageAttente
   }

   public void run() {
      while(true) {
         //blocageEnAttente
         tâcheCourante.run() ;
      }
   }
}

Un ThreadPoolExecutor dispose ainsi d’une méthode execute(Runnable) qui permet de soumettre une tâche pour exécution future. L’utilitaire dispose de nombreux paramètres de configuration et de méthodes d’administration.

En pratique l’instanciation de ces objets est confiée à des méthodes fabrique de la classe Executors: les pools sont alors "vus" comme des objets implantant les interfaces ExecutorService ou ScheduledExecutorService qui en plus du execute(Runnable) proposent, entre autres, les services:

Future<?> submit(Runnable tâche) ;
Future<T> submit(Callable<T> tâche)
ScheduledFuture<T> schedule(Callable<T> tâche,long dl,TimeUnit u)

On objet Callable est semblable à un Runnable mais sa méthode call() rend un résultat et propage des exceptions.

Un Future représente une "promesse" : le résultat d’une exécution asynchrone: sa méthode get peut être invoquée avec une limite d’attente.

// un Callable avec calculs lourds
private static class Calculateur implements Callable<BigDecimal> {
   BigDecimal big ;
   public Calculateur (BigDecimal dec) {big = dec ;}
   public BigDecimal call() throws Exception {
      return °°° ;// calcul très long sur "big"
   }
}
°°°
// un cache de résultats
LinkedHashMap<String, Future<BigDecimal>> cache =
   new LinkedHashMap<String, Future<BigDecimal>> (16,0.75F,true){
      public boolean removeEldestEntry( Map.Entry<String,Future<BigDecimal>>m) {
            return 100 < size() ;
      }
   } ;
°°°
// un service d'exécution des calculs lourds
ExecutorService exécuteur = Executors.newFixedThreadPool(3) ;
°°°
   // exploitation (code non parallèle)
   String val = °°°
   Future<BigDecimal> fut = cache.get(val) ;
   if(null == fut) {
      Future<BigDecimal> newFut = exécuteur.submit(
         new Calculateur(new BigDecimal(val))) ;
      cache.put(val, newFut) ;
      °°°°
   } else {
      if(fut.isDone()){
         showFuture(fut) ;
      }
   }

°°°° // quelque part ailleurs on examine les Future non consultés
public void showFuture( Future<?> future) {
   try {
      show( future.get()) ; // par ex. System.out.println
      // get est bloquant
   } catch (Exception exc) {
      show("err sur" + future + ":" + exc) ;
   }
}

Enchainements de promesses

Les différents types de Future ont la propriété de devoir être exploités par d’autres codes qui testent la disponibilité des résultats (ou des erreurs). Cela pose d’autres problèmes de gestion des tâches.

Il doit être possible d’associer directement à une demande d’exécution un enchainement d’action(s). Ces actions seront directement exécutées dès que la tâche sera terminée (pour quelque raison que ce soit).

A un CompletableFuture sont associés des phases de traitements qui se succèdent (interface CompletionStage)

Un exemple simpliste: soit

  • une méthode statique BigInteger calcule(BigInteger) de la classe calcul
  • un objet afficheur qui affiche(Object)
  • Un objet java.util.Random et un objet int taille
CompletableFuture.supplyAsync(()-> BigInteger.probablePrime(taille, random))
        .thenApply(Calcul::calcule)
        .thenAccept(afficheur::affiche);

Ici on a une génération d’un nombre, suivi d’un calcul, suivi d’un affichage.

  • le code s’exécute de manière asynchrone dans un Thread daemon (ce qui veut dire que la machine virtuelle peut s’arréter avant que le résultat s’affiche!).
  • noter que le résultat de la méthode thenAccept(Consumer<? super T> action) rend un CompletableFuture<Void> alors que celui de thenApply(Function<? super T,? extends U> fn) prend une fonction (ici, par chance, l’argument et le résultat sont du même type BigInteger)

Quand on considère un CompletableFuture il faut distinguer les phases qui prennent une Function, un Consumer ou un Runnable (respectivement méthodes apply, accept ou run) et considérer :

  • Les méthode fabriques (static) qui permettent d’initialiser le lancement:

    • supplyAsync(Supplier) prend une fonction fournissant l'élément initial (une version permet de spécifier l'Executor qui va gérer le Thread)
    • runAsync(Runnable) va exécuter un Runnable (mais ici les enchainements de tâches ne se communiquent pas de données).
  • Les spécifications d’enchainements: méthodes thenApply et handle (avec une version Async permettant de confier la tâche à un autre fil d’exécution). handle a l’avantage de permettre de spécifier des traitements en cas de déclenchement d’exception (voir les méthodes failed , exceptionally, completeExceptionally et cancel)
  • Les méthodes de "complétion" de phase: les enchainements éventuels suivants ne récupèrent pas de données (résultat CompletableFuture<Void>) ce sont les méthodes thenAccept (idem avec version Async) et whenComplete (qui permet de prendre en compte un déclenchement d’exception).
  • Les "rendez-vous" qui permettent des enchainements liants divers CompletableFuture : applyToEither, thenCombine, thenCompose, acceptEither, thenAcceptBoth, runAfter
  • Les méthodes classiques de Future (si on veut, par exemple, récupérer un résultat à la manière des Future)

Pour plus de détails voir la documentation de l’interface CompletionStage

Accès atomiques

Pour optimiser les performances un thread a le droit de maintenir une vision "locale" de la mémoire. Les blocs synchronized ont pour effet d’harmoniser cette vision locale et la mémoire de référence. Lorsqu’une variable est modifiée et lue par plusieurs threads en dehors des blocs de synchronisation elle doit être déclarée volatile.

La modification atomique des variables en dehors des parties de codes verrouillées est l’objet du package java.util.concurrent.atomic. La notion de volatile est englobée et étendue à des modifications de champs ou d'éléments d’un tableau et à des opérations de type "test and set".

public class Avion {
   AtomicReferenceArray<Passager> places;

   public Avion(int n) {
      places = new AtomicReferenceArray<Passager>(n) ;
   }

   public void réservationOptimiste(int numPlace, Passager passager)
         throws ExceptionReservation{
      if(! places.compareAndSet(numPlace, null, passager)){
         throw new ExceptionReservation(numPlace, passager) ;
      }
      °°°
   }
}

Les classes XXXAccumulator et XXXAdder permettent de garantir des modifications de valeurs sur des double et des long.

Collections optimisées

Le package java.util.concurrent offre aussi quelques collections optimisées pour la recherche de performances en situation d’accès concurents : ConcurrentHashMap et ConcurrentLinkedQueue.

CopyOnWriteArrayList et CopyOnWriteArraySet fournissent des structures optimisées lorsque le nombre de lectures est important par rapport au nombre de modifications.

Traitements parallélisables

Certains traitements demandant beaucoup de puissance de calcul sont susceptibles d'être traités par des algorithmes parallèles. Ces pratiques prennent actuellement leur essor à la fois du fait de la nécessité de traiter de grandes quantités de données (Big Data) et aussi du fait de progrès techniques qui concernent les architectures matérielles avec de nombreux threads natifs (et aussi la collaboration de machines en réseau).

Il existe de grandes variétés d’algorithmes (parfois très complexes) pour paralléliser les traitements mais si certaines conditions sont réunies ont peut avoir des utilitaires qui prennent en charge cette complexité.

Quelques conditions qui simplifient l’usage de ces outils:

  • avoir des données que l’on peut disjoindre en sous-ensembles indépendants les uns des autres.
  • avoir des traitements autonomes sur ces sous-ensembles. En d’autres termes les traitements sur un sous-ensemble ne doivent pas affecter un autre sous-ensemble ou ne doivent pas attendre de données venues d’un autre traitement (sauf en phase finale de recollement des résultats).

Quelques mots-clefs sur les patterns correspondant:

  • fork/join : disjoindre des tâches, les exécuter, puis recollement des résultats. Dans le package java.util.concurrent on aura des ForkJoinTask (une tâche découpable) exécutées par des ForkJoinPool (un service d’exécution capable de gérer des ForkJoinTask en les découpant et en assurant la répartition des sous-tâches entre threads). Une amélioration concerne la répartition des tâches: si un thread a fini son travail il peut venir au secours d’un autre thread en lui "volant" une partie de son travail (et donc en détachant un sous-ensemble du travail effectué par un autre).

- map/filter/reduce : un processus avec recherche de données, succession de traitements qui lisent des données en entrée et produisent des données "filtrées" en sortie, puis recollement. Cette technique relève du paradigme des streams en java 8 (ne pas confondre avec les streams d’entrée/sortie).

Fork/join

Les tâches soumises au ForkJoinPool sont décrites de manière recursive. En pseudo-code on pourrait décrire une tâche de nature "fonctionnelle" (ForkJoinTask) de la manière suivante:

public Tache extends RecursiveTask<Resultat> {
   // données (partionnables, partageables par plusieurs threads de manière indépendante)
   // constructeur
   @Override
   public Resultat compute() {
      // par exemple taille inférieure à une taille critique
      if(data.okPourExecSéquentielle()) {
         return executionSéquentielle() ;
      }
      // le plus simple est de découper en 2!
      // mais on pourrait découper en plusieurs morceaux
      Tache[] taches = this.partitionne() ;
                ForkJoinTask.invokeAll(taches) ;
      Resultat res = Resultat.INITIAL ; // par ex. zero
                for(Tache tache: taches) {
         res = res.accumule(tache.join()) ;
      }
      return res ;
   }

   private Resultat exécutionSéquentielle() {
      // on fait le travail vite fait/bien fait
   }
}

Le lancement :

   Tache tache = new Tache(.....) ;
   ForkJoinPool pool = nex ForkJoinPool() ;
   Resultat res = pool.invoke(tache) ;