Fork/Join
Framework
El framework fork/join es una implementación de la interfaz
ExecutorService y utiliza un enfoque diferente, que resuelve las
dividiéndolas en tareas más pequeñas de forma recursiva.
El
framework fork/join
implementa el algoritmo work-stealing, que dice que los threads de
trabajo que quedan sin cosas que hacer pueden robar tareas de otros
threads que aún están ocupados. Dicho de otra manera, dice que un
thread que está a la espera la finalización de otros threads, busca
threads que no han sido ejecutados y los ejecuta.
Básicamente
hay dos operaciones, una para dividir una tarea en tareas más
pequeñas "fork",
y otra esperar a que las tareas finalicen "join".
La
clase java.util.concurrent.ForkJoinPool
implementa la interfaz ExecutorSevice y también implementa el
algoritmo work-stealing.
La
clase java.util.concurrent.ForkJoinTask
implementa la interfaz Future,
es una clase abstracta para las tareas que se ejecutan en el
ForkJoinPool, provee
el método fork()
para organizar la ejecución asíncrona y el método join()
para continuar hasta que el resultado de la tarea se ha calculado.
Ejemplo:
public class TareaIncrementar extends RecursiveAction {
private int [] arreglo;
private int primer;
private int ultimo;
public TareaIncrementar(int [] arreglo, int primer, int ultimo) {
this. arreglo = arreglo;
this. primer = primer;
this. ultimo = ultimo;
}
protected void compute() {
if (ultimo - primer < 10) {
incrementa();
System.out.println("Completado de "+primer+", al: "+ultimo);
} else {
int medio = ( primer + ultimo) / 2;
System.out.println("Tareas pendientes: " + getQueuedTaskCount());
TareaIncrementar ti1 = new TareaIncrementar(arreglo, primer, medio + 1);
TareaIncrementar ti2 = new TareaIncrementar(arreglo, medio + 1, ultimo);
invokeAll(it1, it2); //Espera la finalizacion de las tareas
}
}
private void incrementa() {
for (int i = primer; i < ultimo; i++) {
arreglo[i] = ++arreglo[i];
}
}
}
...............
int arreglo [] = new int [100];
Random random = new Random();
for (int i=0; i<arreglo .length; i++) {
arreglo [i] = random.nextInt(10000);
}
System.out.print("Arreglo: {");
for (int i=0; i<arreglo .length; i++) {
System.out.print(arreglo[i] + ", ");
}
System.out.print(" }");
TareaIncrementar tarea = new TareaIncrementar(arreglo, 0, arreglo.length);
ForkJoinPool pool = new ForkJoinPool();
pool.execute(tarea);
do {
System.out.println("Threads activas: " + pool.getActiveThreadCount());
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!tarea.isDone());
pool.shutdown();
if (task.isCompletedNormally()) {
System.out.println("El proceso se ha completado normalmente");
}
System.out.print("Arreglo incrementado: {");
for (int i=0; i<arreglo.length; i++) {
System.out.print(arreglo[i] + ", ");
}
System.out.print(" }");
...............
En el ejemplo la clase TareaIncrementar extiende a la clase
java.util.concurrent.RecursiveAction, esta tarea implementa su
lógica en el método compute(), esta incrementa el
valor de los elementos de una arreglo de enteros. Pero sólo
incrementa 10 elementos por tarea, si el tamaño del arreglo es más
grande, crea dos subtareas y las ejecuta a través del método
invokeAll() de la clase RecursiveAction. De esta manera, las
tareas se llaman de forma recursiva.
La clase RecursiveAction extiende la clase ForkJoinTask, tiene
el método invokeAll () que ejecuta subtareas y espera su
finalización antes de continuar, mientras espera a las subtareas que
terminen, el
“worker thread” toma otra tarea y la ejecuta.
El Fork/Join framework provee otra clase para devolver un valor de
una tarea, la clase java.util.concurrent.RecursiveTask
comporta igual que la clase RecursiveAction pero su método
compute() devuelve un valor.
Ejemplo:
public class TareaSumar extends RecursiveTask <Integer> {
private int [] arreglo;
private int primer;
private int ultimo;
public TareaSumar(int [] arreglo, int primer, int ultimo) {
this.arreglo = arreglo;
this.primer = primer;
this.ultimo = ultimo;
}
protected Integer compute() {
int suma = 0;
if (primer - ultimo < 10) {
suma = sumar();
System.out.println("Completado de "+primer+", al: "+ultimo);
} else {
int medio = ( primer + ultimo) / 2;
System.out.println("Tareas pendientes: " + getQueuedTaskCount());
TareaSumar ts1 = new TareaSumar(areglo, primer, ultimo + 1);
TareaSumar ts2 = new TareaSumar(areglo, medio + 1, ultimo);
invokeAll(ts1, ts2);
try {
sum += ts1.get();
sum += ts2.get();
} catch (Exception ex) {
ex.printStackTrace();
}
}
return suma;
}
private Integer sumar() {
int suma = 0;
for (int i = primer; i < ultimo; i++) {
suma += arreglo[i];
}
return suma;
}
}
......................
int arreglo [] = new int [100];
Random random = new Random();
for (int i=0; i<arreglo.length; i++) {
arreglo[i] = random.nextInt(100);
}
System.out.print("Arreglo: {");
for (int i=0; i<arreglo.length; i++) {
System.out.print(arreglo[i] + ", ");
}
System.out.print(" }");
TareaSumar tarea = new TareaSumar(arreglo, 0, arreglo.length);
ForkJoinPool pool = new ForkJoinPool();
pool.execute(tarea);
do {
System.out.println("Threads activas: " + pool.getActiveThreadCount());
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!tarea.isDone());
pool.shutdown();
if (task.isCompletedNormally()) {
System.out.println("El proceso se ha completado normalmente");
}
try {
System.out.print("La suma del arreglo es:" + tarea.get());
} catch (Exception ex) {
ex.printStackTrace();;
}
......................
En el ejemplo, la clase TareaSumar extiende a la clase RecursiveTask
con Integer como su tipo, TareaSumar añadirá todos los elementos de
un arreglo. Sin embargo, sólo añade 10 elementos por tarea, si el
tamaño del arreglo es mayor, entonces crea dos subtareas y las
ejecuta a través del método invokeAll() de la clase
RecursiveTask al igual que el ejemplo anterior, pero en contraste con
el ejemplo anterior, cada tarea devuelve la suma de los 10
elementos, y si se crearon subtareas añade el resultado de las
subtareas.
El valor de las subtareas se obtiene con el método get() de
la interfaz Future.
Hasta ahora todas las tareas se han ejecutado sincrónicamente,
cuando una tarea llama al método invokeAll() se espera a que las
tareas enviadas a ejecutar a través de este método hayan termnado,
con esto el algoritmo work-stealing se lleva a cabo, se asigna una
nueva tarea al “worker thread” cuando la tarea está esperando.
Las tareas también se puede ejecutar asíncronamente con los métodos
fork() y join() de la clase ForkJoinTask,
el método de fork() ejecuta de forma asíncrona una subtarea
continuando con la ejecución de la tarea, y el método join() espera
a el resultado de la subtarea.
El uso de este mecanismo el algoritmo “work-stealing” no se puede
implementar ya que la tarea continúa ejecutándose cuando se crea
una subtarea.
Ejemplo:
public class TareaSumarAsync extends RecursiveTask <Integer> {
private int [] arreglo;
private int primer;
private int ultimo;
public TareaSumarAsync(int [] arreglo, int primer, int ultimo) {
this.arreglo = arreglo;
this.primer = primer;
this.ultimo = ultimo;
}
protected Integer compute() {
int suma = 0;
if (primer - ultimo < 10) {
suma = sumar();
System.out.println("Completado de "+primer+", al: "+ultimo);
} else {
int medio = ( primer + ultimo) / 2;
System.out.println("Tareas pendientes: " + getQueuedTaskCount());
TareaSumarAsync tsa1 = new TareaSumarAsync(areglo, primer, ultimo + 1);
TareaSumarAsync tsa2 = new TareaSumarAsync(areglo, medio + 1, ultimo);
tsa1.fork();
tsa2.fork();//Continua con la ejecuion
sum += st1.join();//Espera al resultado
sum += st2.join();
}
return suma;
}
private Integer sumar() {
int suma = 0;
for (int i = primer; i < ultimo; i++) {
suma += arreglo[i];
}
return suma;
}
}
No comments:
Post a Comment