Este sitio utiliza cookies de Google para prestar sus servicios y analizar su tráfico. Tu dirección IP y user-agent se comparten con Google, junto con las métricas de rendimiento y de seguridad, para garantizar la calidad del servicio, generar estadísticas de uso y detectar y solucionar abusos.Más información

View site in english Ir a la página de inicio Contacta conmigo
martes, 19 de septiembre de 2017

Multitarea II, sincronización de tareas

Después de revisar las clases para implementar la multitarea básica, en este artículo voy a revisar diferentes mecanismos que permiten realizar una sincronización entre varias tareas, todos ellos definidos en el espacio de nombres System.Threading, con los que se puede organizar el trabajo cuando las interacciones entre tareas requieren de un orden determinado.

Si quieres empezar por el principio, este es el primer artículo de la serie sobre aplicaciones multitarea.

En este enlace puedes descargar el código fuente de la aplicación MultithreadingDemo con los ejemplos, escrita en csharp con Visual Studio 2015. La aplicación utiliza la versión 4.5 de .NET Framework.

En la mayoría de las aplicaciones multiproceso, no es suficiente con lanzar varias tareas en paralelo y esperar a que todas terminen, cada una por su lado. Muchas veces algunas de las tareas requieren que otra u otras hayan terminado su trabajo para comenzar a realizar el suyo. Para realizar esto, existen varios mecanismos estándar de sincronización que pueden utilizarse en los diferentes escenarios que pueden presentarse.

Pero el uso de estos mecanismos no sale gratis. Todos ellos resultan bastante costosos en tiempo de proceso, pudiendo llegar incluso a hacer preferible la ejecución secuencial a la paralela. Los ejemplos que voy a utilizar, además de servir para entender el funcionamiento de estos mecanismos, pretenden mostrar también su alto coste. Se trata, como en el artículo anterior, de procesos de cálculo que utilizan sobre todo tiempo de procesador. En realidad, también son ejemplos de cuándo no deben usarse estos mecanismos. Para acceder a ellos, se debe usar la opción Synchronize del menú Demo.

Opción Synchronize en MultithreadingDemo
La opción Synchronize

Vamos a empezar, como siempre, con un proceso secuencial. El primer ejemplo dibuja un atractor de Lorenz sin hacer uso para nada de las tareas. Se trata de un sistema de tres ecuaciones diferenciales para las coordenadas x, y y z del sistema, cada una de las cuales calcula el incremento de cada variable en el instante actual. Esto quiere decir que, para calcular uno cualquiera de los puntos, primero tenemos que haber calculado todos los anteriores, además, cada ecuación depende de todas o casi todas las variables, por lo que hay que calcularlas todas antes de incrementar los valores de las mismas. Los cálculos para cada variable son además bastante sencillos, por lo que se trata claramente de un proceso poco apropiado para el uso de varias tareas. Podemos lanzar este proceso usando el botón LA Single, este es el código que se va a ejecutar:

x = 1;
y = 1;
z = 1;
...
ix = 0;
while (ix < 1920000)
{
dx = 10 * (y - x);
dy = x * (17 - z) - y;
dz = x * y - z;
x += dt * dx;
y += dt * dy;
z += dt * dz;
xx = (int)(x * 16 + 320);
yy = 480 - (int)(z * 16);
color = 63 + ix / 10000;
_bitmap[xx + 640 * yy] = Color.FromArgb(color, color, 0).ToArgb();
ix++;
}

Y este es el resultado. Como siempre, a la derecha del botón aparece el tiempo total de proceso:

El atractor de Lorenz
El atractor de Lorenz

Puesto que el proceso consta primero del cálculo de los incrementos de las variables (dx, dy y dz) y, a continuación, realiza dicho incremento y calcula la proyección del espacio tridimensional en uno bidimensional para dibujar el punto, supongamos que se nos ocurre que podríamos separar el cálculo y la proyección en dos tareas separadas para que sea todavía más rápido. Para esto debemos definir dos tareas distintas, pero nos encontramos con el problema de que el cálculo se debe ejecutar siempre antes de la proyección, cuándo esté terminado, podemos realizar la proyección mientras se ejecuta el próximo cálculo.

