Monday, May 27, 2013

Concurrencia en Java Parte 5

Esta es la quinta parte del tutorial, este link te lleva al  post anterior concurrencia en java parte-4

Executor Framework

Cuando un programa ejecuta muchas tareas concurrentes, todo el código relacionado con los threads tiene que ser implementado, se tiene que crear un objeto por thread, ejecutar el thread, obtener sus resultados, y así sucesivamente. Esto puede traer algunos problemas tales como no administrar de manera eficiente los recursos de la máquina y afectar el rendimiento de la aplicación. Para aplicaciones de gran tamaño se necesita un mejor enfoque y el executor freamwork puede ayudar con esto.

El executor freamwork separa la tarea de creación del thread, la ejecución y su administración, encapsula la funcionalidad y mejora el rendimiento usando un “pool” de threads

La forma en que el executor freamwor trabaja es muy simple, sólo requiere de instancias de objetos de tipo Runnable o Callable y el se encarga del resto.

Ejemplo:
 public class Servidor {  
             private ThreadPoolExecutor executor;  
             public Servidor() {  
               executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();//Crea el objeto executor o un pool de threads  
             }  
             public void ejecutaTarea(Task tarea) {  
                    executor.execute(tarea);// Ejecuta una tarea 
             }  
           public void terminaServidor () {  
                    executor.shutdown();// Este metodo termina el executor  
             }  
 }  
 public class Tarea implements Runnable {  
   private String nombre;  
   public Task(String nombre) {      
     this.nombre = nombre;  
   }  
   public void run() {  
     System.out.println(Thread.currentThread().getName() + ", creado: " + new Date());  
     try {  
       TimeUnit.SECONDS.sleep((long) (Math.random() * 10));  
     } catch (InterruptedException e) {  
       e.printStackTrace();  
     }  
     System.out.println(Thread.currentThread().getName() + ", terminado: " + new Date());       
   }  
 }  
 ...........  
     Servidor servidor = new Servidor();  
     for (int i = 0; i < 10; i++) {  
       Tarea tarea = new Tarea("Tarea " + i);  
       servidor.ejecutaTarea(tarea);  
     }  
     servidor.endServer();  
 ...........  

En el ejemplo anterior, la clase Executors creo un objeto de tipo java.util.concurrent.ThreadPoolExecutor, esta clase es una implementación de la interfaz java.util.concurrent.ExecutorService. Aunque ThreadPoolExecutor puede ser creado usando directamente sus constructores, se recomienda utilizar la clase Executors.

El ThreadPoolExecutor utiliza el método execute() para ejecutar un Runnable o Callable. Tambien tiene otros métodos como getPoolSize(), getCompleteTaskCount () para obtener el estado del pool.
El ThreadPoolExecutor tiene que ser terminada explícitamente llamando al método endServer(), de lo contrario no termina y el programa nunca terminará.

Para evitar saturar la aplicación y provocar un mal rendimiento, la clase Executors tiene el método newFixedThreadPool(int nthreads) que crea un executor de threads de tamaño fijo. Este executor tiene un número máximo de threads indicados por el parametrio nthreads, y como dice el java api "En cualquier momento, como máximo un numero “nthreads ” de threads estarán activos ejecutando tareas. Si tareas adicionales son enviadas cuando todos los threads estén activos, van a tener que esperar en la cola hasta que un thread este disponible. Si cualquier thread termina debido a un error durante la ejecución antes de terminar el executor, uno nuevo tomará su lugar si es necesario para ejecutar tareas posteriores ".

Ejemplo:
   ......  
   public Servidor() {      
     executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);  
   }  
   ......  

En el ejemplo se crea un executor con un máximo de 5 threads, esto significa que si más de 5 tareas se envían a ejecutar sólo 5 se ejecutaran y el resto se bloqueará hasta que haya un thread libre para procesarlos.

Tareas que regresan un valor

El executor framework puede ejecutar tareas que devuelven un valor, esta es otra ventaja de utilizar este framework. Para este mecanismo se utiliza la interfaz java.util.concurrent.Callable, en lugar de tener un método run(), se hace una llamada al call() que devuelve cualquier tipo de objeto que es especificado de forma genérica:

                  public interface Callable<V> {
                       V call() throws Exception;
                  }

El ExecutorService tiene el método submit() que acepta objetos de tipo Callable y los ejecuta, este método devuelve un objeto del tipo java.util.concurrent.Future, la interfaz Future tiene métodos para obtener el resultado generado por el objeto Callable.

Ejemplo:
 public class CalculadorMultiplicacion implements Callable<Integer> {  
   private int operador1;  
   private int operador2;  
   public CalculadorMultiplicacion(int operador1, int operador2) {  
     this.operador1 = operador1;  
     this.operador2 = operador2;  
   }  
   public Integer call() throws Exception {  
     return operador1 * operador2;  
   }  
 }  
 .......................  
 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);//Maximo 2 threads   
  List<Future<Integer>> listaResultados = new ArrayList<Future<Integer>>();  
  for (int i = 0; i < 10; i++) {  
       CalculadorMultiplicacion calculador = new CalculadorMultiplicacion((int)(Math.random()*10), (int)(Math.random()*10));  
       Future<Integer> resultado = executor.submit(calculador);  
       listaResultados.add(resultado);  
     }  
     while (executor.getCompletedTaskCount() < listaResultados.size()) {  
         try {  
         Thread.sleep(50);  
       } catch (InterruptedException e) {  
         e.printStackTrace();  
       }  
     }//Espera a que las tareas se completen  
     for (int i = 0; i < listaResultados.size(); i++) {  
       Future<Integer> resultado = listaResultados.get(i);  
       try {        
         System.out.println("El resultado de la tarea "+i+ " es:" + resultado.get());  
       } catch (Exception e) {  
         e.printStackTrace();  
       }   
     }  
 .......................  

