Saturday, 17 August 2013

Freeing up Resources on Completion of a Java Future - Here: References to the Callable and its outer Variables

In my projects I frequently work with concurrent tasks using Java Futures.
In one application, each concurrent task requires quite a big chunk of
memory while it runs. Due to some other design choices, that memory is
created and referenced in an object created outside the thread (see the
more detailed example below).

To my surprise, the future holds a reference to this object even after
the future task (i.e., its calculation thread) has completed. That is: if
no other reference to this object is held elsewhere, the object will still
not be freed unless the future is freed, even though the task has
completed.
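
Since the object stays reachable for as long as the future does, one way
to act on this is to drop each future as soon as its value has been read.
The following is only a minimal sketch under that assumption; the class
name FutureDraining and the method drain are hypothetical and introduced
for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDraining {

    /**
     * Reads the value of each future and then clears its slot in the list,
     * so that the list no longer keeps the future (and whatever the future
     * references) reachable.
     */
    public static List<Double> drain(List<Future<Double>> futures)
            throws InterruptedException, ExecutionException {
        List<Double> values = new ArrayList<Double>(futures.size());
        for (int i = 0; i < futures.size(); i++) {
            values.add(futures.get(i).get());   // blocks until task i has completed
            futures.set(i, null);               // drop our reference to the completed future
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<Double>> futures = new ArrayList<Future<Double>>();
        for (int i = 0; i < 4; i++) {
            final double value = i;
            futures.add(executor.submit(new Callable<Double>() {
                public Double call() { return 2.0 * value; }
            }));
        }
        List<Double> values = drain(futures);
        executor.shutdown();
        System.out.println(values);   // prints [0.0, 2.0, 4.0, 6.0]
    }
}
```

This only helps if the list is the last place that still references the
futures; any other reference to a future keeps its Callable reachable.
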
My naive thinking was that limiting the number of concurrent threads would
automatically limit the amount of resources (memory) held by the tasks.
THIS IS NOT TRUE!

Consider the code below. In this example I create some tasks; during their
calculation an ArrayList grows in size. The method returns a
Vector<Future<Double>>. Even after the tasks have completed, each Future
holds a reference to its ArrayList (via FutureTask.sync.callable).

To summarize:
The FutureTask holds a reference to the Callable, even if the Callable has
completed.
The Callable holds references to the final outer variables used during the
calculation, even if the calculation has completed.

Question: What is the best way to free resources held via the Future? (Of
course, I know that local variables of the callable are released upon
thread completion - this is not what I am asking about.)

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christianfries.com.
 *
 * Created on 17.08.2013
 */
package net.finmath.experiments.concurrency;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author Christian Fries
 *
 */
public class ConcurrencyTest {

    private ExecutorService executor = Executors.newFixedThreadPool(10);

    private int numberOfDoubles = 1024*1024/8;  // 1 MB
    private int numberOfModels = 100;           // 100 * 1 MB

    /**
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ConcurrencyTest ct = new ConcurrencyTest();
        ct.concurrencyTest();
    }

    /**
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void concurrencyTest() throws InterruptedException, ExecutionException {
        Vector<Double> results = getResults();

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only results): " +
                (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
    }

    private Vector<Double> getResults() throws InterruptedException, ExecutionException {
        Vector<Future<Double>> resultsFutures = getResultsConcurrently();

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);

        /*
         * At this point, we expect that no reference to the models is held
         * and the memory is freed.
         * However, the Futures still reference each "model".
         */
        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only futures): " +
                (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

        Vector<Double> results = new Vector<Double>(resultsFutures.size());
        for(int i=0; i<resultsFutures.size(); i++) {
            results.add(i, resultsFutures.get(i).get());
        }

        return results;
    }

    private Vector<Future<Double>> getResultsConcurrently() {
        /*
         * At this point we allocate some models, which represent
         * something our workers work on.
         */
        Vector<ArrayList<Double>> models = new Vector<ArrayList<Double>>(numberOfModels);
        for(int i=0; i<numberOfModels; i++) {
            models.add(i, new ArrayList<Double>());
        }

        /*
         * Work on the models concurrently
         */
        Vector<Future<Double>> results = calculateResults(models);

        /*
         * Return the futures.
         * Note: We expect that no more reference is held to a model
         * once we are running out of scope of this function AND the
         * respective worker has completed.
         */
        return results;
    }

    private Vector<Future<Double>> calculateResults(Vector<ArrayList<Double>> models) {
        Vector<Future<Double>> results = new Vector<Future<Double>>(models.size());
        for(int i=0; i<models.size(); i++) {
            final ArrayList<Double> model = models.get(i);
            final int modelNumber = i;

            Callable<Double> worker = new Callable<Double>() {
                public Double call() throws InterruptedException {
                    /*
                     * The models will perform some thread safe lazy init,
                     * which we simulate here, via the following lines
                     */
                    for(int j=0; j<numberOfDoubles; j++)
                        model.add(Math.random());

                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
                    for(Double value : model) sum += value.doubleValue();

                    Thread.sleep(1000);

                    Runtime.getRuntime().gc();
                    System.out.println("Model " + modelNumber + " completed. Allocated memory: " +
                            (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

                    return sum;
                }
            };

            // The following line adds the future of the calculation to the vector results
            results.add(i, executor.submit(worker));
        }

        return results;
    }
}
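
Regarding the question above, one possible approach (a sketch, not a
verified best practice) is to wrap each worker in a Callable that drops
its reference to the actual task once it has been run. A completed Future
can then at most reference the small wrapper, not the worker and the model
it captured. The class ReleasingCallable and its method isReleased are
hypothetical names introduced for illustration; they are not part of
java.util.concurrent.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper: a Callable that forgets its delegate once it has been called.
 */
public class ReleasingCallable<V> implements Callable<V> {

    private final AtomicReference<Callable<V>> delegate;

    public ReleasingCallable(Callable<V> delegate) {
        this.delegate = new AtomicReference<Callable<V>>(delegate);
    }

    public V call() throws Exception {
        // Take the delegate and clear the field in one step, so it can run at most once.
        Callable<V> task = delegate.getAndSet(null);
        if (task == null) {
            throw new IllegalStateException("Task has already been run.");
        }
        // From here on only the local variable refers to the delegate.
        return task.call();
    }

    /** Returns true once the wrapper no longer references its delegate. */
    public boolean isReleased() {
        return delegate.get() == null;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final double[] model = new double[1024];   // stands in for a "big" object

        ReleasingCallable<Double> worker = new ReleasingCallable<Double>(new Callable<Double>() {
            public Double call() { return (double) model.length; }
        });

        Future<Double> future = executor.submit(worker);
        System.out.println(future.get());          // prints 1024.0
        System.out.println(worker.isReleased());   // prints true
        executor.shutdown();
    }
}
```

In the example above, this would amount to calling
executor.submit(new ReleasingCallable<Double>(worker)) instead of
executor.submit(worker).
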
Here is a screenshot of a debugger/profiler (this was done in another
example). The FutureTask has completed (as is obvious from the result).
However, the FutureTask holds a reference to the Callable. In this case
the Callable holds a reference to the outer final variable arguments which
contains some "big" object.

(This example is closer to real life. Here Obba Server works on a
spreadsheet using concurrent creation and processing of data - taken from
a project of mine).
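
For a captured variable like arguments, a possible alternative is to let
the Callable capture only a small holder and to take the big object out of
the holder when the task starts. The following is a minimal sketch of that
idea, assuming each task runs exactly once; HolderHandOff and createWorker
are hypothetical names, and AtomicReference from
java.util.concurrent.atomic merely serves as the holder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

public class HolderHandOff {

    /**
     * Creates a worker that captures only the holder. The worker takes the model
     * out of the holder when it runs, so afterwards neither the holder nor the
     * worker refers to the model. Assumption: the worker is called only once.
     */
    public static Callable<Double> createWorker(final AtomicReference<List<Double>> holder) {
        return new Callable<Double>() {
            public Double call() {
                List<Double> model = holder.getAndSet(null);   // local reference only
                double sum = 0.0;
                for (double value : model) sum += value;
                return sum;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        List<Double> model = new ArrayList<Double>();
        for (int i = 1; i <= 4; i++) model.add((double) i);

        AtomicReference<List<Double>> holder = new AtomicReference<List<Double>>(model);
        model = null;   // the holder is now the only reference to the model

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Double> future = executor.submit(createWorker(holder));
        System.out.println(future.get());           // prints 10.0
        System.out.println(holder.get() == null);   // prints true
        executor.shutdown();
    }
}
```

The completed Future may still reference the Callable and the Callable
still references the holder, but the holder is empty once the task has
started.
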
