Wednesday, June 12, 2013

Concurrencia en Java Parte 7

Esta es la septima y última parte del tutorial, este link te lleva al post anterior concurrencia en java parte-6

Clases de utilería para sincronización

La API de concurrencia tiene varias clases para la sincronización en el paquete java.util.concurrent como Semaphore, CountDownLatch, CyclicBarrier, Phaser y Exchanger.

Semaphore

Un semáforo tiene un contador que permite el acceso a un recurso compartido, es similar a la interfaz Lock pero cuando un thread quiere adquirir un semáforo, este verifica su contador y si es mayor que cero, entonces el thread obtiene el semáforo y se reduce el contador. Sin embargo, si el contador es cero el thread espera hasta que se incremente el contador.

Ejemplo:
 public class Recurso {  
      Semaphore semaphore = new Semaphore(2);//Indica que dos threads pueden acceder el recurso al mismo tiempo.  
      public void lock() {  
           semaphore.acquire();//Si el contador es cero el thread se duerme, de lo contrario se reduce y obtiene el acceso  
           System.out.println(“Comenzando, el thread ” + Thread.currentThread().getName() + “ tiene el lock”);  
           try {                           
                Thread.sleep(10000);  
           } catch (InterruptedException e) {  
                e.printStackTrace();  
           }  
           System.out.println(“Terminando, el thread ” + Thread.currentThread().getName() + “ tiene el lock”);  
           semaphore.release();//Libera el semaphore e incrementa el countador  
      }  
 }  
 .......  

En el ejemplo, el método acquire() obtiene el semáforo si el contador es mayor que cero, de lo contrario se espera hasta que se incremente y reduce el contador, después el thread que obtiene el semáforo ejecuta el recurso y finalmente, se ejecuta el método release() y incrementa el contador.

Si el semáforo inicializa su contador con un valor de uno, entonces se llama un semáforo binario y se comporta como la interfaz java.util.concurrent.locks.Lock.

CountDownLatch

El CountdownLatch se puede utilizar para hacer esperar a un thread a que N threads hayan completado alguna acción, o algún tipo de acción se ha completado N veces. Tiene una contador y se inicializa con un valor entero, este contador es el número de acciones a esperar.

El CountdownLatch tiene el método countdown(), que reduce el contador interno y el método await() bloquea o pone a dormir un thread hasta que el contador llegue a cero.

Ejemplo:
 public class Prueba implements Runnable {    
   private CountDownLatch controlador = new CountDownLatch(10);  
   private CyclicBarrier cyclicBarrier;  
   public Prueba(CyclicBarrier cyclicBarrier) {  
      this.cyclicBarrier = cyclicBarrier;  
   }  
   public void setQuestionAnswered(){  
     controlador.countDown();  
   }  
   @Override  
   public void run() {  
      try {  
       controller.await();  
       //Empieza la evaluacion de la prueba  
      cyclicBarrier.await();//Espera a todos los threads o partidas  
     } catch (InterruptedException e) {  
       e.printStackTrace();  
     }  
   }    
   public double getTestResult() {  
   }  
 }  
 ..........  
 final Prueba [] pruebas = new Prueba[10];  
 CyclicBarrier cyclicBarrier = new CyclicBarrier(10,  
       new Runnable(){  
           public void run() {  
                //Obtiene el promedio de las pruebas  
                ..........  
           }                                
      });  
 for(int i=0; i<pruebas.length; i++)  
      pruebas[i] = new Prueba(cyclicBarrier);  
 ..........  

En el ejemplo la misma clase Prueba usada en el ejemplo anterior, pero aquí el CyclicBarrier obtendrá el promedio de todas las pruebas. Cuando todos los threads Prueba terminen el examen CyclicBarrier ejecutará el objeto Runnable que se pasa como parámetro y obtendrá el promedio de las pruebas.

Phaser

La clase Phaser es similar a CyclicBarrier y CountdownLatch pero más flexible, las tareas o subprocesos se sincronizan en pasos o fases. Las partes registradas para sincronizar en un Phaser pueden variar con el tiempo, se puede cambiar de forma dinámica con los métodos register(), bulkRegister().

El método arriveAndDeregister() notifica al Phaser que una tarea ha terminado y que no participará en las futuras fases, se reduce el número de partidos.