En el ejemplo anterior, los objetos Callable realizan una operación de multiplicación se envían al executor utilizando el método de submit(), el programa espera hasta que todas las tareas hayan acabado verificando con el método del executor getCompletedTaskCount (), una vez que terminen los resultados se obtienen con el método get() del objeto Future.

El ExecutorService tiene el método invokeAny(tareas) que recibe una colección de tareas luego las ejecuta y devuelve el resultado de la primer tarea que termina sin lanzar una excepción, las tareas que no han finalizado se cancelan.

Un ejemplo en el que se podría utilizar este método podría ser para buscar servicios, una aplicación que quiere encontrar una conexión de base de datos en servidores diferentes, la primera tarea que encuentre disponible el servicio es el que se va a utilizar, las demás tareas son ignoradas.

El ejemplo anterior implementado con el método invokeAny () quedaría de la siguiente forma:
 .................  
 ExecutorService executor = (ExecutorService) Executors.newCachedThreadPool();  
     List<CalculadorMultiplicacion> listaTareas= new ArrayList<CalculadorMultiplicacion>();  
     for (int i = 0; i < 10; i++) {  
       CalculadorMultiplicacion calculador = new CalculadorMultiplicacion((int)(Math.random()*10), (int)(Math.random()*10));  
       listaTareas.add(calculador);  
     }  
     try {  
        Integer resultado = executor.invokeAny(listaTareas);  
        System.out.println("El resultado de la primer tarea en terminar es:" + resultado);  
     } catch (Exception e) {e.printStackTrace();}  
     //Termina el Executor  
     executor.shutdown();  
  .............  

El ExecutorService tiene otro mecanismo para ejecutar múltiples tareas y procesar el resultado de todas las tareas, el método invokeAll (tareas) recibe una colección de tareas, los ejecuta y devuelve una lista de objetos de tipo Future.

Ejemplo:
 ExecutorService executor = (ExecutorService) Executors.newCachedThreadPool();
     List<CalculadorMultiplicacion> listaTareas = new ArrayList<CalculadorMultiplicacion>();
     for (int i = 0; i < 10; i++) {
       CalculadorMultiplicacion calculador = new CalculadorMultiplicacion((int)(Math.random()*10), (int)(Math.random()*10));
       listaTareas.add(calculador);
     }    
     List<Future<Integer>> listaResultados = null;      
     try {
        listaResultados = executor.invokeAll(listaTareas);      
     } catch (Exception e) {e.printStackTrace();}    
     executor.shutdown();
     for (int i = 0; i < listaResultados.size(); i++) {
       Future<Integer> resultado = listaResultados.get(i);
       try {      
         System.out.println("El resultado de la tarea "+i+ " es:" + resultado.get());
       } catch (Exception e) {
         e.printStackTrace();
       }
     }

En el ejemplo en lugar de enviar cada tarea para el executor con el método submit(), todas las tareas se agrupan en una lista y se enviar para ejecutar a través de el método invokeAll().

Tareas programadas

La clase Executores puede crear un “pool” que programa las tareas después de un cierto tiempo, o las ejecuta periódicamente. Este “pool” implementa la interfaz java.util.concurrent.ScheduledExecutorService.

Ejemplo:
 ScheduledExecutorService executor=(ScheduledExecutorService)Executors.newScheduledThreadPool(1);  
 List<Future<Integer>> listaResultados = new ArrayList<Future<Integer>>();  
 for (int i=0; i<5; i++) {  
     CalculadorMultiplicacion calculador = new CalculadorMultiplicacion((int)(Math.random()*10), (int)(Math.random()*10));  
     Future resultado<Integer> = executor.schedule(calculacor,i+1 , TimeUnit.SECONDS);  
     listaResultados.add(resultado);  
 }  
 executor.shutdown();  
 //Espera la terminación del executor  
 try {  
      executor.awaitTermination(1, TimeUnit.DAYS);  
 } catch (InterruptedException e) {  
      e.printStackTrace();  
 }  
 ..............  

En el ejemplo se crea un “pool” programado con un tamaño de 1, cada thread es programado utilizando el método schedule(), este método recibe como parámetros la tarea a ejecutar, el período de tiempo de espera antes de la ejecución y la unidad de tiempo .
El executor utiliza el método awaitTermination() que bloquea hasta que todas las tareas se han completado o pase el tiempo de espera .

Tareas rechazadas

Si una tarea se enviá al executor entre la llamada al método shutdown() y el final de su ejecución, la tarea se rechaza. El ejecutor proporciona un mecanismo para manejar esto, sólo requiere una instancia de un objeto que implementa la interfaz java.util.concurrent.RejectedExecutionHandler.

Example:
 public class RejectedTaskController implements RejectedExecutionHandler {  
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
     System.out.println("La tarea ha sido rechazada");     
   }  
 }  
 ........  
 RejectedTaskController controller = new RejectedTaskController();      
 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();  
 executor.setRejectedExecutionHandler(controller);  
 ........  


Cuando se rechaza una tarea el método rejecedExecution() de la instancia de RejectedExecutionHandler es llamado.

Ir a parte 6

3 comments:

  1. Muy bueno tu tutorial, me sirve bastante para aclarar conceptos
    Saludos

    ReplyDelete
  2. el mejor tutorial que he encontrado en la web de este tema en español, muchísimas gracias

    ReplyDelete