Le package java.nio

images/brownbelt.png

Le package java.nio introduit un certain nombre de mécanismes d’entrée/sortie qui ne se situent pas dans la philosophie des flots:

Mise en oeuvre de Buffers

  1. Allocation d’un Buffer

    int taille = 2 + 30 ;
    ByteBuffer buff= ByteBuffer.allocate( taille ) ;

    Il est également possible d’avoir une allocation avec corrrespondance directe avec un dispositif sous-jacent (mapping avec dispositif de lecture/écriture d’un fichier par ex.). Dans ce cas les reads explicites et les rewinds des phases 2 et 3 ne sont pas nécessaires.

    Ici la position courante est au début du buffer

  2. Lecture (par exemple) à partir d’un Channel :

    ReadableByteChannel chan =
        new RandomAccessFile(nomFic,"rw").getChannel() ;
    chan.read(buff) ;

    Ici la posiiton courante est en fin de Buffer

  3. Pour exploitation du Buffer il faut "ramener" le curseur:

    buff.rewind() ;

    Ici la position courante est au début du buffer

  4. On exploite maintenant le Buffer:

    short num = buff.getShort() ;

    Ici la position courante est au début du buffer + 2 octets.

Un exemple de mise en oeuvre de Buffers

Exemple de codage de protocoles utilisant des données de taille fixe: on code ici une chaine de taille maximum 30 ; le protocole demande que l’on indique d’abord la taille de la chaîne

public class ProtocolA {
   public final int TAILLE_MAX = 30;
   public static final String CODAGE = "US_ASCII";
   ByteBuffer buff = ByteBuffer.allocate(2 + TAILLE_MAX) ;

   public synchronized String decode(ReadableByteChannel chan)
            throws IOException{
      buff.clear() ;// Attention ne fait pas le ménage!
      if( 0> chan.read(buff)) return null ;
      buff.rewind() ;
      short num = buff.getShort() ;
      byte[] octets = new byte[num] ;
      buff.get(octets) ;
      return new String(octets, CODAGE) ;
   }

   public synchronized void code (WritableByteChannel chan, String chaine)
            throws IOException {
      buff.clear(); // ne fait pas le menage
      byte[] octets = chaine.getBytes(CODAGE) ;
      short num = (short) octets.length ;
      if( num > TAILLE_MAX) {
         throw new IllegalArgumentException(chaine+ ":too long");
      }
      buff.putShort(num) ;
      buff.put(octets) ;
      buff.rewind() ;
      chan.write(buff) ;
   }

   °°°°
}

L’exploitation de protocoles spécifiques à d’autres plate-formes non-java est accessible du fait de l’existence de convertisseurs de caractères (java.nio.charset -l’utilisation des services de String employés ici sont très couteux-) et de Buffers spécialisés avec une autre convention poids-fort/poids-faible (par ex. appel de: buff.order(ByteOrder.LITTLE_ENDIAN) ;)