El primer mecanismo que voy a mostrar es la clase CountdownEvent. Su funcionamiento es muy sencillo, se inicializa con un determinado valor entero, un proceso llama al método Wait, con lo que queda detenido, y uno o más procesos llaman al método Signal que decrementa el valor del contador en uno. Cuando el contador llega a cero, el proceso que estaba en espera puede continuar su ejecución. El método Reset devuelve el contador a su valor inicial. Con este mecanismo esperaremos a que la tarea que realiza los cálculos finalice.

Para detener los cálculos usaremos otro mecanismo, mediante la clase AutoResetEvent. Este mecanismo es aún más sencillo. Se inicializa el objeto con un valor booleano true o false, cuando una tarea llama al método WaitOne, quedará detenida si el valor es false, y continuará cuando pase a true. Esto causará que el valor vuelva a tomar automáticamente el valor false. Para poner el valor a true se utiliza el método Set. Con este mecanismo se puede liberar un solo proceso.

La tarea que debe realizar los cálculos, TaskDxyz, queda de la siguiente manera:

private void TaskDxyz()
{
while (ix < 1920000)
{
dx = 10 * (y - x);
dy = x * (17 - z) - y;
dz = x * y - z;
_cdEvent.Signal();
_arXYZ.WaitOne();
}
}

Al terminar los cálculos decrementa el contador del CountdownEvent, que en este caso vale uno, para indicar que ha terminado, y espera para poder continuar con el AutoResetEvent.

La tarea que realiza la proyección, TaskCalc, es la siguiente:

private void TaskCalc()
{
while (ix < 1920000)
{
_cdEvent.Wait();
x += dt * dx;
y += dt * dy;
z += dt * dz;
_cdEvent.Reset();
_arXYZ.Set();
xx = (int)(x * 16 + 320);
yy = 480 - (int)(z * 16);
color = 63 + ix / 10000;
_bitmap[xx + 640 * yy] = Color.FromArgb(color, color, 0).ToArgb();
ix++;
}
_arXYZ.Set();
}

Lo primero que hace es esperar al CountdownEvent, para asegurarse que el cálculo ha terminado, luego incrementa las coordenadas con el resultado, reinicia el contador y libera a la otra tarea, antes de realizar la proyección. En realidad, lo único que estamos haciendo es incrementar innecesariamente el tiempo de proceso, haciendo un uso excesivo de estos mecanismos, pero podemos comprobar que funcionan utilizando el botón LA Sync. Este es el código que inicializa los objetos de sincronización y lanza las tareas:

x = 1;
y = 1;
z = 1;
...
_arXYZ = new AutoResetEvent(false);
_cdEvent = new CountdownEvent(1);
List<Task> tasks = new List<Task>();
...
ix = 0;
tasks.Add(Task.Run(() => { TaskDxyz(); }));
tasks.Add(Task.Run(() => { TaskCalc(); }));
Task.WaitAll(tasks.ToArray());

Al ejecutarlo, obtendremos el mismo atractor que en el caso secuencial, pero tardará mucho más en ejecutarse. La primera lección a aprender es que no hay que hacer un uso excesivo de estos mecanismos, u obtendremos el efecto contrario al deseado, una aplicación mucho más lenta.

Así que vamos a probar con otro ejemplo que utilice muchas menos llamadas de este tipo y que además utilice más tareas simultáneamente. Para ello, primero generaremos una serie con 10000 valores, usando la ecuación logística, y luego realizaremos 64 cálculos de la información mutua a diferentes distancias y pintaremos la gráfica resultante. Para el que no sepa de lo que estoy hablando, simplemente vamos a realizar 64 cálculos matemáticos medianamente complejos utilizando ocho tareas simultáneas, que tendremos que ejecutar ocho veces hasta completar los 64.

Pero primero, realizaremos el proceso de forma secuencial para tomar tiempos. Este es el código que realiza los cálculos:

