Monday, June 3, 2013

Concurrencia en Java Parte 6

Esta es la sexta parte del tutorial, este link te lleva al  post anterior concurrencia en java parte-5

Fork/Join Framework

El framework fork/join es una implementación de la interfaz ExecutorService y utiliza un enfoque diferente, que resuelve las dividiéndolas en tareas más pequeñas de forma recursiva.


El framework fork/join implementa el algoritmo work-stealing, que dice que los threads de trabajo que quedan sin cosas que hacer pueden robar tareas de otros threads que aún están ocupados. Dicho de otra manera, dice que un thread que está a la espera la finalización de otros threads, busca threads que no han sido ejecutados y los ejecuta.

Básicamente hay dos operaciones, una para dividir una tarea en tareas más pequeñas "fork", y otra esperar a que las tareas finalicen "join".

La clase java.util.concurrent.ForkJoinPool implementa la interfaz ExecutorSevice y también implementa el algoritmo work-stealing.

La clase java.util.concurrent.ForkJoinTask implementa la interfaz Future, es una clase abstracta para las tareas que se ejecutan en el ForkJoinPool, provee el método fork() para organizar la ejecución asíncrona y el método join() para continuar hasta que el resultado de la tarea se ha calculado.

Ejemplo:
 public class TareaIncrementar extends RecursiveAction {      
   private int [] arreglo;  
   private int primer;  
   private int ultimo;  
   public TareaIncrementar(int [] arreglo, int primer, int ultimo) {  
     this. arreglo = arreglo;  
     this. primer = primer;  
     this. ultimo = ultimo;  
   }  
   protected void compute() {  
     if (ultimo - primer < 10) {  
       incrementa();  
       System.out.println("Completado de "+primer+", al: "+ultimo);  
     } else {  
       int medio = ( primer + ultimo) / 2;  
       System.out.println("Tareas pendientes: " + getQueuedTaskCount());  
       TareaIncrementar ti1 = new TareaIncrementar(arreglo, primer, medio + 1);  
       TareaIncrementar ti2 = new TareaIncrementar(arreglo, medio + 1, ultimo);  
       invokeAll(it1, it2); //Espera la finalizacion de las tareas  
     }  
   }  
   private void incrementa() {  
     for (int i = primer; i < ultimo; i++) {  
       arreglo[i] = ++arreglo[i];        
     }      
   }  
 }  
 ...............  
     int arreglo [] = new int [100];  
     Random random = new Random();  
     for (int i=0; i<arreglo .length; i++) {  
       arreglo [i] = random.nextInt(10000);  
     }      
     System.out.print("Arreglo: {");   
     for (int i=0; i<arreglo .length; i++) {      
       System.out.print(arreglo[i] + ", ");  
     }  
     System.out.print(" }");   
     TareaIncrementar tarea = new TareaIncrementar(arreglo, 0, arreglo.length);      
     ForkJoinPool pool = new ForkJoinPool();      
     pool.execute(tarea);      
     do {  
       System.out.println("Threads activas: " + pool.getActiveThreadCount());        
       try {  
         TimeUnit.MILLISECONDS.sleep(5);  
       } catch (InterruptedException e) {  
         e.printStackTrace();  
       }  
     } while (!tarea.isDone());      
     pool.shutdown();  
     if (task.isCompletedNormally()) {  
       System.out.println("El proceso se ha completado normalmente");  
     }  
     System.out.print("Arreglo incrementado: {");   
     for (int i=0; i<arreglo.length; i++) {      
       System.out.print(arreglo[i] + ", ");  
     }  
     System.out.print(" }");   
 ...............  

En el ejemplo la clase TareaIncrementar extiende a la clase java.util.concurrent.RecursiveAction, esta tarea implementa su lógica en el método compute(), esta incrementa el valor de los elementos de una arreglo de enteros. Pero sólo incrementa 10 elementos por tarea, si el tamaño del arreglo es más grande, crea dos subtareas y las ejecuta a través del método invokeAll() de la clase RecursiveAction. De esta manera, las tareas se llaman de forma recursiva.

La clase RecursiveAction extiende la clase ForkJoinTask, tiene el método invokeAll () que ejecuta subtareas y espera su finalización antes de continuar, mientras espera a las subtareas que terminen, el
“worker thread” toma otra tarea y la ejecuta.

El Fork/Join framework provee otra clase para devolver un valor de una tarea, la clase java.util.concurrent.RecursiveTask comporta igual que la clase RecursiveAction pero su método compute() devuelve un valor.