El método arriveAndAwaitAdvance() hace a una tarea esperar a que todos los participantes terminen.

Ejemplo:
 public class Prueba implements Runnable {      
   private Phaser phaser;  
   public Task(Phaser phaser) {  
      this. phaser = phaser;  
  }  
   @Override  
   public void run() {  
      try {  
        phaser.arriveAndAwaitAdvance();  
      //Empieza fase1       
      phaser.arriveAndAwaitAdvance();  
      //Empieza fase2       
      phaser.arriveAndAwaitAdvance();  
      //Empieza fase3                 
     } catch (InterruptedException e) {  
       e.printStackTrace();  
     }  
   }    
 }  
 ..........  
 final Prueba[] pruebas = new Prueba[10];  
 Phaser phaser = new Phaser(10);  
 for(int i=0; i<pruebas .length; i++)  
      pruebas[i] = new Prueba(phaser);       
 ..........  

En el ejemplo, el Phaser establece 10 partidas, cuando todas las tareas lleguen al primer arriveAndAwaitAdvance() se iniciará la fase 1 y luego van a tener que esperar a todos los threads antes de empezar la fase 2, y así sucesivamente.

Exchanger

La clase Exchanger sincroniza dos threads en un punto, cuando ambos threads llegan a este punto intercambiar un objeto. Exchanger puede ser visto como una forma bidireccional de un SynchronousQueue.

La clase Exchanger tiene el método exchange() el cual intercambia datos entre threads.

El ejemplo de productor consumidor puede ser implementado con este mecanismo.

Ejemplo:
 public class Consumidor extends Thread {  
      private Exchanger <String> exchanger;  
      public Consumidor(Exchanger exchanger) {  
           this.exchanger = exchanger;  
      }  
      public void run() {  
           String mensaje = "";  
           while (!mensaje.equalsIgnoreCase("fin")) {  
                try {  
                     mensaje = exchanger.exchange(mensaje);//Espera e intercambia el mensaje con el productor  
                     System.out.print( mensaje + " ");  
                } catch (InterruptedException e) {  
                     e.printStackTrace();  
                }  
           }            
      }       
 }  
 public class Productor extends Thread {  
      private Exchanger exchanger;  
      public Productor(Exchanger exchanger) {  
           this.exchanger = exchanger;  
      }  
      public void run() {  
           String [] mensajes = {"Hola", "mundo", "fin"};  
           for (String mensaje:mensajes) {  
                try {  
                     exchanger.exchange(mensaje);//Espera e intercambia el mensaje con el consumidor;                 
                } catch (InterruptedException e) {  
                     e.printStackTrace();  
                }  
           }  
      }            
 }  
 .......  
      Exchanger <String> exchanger = new Exchanger<String>();   
      Consumidor consumidor = new Consumidor(exchanger);  
      Productor productor = new Productor(exchanger);  
      consumidor.start();  
      productor.start();  
 .......  


Este ejemplo es similar al de la parte 4, pero en lugar de tener una clase que actúa como un monitor, aquí el Exchanger hace ese trabajo y sincroniza los mensajes entre los threads.

Monday, June 3, 2013

Concurrencia en Java Parte 6

Esta es la sexta parte del tutorial, este link te lleva al  post anterior concurrencia en java parte-5

Fork/Join Framework

El framework fork/join es una implementación de la interfaz ExecutorService y utiliza un enfoque diferente, que resuelve las dividiéndolas en tareas más pequeñas de forma recursiva.


El framework fork/join implementa el algoritmo work-stealing, que dice que los threads de trabajo que quedan sin cosas que hacer pueden robar tareas de otros threads que aún están ocupados. Dicho de otra manera, dice que un thread que está a la espera la finalización de otros threads, busca threads que no han sido ejecutados y los ejecuta.

Básicamente hay dos operaciones, una para dividir una tarea en tareas más pequeñas "fork", y otra esperar a que las tareas finalicen "join".

La clase java.util.concurrent.ForkJoinPool implementa la interfaz ExecutorSevice y también implementa el algoritmo work-stealing.

La clase java.util.concurrent.ForkJoinTask implementa la interfaz Future, es una clase abstracta para las tareas que se ejecutan en el ForkJoinPool, provee el método fork() para organizar la ejecución asíncrona y el método join() para continuar hasta que el resultado de la tarea se ha calculado.