_points = new PointF[64];
...
Dictionary<Point, int> pairs = new Dictionary<Point, int>();
for (int off = 1; off <= 64; off++)
{
pairs.Clear();
int pos = 0;
while (pos < 10000 - off)
{
Point pair = new Point((int)(_timeSeries[pos] * 100),
(int)(_timeSeries[pos + off] * 100));
if (pairs.ContainsKey(pair))
{
pairs[pair]++;
}
else
{
pairs[pair] = 1;
}
pos++;
}
float p = 0f;
double dpos = pos;
foreach (KeyValuePair<Point, int> kv in pairs)
{
double pc = kv.Value / dpos;
double px = _histogram[kv.Key.X] / dpos;
double pxo = _histogram[kv.Key.Y] / dpos;
float pp = (float)Math.Round(pc * Math.Log(pc / (px * pxo), 2), 10);
p += pp;
}
_points[off - 1] = new PointF((off - 1) * 10, 480 - 70 * p);
}

Cada cálculo completo se realiza dentro del bucle for. Como los cálculos son independientes unos de otros y, además, es indiferente en qué orden los realicemos, resulta evidente que podemos utilizar simplemente un bucle Parallel.For si queremos ejecutarlos en paralelo. Pero vamos a ver qué ocurre si utilizamos ocho tareas sincronizadas en su lugar. Pulsando el botón MI Single podemos ver el resultado, esta vez una gráfica muy poco vistosa, pero que lleva algo más de tiempo calcular:

Gráfica de información mutua
Gráfico de información mutua en serie logística

En la versión multitarea, realizaremos el cálculo de cada punto dentro de una tarea, pero, como solo vamos a usar ocho, habrá que reutilizar cada una de ellas ocho veces. Voy a hacer esto de una forma muy artificiosa, solo para mostrar el uso de los mecanismos de sincronización, que, en este caso, son totalmente innecesarios.

En primer lugar, utilizaré de nuevo un objeto CountdownEvent para llevar la cuenta de los procesos que se han ejecutado, inicializado con un valor de ocho. Para detener y liberar a estos ocho procesos, utilizaré un mecanismo similar al AutoResetEvent, pero que permite liberar a varios procesos a la vez, pues no se reinicia de forma automática, con la clase ManualResetEvent. También representa una variable booleana, y se utiliza mediante los métodos WaitOne y Set de la misma manera que AutoResetEvent, pero es necesario reiniciarlo a false llamando al método Reset. Con el método Set se liberarán todos los procesos que hayan llamado a WaitOne, pero, si no llamamos a Reset para volver a poner el valor a false, estos procesos nunca más se pararán al volver a llamar a WaitOne, debido a que el objeto permanecerá siempre con el valor true.

Además de los ocho procesos de trabajo, existirá un proceso adicional que se deberá ejecutar cuando los ocho hayan terminado, para incrementar una variable que sirve de base para el cálculo y liberar de nuevo los procesos de cálculo, todos a la vez, con el objeto ManualResetEvent.

También utilizaremos otro objeto de sincronización, de la clase Barrier, cuyo funcionamiento es parecido al de la clase CountdounEvent. También se inicializa con un entero que actúa como contador. Los procesos llaman al método SignalAndWait, lo que produce que el contador se decremente en uno y el proceso quede en espera hasta que la cuenta llegue a cero. Con este objeto detectaremos el momento en que todos los procesos hayan terminado, de forma equivalente al uso del método Task.WaitAll.

Los procesos de trabajo comparten todos el mismo código, en el método MutualInformation, que recibe el parámetro off, con un valor entre 0 y 7, para que cada proceso calcule un índice diferente. Este es el código:

private void MutualInformation(int off)
{
Dictionary<Point, int> pairs = new Dictionary<Point, int>();
while (offbase <= 64 - off)
{
int pos = 0;
pairs.Clear();
while (pos < 10000 - (offbase + off))
{
Point pair = new Point((int)(_timeSeries[pos] * 100),
(int)(_timeSeries[pos + offbase + off] * 100));
if (pairs.ContainsKey(pair))
{
pairs[pair]++;
}
else
{
pairs[pair] = 1;
}
pos++;
}
float p = 0f;
double dpos = pos;
foreach (KeyValuePair<Point, int> kv in pairs)
{
double pc = kv.Value / dpos;
double px = _histogram[kv.Key.X] / dpos;
double pxo = _histogram[kv.Key.Y] / dpos;
float pp = (float)Math.Round(pc *
Math.Log(pc / (px * pxo), 2), 10);
p += pp;
}
_points[(offbase + off) - 1] =
new PointF(((offbase + off) - 1) * 10, 480 - 70 * p);
_cdEvent.Signal();
_mrMI.WaitOne();
_mrMI.Reset();
}
_barrier.SignalAndWait();
}