Ejemplo:
 public class TareaSumar extends RecursiveTask <Integer> {   
   private int [] arreglo;  
   private int primer;  
   private int ultimo;  
   public TareaSumar(int [] arreglo, int primer, int ultimo) {  
     this.arreglo = arreglo;  
     this.primer = primer;  
     this.ultimo = ultimo;  
   }       
   protected Integer compute() {  
     int suma = 0;  
     if (primer - ultimo < 10) {  
       suma = sumar();  
       System.out.println("Completado de "+primer+", al: "+ultimo);  
     } else {  
       int medio = ( primer + ultimo) / 2;  
       System.out.println("Tareas pendientes: " + getQueuedTaskCount());  
       TareaSumar ts1 = new TareaSumar(areglo, primer, ultimo + 1);  
       TareaSumar ts2 = new TareaSumar(areglo, medio + 1, ultimo);  
       invokeAll(ts1, ts2);        
       try {  
         sum += ts1.get();  
         sum += ts2.get();  
       } catch (Exception ex) {  
         ex.printStackTrace();  
       }         
     }      
     return suma;  
   }  
   private Integer sumar() {  
     int suma = 0;  
     for (int i = primer; i < ultimo; i++) {  
       suma += arreglo[i];        
     }      
     return suma;  
   }  
 }  
 ......................  
     int arreglo [] = new int [100];  
     Random random = new Random();  
     for (int i=0; i<arreglo.length; i++) {  
       arreglo[i] = random.nextInt(100);  
     }      
     System.out.print("Arreglo: {");   
     for (int i=0; i<arreglo.length; i++) {      
       System.out.print(arreglo[i] + ", ");  
     }  
     System.out.print(" }");   
     TareaSumar tarea = new TareaSumar(arreglo, 0, arreglo.length);      
     ForkJoinPool pool = new ForkJoinPool();      
     pool.execute(tarea);      
     do {  
        System.out.println("Threads activas: " + pool.getActiveThreadCount());        
       try {  
         TimeUnit.MILLISECONDS.sleep(5);  
       } catch (InterruptedException e) {  
         e.printStackTrace();  
       }  
     } while (!tarea.isDone());      
     pool.shutdown();  
     if (task.isCompletedNormally()) {  
        System.out.println("El proceso se ha completado normalmente");  
     }  
     try {   
       System.out.print("La suma del arreglo es:" + tarea.get());  
     } catch (Exception ex) {  
       ex.printStackTrace();;  
     }  
 ......................  

En el ejemplo, la clase TareaSumar extiende a la clase RecursiveTask con Integer como su tipo, TareaSumar añadirá todos los elementos de un arreglo. Sin embargo, sólo añade 10 elementos por tarea, si el tamaño del arreglo es mayor, entonces crea dos subtareas y las ejecuta a través del método invokeAll() de la clase RecursiveTask al igual que el ejemplo anterior, pero en contraste con el ejemplo anterior, cada tarea devuelve la suma de los 10 elementos, y si se crearon subtareas añade el resultado de las subtareas.
El valor de las subtareas se obtiene con el método get() de la interfaz Future.

Hasta ahora todas las tareas se han ejecutado sincrónicamente, cuando una tarea llama al método invokeAll() se espera a que las tareas enviadas a ejecutar a través de este método hayan termnado, con esto el algoritmo work-stealing se lleva a cabo, se asigna una nueva tarea al “worker thread” cuando la tarea está esperando.

Las tareas también se puede ejecutar asíncronamente con los métodos fork() y join() de la clase ForkJoinTask, el método de fork() ejecuta de forma asíncrona una subtarea continuando con la ejecución de la tarea, y el método join() espera a el resultado de la subtarea.
El uso de este mecanismo el algoritmo “work-stealing” no se puede implementar ya que la tarea continúa ejecutándose cuando se crea una subtarea.

Ejemplo:
 public class TareaSumarAsync extends RecursiveTask <Integer> {  
   private int [] arreglo;  
   private int primer;  
   private int ultimo;  
   public TareaSumarAsync(int [] arreglo, int primer, int ultimo) {  
     this.arreglo = arreglo;  
     this.primer = primer;  
     this.ultimo = ultimo;  
   }  
   protected Integer compute() {  
     int suma = 0;  
     if (primer - ultimo < 10) {  
       suma = sumar();  
       System.out.println("Completado de "+primer+", al: "+ultimo);  
     } else {  
       int medio = ( primer + ultimo) / 2;  
       System.out.println("Tareas pendientes: " + getQueuedTaskCount());  
       TareaSumarAsync tsa1 = new TareaSumarAsync(areglo, primer, ultimo + 1);  
       TareaSumarAsync tsa2 = new TareaSumarAsync(areglo, medio + 1, ultimo);  
       tsa1.fork();  
       tsa2.fork();//Continua con la ejecuion  
       sum += st1.join();//Espera al resultado  
       sum += st2.join();  
     }  
     return suma;  
   }  
   private Integer sumar() {  
     int suma = 0;  
     for (int i = primer; i < ultimo; i++) {  
       suma += arreglo[i];        
     }      
     return suma;  
   }  
 }  

En el ejemplo TareaSumarAsync ejecuta su subtareas de forma asíncrona utilizado los métodos fork() y join().

No comments:

Post a Comment