Ejemplo:
 public class TareaIncrementar extends RecursiveAction {      
   private int [] arreglo;  
   private int primer;  
   private int ultimo;  
   public TareaIncrementar(int [] arreglo, int primer, int ultimo) {  
     this. arreglo = arreglo;  
     this. primer = primer;  
     this. ultimo = ultimo;  
   }  
   protected void compute() {  
     if (ultimo - primer < 10) {  
       incrementa();  
       System.out.println("Completado de "+primer+", al: "+ultimo);  
     } else {  
       int medio = ( primer + ultimo) / 2;  
       System.out.println("Tareas pendientes: " + getQueuedTaskCount());  
       TareaIncrementar ti1 = new TareaIncrementar(arreglo, primer, medio + 1);  
       TareaIncrementar ti2 = new TareaIncrementar(arreglo, medio + 1, ultimo);  
       invokeAll(it1, it2); //Espera la finalizacion de las tareas  
     }  
   }  
   private void incrementa() {  
     for (int i = primer; i < ultimo; i++) {  
       arreglo[i] = ++arreglo[i];        
     }      
   }  
 }  
 ...............  
     int arreglo [] = new int [100];  
     Random random = new Random();  
     for (int i=0; i<arreglo .length; i++) {  
       arreglo [i] = random.nextInt(10000);  
     }      
     System.out.print("Arreglo: {");   
     for (int i=0; i<arreglo .length; i++) {      
       System.out.print(arreglo[i] + ", ");  
     }  
     System.out.print(" }");   
     TareaIncrementar tarea = new TareaIncrementar(arreglo, 0, arreglo.length);      
     ForkJoinPool pool = new ForkJoinPool();      
     pool.execute(tarea);      
     do {  
       System.out.println("Threads activas: " + pool.getActiveThreadCount());        
       try {  
         TimeUnit.MILLISECONDS.sleep(5);  
       } catch (InterruptedException e) {  
         e.printStackTrace();  
       }  
     } while (!tarea.isDone());      
     pool.shutdown();  
     if (task.isCompletedNormally()) {  
       System.out.println("El proceso se ha completado normalmente");  
     }  
     System.out.print("Arreglo incrementado: {");   
     for (int i=0; i<arreglo.length; i++) {      
       System.out.print(arreglo[i] + ", ");  
     }  
     System.out.print(" }");   
 ...............  

En el ejemplo la clase TareaIncrementar extiende a la clase java.util.concurrent.RecursiveAction, esta tarea implementa su lógica en el método compute(), esta incrementa el valor de los elementos de una arreglo de enteros. Pero sólo incrementa 10 elementos por tarea, si el tamaño del arreglo es más grande, crea dos subtareas y las ejecuta a través del método invokeAll() de la clase RecursiveAction. De esta manera, las tareas se llaman de forma recursiva.

La clase RecursiveAction extiende la clase ForkJoinTask, tiene el método invokeAll () que ejecuta subtareas y espera su finalización antes de continuar, mientras espera a las subtareas que terminen, el
“worker thread” toma otra tarea y la ejecuta.

El Fork/Join framework provee otra clase para devolver un valor de una tarea, la clase java.util.concurrent.RecursiveTask comporta igual que la clase RecursiveAction pero su método compute() devuelve un valor.