Al terminar los cálculos, el método Signal del CountdownEvent decrementa el contador en uno, a continuación, el proceso entra en espera con el método WaitOne del ManualResetEvent. Como al terminar la espera todos los procesos quedarán liberados, se llama inmediatamente al método Reset del ManualResetEvent para que los procesos se vuelvan a detener la siguiente vez.

El proceso de control es ControlTask, y su código es muy sencillo:

private void ControlTask()
{
while (offbase <= 64)
{
_cdEvent.Wait();
offbase += 8;
_cdEvent.Reset();
_mrMI.Set();
}
_barrier.SignalAndWait();
}

Lo primero que hace es llamar al método Wait del CountdownEvent, para esperar a que los ocho procesos de cálculo hayan terminado. Entonces, incrementa la variable global offbase con el índice base de los cálculos, vuelve a poner el contador de procesos a su valor inicial con el método Reset, y libera los procesos con el método Set del ManualResetEvent.

Con el botón MI Sync lanzamos todos estos procesos de la forma habitual, usando la clase Task. Primero inicializamos los objetos de sincronización, el CountdownEvent con un valor de 8, el ManualResetEvent con valor false y el objeto Barrier con una cuenta de 10, ocho para los procesos de trabajo, uno para la tarea de control y otro más para el propio controlador del botón:

List<Task> tasks = new List<Task>();
_mrMI = new ManualResetEvent(false);
_cdEvent = new CountdownEvent(8);
_barrier = new Barrier(10);
offbase = 1;
...
for (int ix = 0; ix < 8; ix++)
{
int off = ix;
tasks.Add(Task.Run(() => { MutualInformation(off); }));
}
tasks.Add(Task.Run(() => { ControlTask(); }));
_barrier.SignalAndWait();

Podemos ver que, aunque la diferencia de velocidad se ha reducido al usar menos llamadas a los mecanismos de sincronización, todavía lleva más tiempo que el proceso ejecutado de forma secuencial.

Podemos hacer otra versión de este cálculo utilizando el método ContinueWith de la clase Task. Este método hace que una tarea se ejecute inmediatamente después de que otra ha finalizado, recibiendo como parámetro el resultado devuelto por la tarea anterior. Utilizaremos una versión modificada de las tareas anteriores, ChainMutualInformation, que devuelve como resultado su parámetro off, de tipo entero. Como devuelve un valor int la clase que debemos utilizar para crearla ya no es Task, sino Task<int>. Seguiremos creando ocho tareas de trabajo, pero en lugar de utilizar una sola de control, usaremos una versión modificada, ChainControlTask, al terminar cada una de ellas. Esta tarea utiliza un array de ocho posiciones para determinar cuándo se han ejecutado las ocho tareas e incrementar entonces la variable offbase. Lo hago así porque si dos tareas ejecutan a la vez una operación de escritura sobre la misma posición de memoria, se producirá una excepción, y no quiero utilizar todavía mecanismos de acceso concurrente a variables, que será el tema del próximo artículo.

En el código de las tareas he eliminado por completo los mecanismos de sincronización, por lo demás, son iguales a las anteriores. Con el botón MI Chain ejecutaremos esta nueva versión del proceso:

_points = new PointF[64];
offbase = 1;
List<Task<int>> tasks1 = new List<Task<int>>();
List<Task> tasks2 = new List<Task>();
...
while (offbase < 65)
{
tasks1.Clear();
tasks2.Clear();
_bits = new int[8];
try
{
for (int ix = 0; ix < 8; ix++)
{
int off = ix;
tasks1.Add(Task.Run(
() => { return ChainMutualInformation(off); }));
tasks2.Add(tasks1[ix].ContinueWith(
(b) => { ChainControlTask(b.Result); }));
Task.WaitAll(tasks2.ToArray());
}
}
finally
{
for (int ts = 0; ts < tasks1.Count; ts++)
{
tasks1[ts].Dispose();
tasks2[ts].Dispose();
}
}
}