(on notera dans l’exemple que le Buffer buff n'étant pas "nettoyé" on trouvera dans le Channel des octets sans signification -la méthode clear() n’efface pas les octets dans le Buffer-).

Une mise en oeuvre de la classe précédente :

  public static void main(String[] args) throws Exception {
    RandomAccessFile rfil = new RandomAccessFile(args[0],"rw") ;
    try {
        FileChannel fc = rfil.getChannel() ;
        ProtocolA proto = new ProtocolA() ;
        String res;
        while(null != (res = proto.decode(fc))) {
            System.out.println(res) ;
        }
    } finally {
        rfil.close() ;
    }
    °°°
  }

Ventilation des lectures/écritures dans des Buffers

Les channels GatheringByteChannel et ScatteringByteChannel acceptent des opérations sur un ensemble ordonné de ByteBuffers (on découpe ainsi le traitement des différentes parties d’un protocole).

Soit:

Charset ascii = Charset.forName("US-ASCII") ; // exception possible

utilisation pour un protocole :

  public void code (GatheringByteChannel gath, short val, String chaine)
           throws IOException {
     ByteBuffer myShort = ByteBuffer.allocate(2) ;
     myShort.order(ByteOrder.LITTLE_ENDIAN) ;
     CharBuffer myJavaString = CharBuffer.allocate(LG_CHAINE);
     myJavaString.put(chaine) ;
     myJavaString.rewind() ;
     myShort.putShort(val) ;
     myShort.rewind() ;
     ByteBuffer buff = ascii.encode(myJavaString) ;
     gath.write(new ByteBuffer[] {myShort, buff}) ;
  }

Les channels associé à des fichiers (FileChannel), à des Sockets (SocketChannel), à des datagrammes (DatagramChannel) et à des "pipes" java (Pipe.SinkChannel et Pipe.SourceChannel) ont des capacités de ventilation.

E/S asynchrones , sélecteurs

Les SelectableChannels permettent des opérations d’entrée/sortie non bloquantes. On enregistre chaque Channel en attente d’une opération auprès d’un Selector. On peut ainsi réaliser des serveurs utilisant un nombre limité de threads.

Dans cet exemple c’est un code "client" qui va tenter de contacter en parallèle un certain nombre de serveurs pour obtenir le service echo (service standard sur le port 7).

public class Echo {
   public static final int ECHO_PORT = 7 ;
   public static final long TIME_OUT = 60000L ;

   public static void main (String[] args) throws Exception {
      Selector veille = Selector.open() ;
      ByteBuffer message = ByteBuffer.wrap("hello".getBytes("US-ASCII")) ;
      for(int ix = 0 ; ix < args.length ; ix++) {
         try {
            SocketChannel sckc = SocketChannel.open() ;
            sckc.configureBlocking(false) ;
            SelectionKey skey = sckc.register(veille,
               SelectionKey.OP_READ| SelectionKey.OP_CONNECT);
            skey.attach(args[ix]) ;// annotation libre!
            sckc.connect( new InetSocketAddress(args[ix], ECHO_PORT)) ;
         } catch (Exception exc) {
            Logger.getGlobal().log(Level.SEVERE,args[ix],exc) ;
         }
      }
      °°°° // suite dans un prochain code ....

Ici on crée un SocketChannel par nom de machine hôte passé en paramètre. Chaque SocketChannel est rendu asynchrone et est enregistré auprès d’un "sélecteur" . Ce Selector permet une opération bloquante select qui permet d’obtenir un ensemble de channels sur lesquels une opération d’E/S est en attente (ici on s’inscrit pour surveiller les opérations de connection OP_CONNECT et les opérations de lecture OP_READ).

Suite du code :

         while( veille.keys().size() >0 ) {
            try {
               // appel bloquant
               int nbSelected = veille.select(TIME_OUT) ;
               if(nbSelected == 0) {
                  System.out.println("pas de selection") ;
               }
               Iterator it = veille.selectedKeys().iterator() ;
               while(it.hasNext()) {
                  SelectionKey skey = (SelectionKey) it.next() ;
                  SocketChannel sckc =(SocketChannel) skey.channel();
                  if( skey.isReadable()) {
                     System.out.println(
                        "response from "+skey.attachment()) ;
                     skey.cancel() ;// annulle clef
                     sckc.close() ; // ferme le channel
                  } else if (skey.isConnectable() ) {
                     sckc.finishConnect() ;// conclusions connx
                     System.out.println(
                        "connected to " +skey.attachment()) ;
                     message.rewind() ;
                     sckc.write(message) ;
                  }
               }
         } catch (IOException exc) {
            Logger.getGlobal().log(Level.SEVERE,"",exc) ;
         }
      }

Ici on "sort" de chaque select avec un ensemble (Set) de clefs qui permettent de retrouver le channel correspondant. On peut tester le type d’opération en cours.

  • pour une opération de lecture, on invalide la clef et on ferme le channel.
  • pour une opération de connexion, on termine les opérations de connexion (finishConnect) et on envoie le message pour le service echo.

Le read de lecture n’est pas utilisé ici. Du fait de l’asynchronisme il n’est pas garanti que toute lecture soit complète: il faut donc soigneusement mettre au point tout protocole d'échange.