Ejemplo:
 public class TareaSumar extends RecursiveTask <Integer> {   
   private int [] arreglo;  
   private int primer;  
   private int ultimo;  
   public TareaSumar(int [] arreglo, int primer, int ultimo) {  
     this.arreglo = arreglo;  
     this.primer = primer;  
     this.ultimo = ultimo;  
   }       
   protected Integer compute() {  
     int suma = 0;  
     if (primer - ultimo < 10) {  
       suma = sumar();  
       System.out.println("Completado de "+primer+", al: "+ultimo);  
     } else {  
       int medio = ( primer + ultimo) / 2;  
       System.out.println("Tareas pendientes: " + getQueuedTaskCount());  
       TareaSumar ts1 = new TareaSumar(areglo, primer, ultimo + 1);  
       TareaSumar ts2 = new TareaSumar(areglo, medio + 1, ultimo);  
       invokeAll(ts1, ts2);        
       try {  
         sum += ts1.get();  
         sum += ts2.get();  
       } catch (Exception ex) {  
         ex.printStackTrace();  
       }         
     }      
     return suma;  
   }  
   private Integer sumar() {  
     int suma = 0;  
     for (int i = primer; i < ultimo; i++) {  
       suma += arreglo[i];        
     }      
     return suma;  
   }  
 }  
 ......................  
     int arreglo [] = new int [100];  
     Random random = new Random();  
     for (int i=0; i<arreglo.length; i++) {  
       arreglo[i] = random.nextInt(100);  
     }      
     System.out.print("Arreglo: {");   
     for (int i=0; i<arreglo.length; i++) {      
       System.out.print(arreglo[i] + ", ");  
     }  
     System.out.print(" }");   
     TareaSumar tarea = new TareaSumar(arreglo, 0, arreglo.length);      
     ForkJoinPool pool = new ForkJoinPool();      
     pool.execute(tarea);      
     do {  
        System.out.println("Threads activas: " + pool.getActiveThreadCount());        
       try {  
         TimeUnit.MILLISECONDS.sleep(5);  
       } catch (InterruptedException e) {  
         e.printStackTrace();  
       }  
     } while (!tarea.isDone());      
     pool.shutdown();  
     if (task.isCompletedNormally()) {  
        System.out.println("El proceso se ha completado normalmente");  
     }  
     try {   
       System.out.print("La suma del arreglo es:" + tarea.get());  
     } catch (Exception ex) {  
       ex.printStackTrace();;  
     }  
 ......................  

En el ejemplo, la clase TareaSumar extiende a la clase RecursiveTask con Integer como su tipo, TareaSumar añadirá todos los elementos de un arreglo. Sin embargo, sólo añade 10 elementos por tarea, si el tamaño del arreglo es mayor, entonces crea dos subtareas y las ejecuta a través del método invokeAll() de la clase RecursiveTask al igual que el ejemplo anterior, pero en contraste con el ejemplo anterior, cada tarea devuelve la suma de los 10 elementos, y si se crearon subtareas añade el resultado de las subtareas.
El valor de las subtareas se obtiene con el método get() de la interfaz Future.

Hasta ahora todas las tareas se han ejecutado sincrónicamente, cuando una tarea llama al método invokeAll() se espera a que las tareas enviadas a ejecutar a través de este método hayan termnado, con esto el algoritmo work-stealing se lleva a cabo, se asigna una nueva tarea al “worker thread” cuando la tarea está esperando.

Las tareas también se puede ejecutar asíncronamente con los métodos fork() y join() de la clase ForkJoinTask, el método de fork() ejecuta de forma asíncrona una subtarea continuando con la ejecución de la tarea, y el método join() espera a el resultado de la subtarea.
El uso de este mecanismo el algoritmo “work-stealing” no se puede implementar ya que la tarea continúa ejecutándose cuando se crea una subtarea.

Ejemplo:
 public class TareaSumarAsync extends RecursiveTask <Integer> {  
   private int [] arreglo;  
   private int primer;  
   private int ultimo;  
   public TareaSumarAsync(int [] arreglo, int primer, int ultimo) {  
     this.arreglo = arreglo;  
     this.primer = primer;  
     this.ultimo = ultimo;  
   }  
   protected Integer compute() {  
     int suma = 0;  
     if (primer - ultimo < 10) {  
       suma = sumar();  
       System.out.println("Completado de "+primer+", al: "+ultimo);  
     } else {  
       int medio = ( primer + ultimo) / 2;  
       System.out.println("Tareas pendientes: " + getQueuedTaskCount());  
       TareaSumarAsync tsa1 = new TareaSumarAsync(areglo, primer, ultimo + 1);  
       TareaSumarAsync tsa2 = new TareaSumarAsync(areglo, medio + 1, ultimo);  
       tsa1.fork();  
       tsa2.fork();//Continua con la ejecuion  
       sum += st1.join();//Espera al resultado  
       sum += st2.join();  
     }  
     return suma;  
   }  
   private Integer sumar() {  
     int suma = 0;  
     for (int i = primer; i < ultimo; i++) {  
       suma += arreglo[i];        
     }      
     return suma;  
   }  
 }  

En el ejemplo TareaSumarAsync ejecuta su subtareas de forma asíncrona utilizado los métodos fork() y join().