Como podéis ver, las tareas de cálculo se crean en grupos de ocho y todas tienen su continuación con el método ContinueWith, el parámetro b de la expresión es de la clase Task<int>, como la tarea precedente, por eso debemos obtener el valor de retorno con la propiedad Result, para pasarlo a la tarea de control. Con el método estático WaitAll de la clase Task esperamos a que todas hayan terminado y lanzamos la siguiente ronda. Al ejecutar este código, podemos ver que ahora la velocidad ha mejorado mucho, pero todavía es algo más lento que el proceso secuencial.

Para terminar, voy a mostrar la forma en que podemos cancelar una tarea una vez comenzada. Para ello, se utiliza la clase CancellationTokenSource. Al lanzar la tarea, se puede pasar como parámetro del método Run la propiedad Token de este objeto. Si la queremos cancelar, usaremos el método Cancel del objeto CancellationTokenSource, pero también deberemos comprobar dentro del código de la tarea esta petición de cancelación, y será la tarea la que decida si terminar y de qué manera lo hace. Para ello, la propiedad Token del objeto CancellationTokenSource tiene a su vez la propiedad booleana IsCancellationRequested.

Este es el código de la tarea que implementa este mecanismo, implementada en el método CancelTask:

private void CancelTask()
{
_bitmap = new int[640 * 480];
x = 1;
y = 1;
z = 1;
int ix = 0;
while (true)
{
if (_cancel.Token.IsCancellationRequested)
{
break;
}
dx = -y - z;
dy = x + 0.1 * y;
dz = 0.1 + z * (x - 14);
x += 0.00001 * dx;
y += 0.00001 * dy;
z += 0.00001 * dz;
xx = (int)((x * 8 + 320) + (0.7 * 8 * y));
yy = 360 - (int)((z * 8) + (0.7 * 8 * y));
color = 63 + ix / 10000;
if ((xx + 640 * yy >= 0) && (xx + 640 * yy < _bitmap.Length))
{
_bitmap[xx + 640 * yy] = Color.FromArgb(color, color, 0).ToArgb();
}
ix++;
if (ix > 1920000)
{
ix = 0;
}
}
}

Esta tarea calcula y dibuja un atractor similar al de Lorenz, el atractor de Rössler, pero utilizo un incremento de tiempo muy pequeño para que lo haga con bastante lentitud. Con el botón Launch podemos lanzar la tarea. En este caso, no esperaremos a que termine, pues no se detendrá hasta que volvamos a pulsar el botón, que habrá cambiado a Cancel, momento en el que se dibujará el resultado:

El atractor de Rossler
El atractor de Rossler

Este es el código del controlador del evento del botón:

private void bCancel_Click(object sender, EventArgs e)
{
if (bCancel.Text == "Launch")
{
_cancel = new CancellationTokenSource();
_cTask = Task.Run(() =>
{
CancelTask();
}, _cancel.Token);
bCancel.Text = "Cancel";
}
else
{
...
try
{
_cancel.Cancel();
bCancel.Text = "Launch";
Task.WaitAll(new Task[] { _cTask });
...
pbDrawing.Image = bmp;
}
catch (Exception ex)
{
...
MessageBox.Show(ex.Message);
}
finally
{
_cancel.Dispose();
_cancel = null;
_cTask.Dispose();
_cTask = null;
}
}
}

Y eso es todo por el momento. En el próximo artículo mostraré cómo tratar el acceso concurrente a los datos y otros recursos.

Comparte este artículo: Compartir en Twitter Compártelo en Facebook Compartir en Google Plus Compartir en LinkedIn
Comentarios (0):
* (Su comentario será publicado después de la revisión)

E-Mail


Nombre


Web


Mensaje


CAPTCHA
Change the CAPTCHA codeSpeak the CAPTCHA code