A bounded thread pool allows the programmer to specify an upper limit on the number of threads that can execute in the pool at any one time. Tasks that depend on the completion of other tasks should not be executed in the same bounded thread pool.
A form of deadlock called _thread starvation deadlock_ arises when all the threads executing in the pool are blocked on tasks that have not yet begun execution and are waiting on an internal queue. Thread starvation deadlock occurs when currently-executing tasks submit other tasks to the same thread pool and wait for them to complete, but the thread pool does not have the capacity to accommodate all the tasks at once.
This problem is deceptive because the program may appear to function correctly when fewer threads are needed. In some cases, the issue can be mitigated by choosing a larger pool size; however, there is often no easy way to determine a suitable size.
Similarly, threads in a thread pool may not be recycled if two executing tasks require each other to complete before they can terminate. A blocking operation within a sub-task can also lead to unbounded queue growth \[[Goetz 06|AA. Java References#Goetz 06]\].
h2. Noncompliant Code Example (interdependent sub-tasks)
This noncompliant code example is vulnerable to a thread starvation deadlock. It consists of class {{ValidationService}}, which performs various input validation tasks, such as checking whether a user-supplied field exists in a back-end database.
The {{fieldAggregator()}} method accepts a variable number of {{String}} arguments and creates a task corresponding to each argument to gain some speedup. The task performs input validation using class {{ValidateInput}}.
The class {{ValidateInput}}, in turn, attempts to sanitize the input by creating a sub-task for each request using class {{SanitizeInput}}. All tasks are executed in the same thread pool. The method {{fieldAggregator()}} blocks until all the tasks have finished executing. When all results are available, it aggregates the processed inputs as a {{StringBuilder}} and returns it to the caller.
{code:bgColor=#FFCCCC}
public final class ValidationService {
private final ExecutorService pool;
public ValidationService(int poolSize) {
pool = Executors.newFixedThreadPool(poolSize);
}
public void shutdown() {
pool.shutdown();
}
public StringBuilder fieldAggregator(String... inputs)
throws InterruptedException, ExecutionException {
StringBuilder sb = new StringBuilder();
Future<String>[] results = new Future[inputs.length]; // Stores the results
for(int i = 0; i < inputs.length; i++) { // Submits the tasks to thread pool
results[i] = pool.submit(new ValidateInput<String>(inputs[i], pool));
}
for(int i = 0; i < inputs.length; i++) { // Aggregates the results
sb.append(results[i].get());
}
return sb;
}
}
public final class ValidateInput<V> implements Callable<V> {
private final V input;
private final ExecutorService pool;
ValidateInput(V input, ExecutorService pool) {
this.input = input;
this.pool = pool;
}
@Override public V call() throws Exception {
// If validation fails, throw an exception here
Future<V> future = pool.submit(new SanitizeInput<V>(input)); // Sub-task
return (V)future.get();
}
}
public final class SanitizeInput<V> implements Callable<V> {
private final V input;
SanitizeInput(V input) {
this.input = input;
}
@Override public V call() throws Exception {
// Sanitize input and return
return (V)input;
}
}
{code}
{mc}
// Hidden main() method
public static void main(String[] args) throws InterruptedException, ExecutionException {
ValidationService vs = new ValidationService(5);
System.out.println(vs.fieldAggregator("field1", "field2","field3","field4", "field5","field6"));
vs.shutdown();
}
{mc}
Assume that the caller sets the thread pool size to 6. When {{ValidationService.fieldAggregator()}} is invoked with six arguments to be validated, six tasks are submitted to the thread pool. Six more sub-tasks corresponding to {{SanitizeInput}} must also execute before these tasks can return their results. However, this is not possible because all six threads in the pool are blocked, waiting for sub-tasks that remain in the pool's internal queue and can never start. Furthermore, invoking the {{shutdown()}} method does not shut down the thread pool while it contains active tasks.
This situation can also occur when using single-threaded executors, for example, when a task running in the executor creates several sub-tasks and waits for their results.
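The single-threaded case can be reproduced with a minimal sketch such as the following. The class {{SingleThreadStarvation}} and its method {{outerTaskStalls()}} are hypothetical names introduced only for illustration; the timeout exists purely so that the demonstration terminates instead of hanging.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demonstration class; not part of the ValidationService example
final class SingleThreadStarvation {

  // Returns true if the outer task fails to finish within timeoutMillis
  static boolean outerTaskStalls(long timeoutMillis) {
    final ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Future<String> outer = pool.submit(new Callable<String>() {
        @Override public String call() throws Exception {
          // The sub-task is queued behind this task, which holds the only thread
          Future<String> inner = pool.submit(new Callable<String>() {
            @Override public String call() {
              return "sanitized";
            }
          });
          return inner.get(); // Blocks: the sub-task can never start
        }
      });
      try {
        outer.get(timeoutMillis, TimeUnit.MILLISECONDS);
        return false; // Outer task completed (not expected here)
      } catch (TimeoutException e) {
        return true; // Outer task is stuck waiting for its own sub-task
      } catch (ExecutionException e) {
        return false;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Reset interrupted status
        return false;
      }
    } finally {
      pool.shutdownNow(); // Interrupts the blocked worker so the JVM can exit
    }
  }

  public static void main(String[] args) {
    System.out.println("Stalled: " + outerTaskStalls(500));
  }
}
```

The outer task occupies the executor's only thread while it waits, so the queued sub-task never runs. This is the same dependency pattern as in {{ValidateInput}}, reduced to a pool of size one.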
h2. Compliant Solution (no interdependent tasks)
This compliant solution refactors the {{ValidateInput<V>}} class so that the work of {{SanitizeInput}} is not submitted to the thread pool as a separate task but instead runs in the same thread as the {{ValidateInput}} task that requires it. Consequently, no task in the pool waits for another task in the same pool to complete. Also, {{SanitizeInput}} can be refactored so that it no longer implements {{Callable}}.
{code:bgColor=#ccccff}
public final class ValidationService {
// ...
public StringBuilder fieldAggregator(String... inputs)
throws InterruptedException, ExecutionException {
// ...
for (int i = 0; i < inputs.length; i++) {
results[i] = pool.submit(new ValidateInput<String>(inputs[i])); // Don't pass-in thread pool
}
// ...
}
}
public final class ValidateInput<V> implements Callable<V> { // Does not use same thread pool
private final V input;
ValidateInput(V input) {
this.input = input;
}
@Override public V call() throws Exception {
// If validation fails, throw an exception here
return new SanitizeInput<V>().sanitize(input); // Runs in this task's thread
}
}
public final class SanitizeInput<V> { // No longer a Callable task
public SanitizeInput() {}
public V sanitize(V input) {
// Sanitize input and return
return input;
}
}
{code}
Always submit independent tasks to the {{Executor}}. Thread starvation issues can be mitigated by choosing a large thread pool size; however, the limitations of the application when using this approach should be clearly documented.
Note that operations that have further constraints, such as the total number of database connections or total {{ResultSet}} objects open at a particular time, impose an upper bound on the thread pool size because each thread continues to block until the resource becomes available. The other rules of fair concurrency, such as not running time-consuming tasks, also apply.
Sometimes, a {{private static}} {{ThreadLocal}} variable is used per thread to maintain local state. When using thread pools, {{ThreadLocal}} variables should be used only if their lifetime is bounded by the corresponding task \[[Goetz 06|AA. Java References#Goetz 06]\]. Such variables should also not be used as a communication mechanism between tasks. Furthermore, the implementation should be compliant with [CON33-J. Ensure ThreadLocal variables are reinitialized when using thread pools].
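One way to bound the lifetime of a {{ThreadLocal}} value by its task is to clear it in a {{finally}} block before the task returns. The following is a minimal sketch under that assumption; the class {{FormattingTask}} and the field {{BUFFER}} are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;

// Hypothetical task; the names here are illustrative only
final class FormattingTask implements Callable<String> {
  // Per-thread scratch buffer, created lazily on first use in each thread
  private static final ThreadLocal<StringBuilder> BUFFER =
      new ThreadLocal<StringBuilder>() {
        @Override protected StringBuilder initialValue() {
          return new StringBuilder();
        }
      };

  private final String input;

  FormattingTask(String input) {
    this.input = input;
  }

  @Override public String call() {
    try {
      StringBuilder sb = BUFFER.get();
      sb.append('[').append(input).append(']');
      return sb.toString();
    } finally {
      // Discard the thread-local state so it cannot leak into the next
      // task that the pool schedules on this worker thread
      BUFFER.remove();
    }
  }
}
```

Because {{remove()}} runs even when the task throws, a later task on the same worker thread starts from a fresh {{initialValue()}} rather than from leftover state.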
h2. Compliant Solution (unbounded thread pool)
This compliant solution uses a cached thread pool, which dynamically creates new threads as needed and prevents deadlock. However, this implementation may result in resource exhaustion and should not be used in front-end or critical production systems.
{mc} Might need some sort of time out metric otherwise wouldn't this lead to a DoS? {mc}
{code:bgColor=#ccccff}
public final class ValidationService {
private final ExecutorService pool;
public ValidationService(int poolSize) {
pool = Executors.newCachedThreadPool();
}
// ...
}
{code}
The {{Executors.newCachedThreadPool()}} method does the following: \[[API 06|AA. Java References#API 06]\]
{quote}
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources.
{quote}
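The effect on interdependent tasks can be illustrated with a small sketch. The class {{CachedPoolDemo}} and the method {{sumOfDoubledValues()}} are hypothetical and stand in for the validation and sanitization tasks; the arithmetic is only there to make the result checkable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demonstration class; not part of ValidationService
final class CachedPoolDemo {

  // Submits outerTasks tasks; each one submits a sub-task to the same pool
  // and waits for it. Returns the sum of the sub-task results.
  static int sumOfDoubledValues(int outerTasks) {
    final ExecutorService pool = Executors.newCachedThreadPool();
    try {
      List<Future<Integer>> results = new ArrayList<Future<Integer>>();
      for (int i = 0; i < outerTasks; i++) {
        final int value = i;
        results.add(pool.submit(new Callable<Integer>() {
          @Override public Integer call() throws Exception {
            // The pool creates a new thread if no idle thread is available,
            // so the sub-task is never stuck behind its parent
            Future<Integer> sub = pool.submit(new Callable<Integer>() {
              @Override public Integer call() {
                return value * 2;
              }
            });
            return sub.get();
          }
        }));
      }
      int sum = 0;
      for (Future<Integer> f : results) {
        sum += f.get();
      }
      return sum;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Reset interrupted status
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

The sketch completes for any number of outer tasks because the pool grows on demand. That same property is the source of the resource exhaustion risk noted above, since nothing limits how many threads a burst of submissions can create.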
h2. Noncompliant Code Example (sub-tasks at multiple levels)
This noncompliant code example (based on \[[Gafter 06|AA. Java References#Gafter 06]\]) shows a {{BrowserManager}} class that has several methods that use a fork-join mechanism; that is, they start tasks and wait for them to finish. The methods are called in the sequence {{perUser()}}, {{perProfile()}}, and {{perTab()}}. The method {{methodInvoker()}} spawns several instances of the specified task, depending on the value of the variable {{numberOfTimes}}. A single fixed-size thread pool is used to execute the tasks created at every level.
{code:bgColor=#FFCCCC}
public final class BrowserManager {
private final ExecutorService pool = Executors.newFixedThreadPool(10);
private final int numberOfTimes;
private static AtomicInteger count = new AtomicInteger(); // count = 0
public BrowserManager(int n) {
numberOfTimes = n;
}
public void perUser() {
methodInvoker(numberOfTimes, "perProfile");
pool.shutdown();
}
public void perProfile() {
methodInvoker(numberOfTimes, "perTab");
}
public void perTab() {
methodInvoker(numberOfTimes, "doSomething");
}
public void doSomething() {
System.out.println(count.getAndIncrement());
}
public void methodInvoker(int n, final String method) {
final BrowserManager manager = this;
Callable<Object> callable = new Callable<Object>() {
@Override public Object call() throws Exception {
Method meth = manager.getClass().getMethod(method);
return meth.invoke(manager);
}
};
Collection<Callable<Object>> collection = Collections.nCopies(n, callable);
try {
Collection<Future<Object>> futures = pool.invokeAll(collection);
} catch (InterruptedException e) {
// Forward to handler
Thread.currentThread().interrupt(); // Reset interrupted status
}
// ...
}
public static void main(String[] args) {
BrowserManager manager = new BrowserManager(5);
manager.perUser();
}
}
{code}
Contrary to what is expected, this program does not print the total count, that is, the number of times {{doSomething()}} is invoked. The program is susceptible to a thread starvation deadlock because the size of the thread pool (10) does not allow any task created by {{perTab()}} to invoke the {{doSomething()}} method. With {{numberOfTimes}} set to 5, the five {{perProfile()}} tasks occupy five threads and block; the remaining five threads run {{perTab()}} tasks, which block in turn while the {{doSomething()}} tasks they submitted remain queued. The output of the program varies for different values of {{numberOfTimes}} and the thread pool size. Note that different threads are allowed to invoke {{doSomething()}} in different orders; only the maximum value of {{count}} matters when determining how many times the method executed.
h2. Compliant Solution ({{CallerRunsPolicy}})
{mc}To prevent thread starvation, every level (worker) must have a double ended queue where all sub-tasks are queued \[[Goetz 06|AA. Java References#Goetz 06]\]. Each level removes the most recently generated sub-task from the queue so that it can process it. When there are no more threads left to process, the current level runs the least-recently created sub-task of another level by picking and removing it from that level's queue (work stealing). {mc}
This compliant solution selects and schedules tasks for execution, and consequently avoids the thread starvation deadlock. It sets the {{CallerRunsPolicy}} on a {{ThreadPoolExecutor}}, and uses a {{SynchronousQueue}} \[[Gafter 06|AA. Java References#Gafter 06]\]. The policy dictates that if the thread pool runs out of available threads, any subsequent tasks will run in the thread that submitted the tasks.
{code:bgColor=#ccccff}
public final class BrowserManager {
private final static ThreadPoolExecutor pool =
new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
private final int numberOfTimes;
private static AtomicInteger count = new AtomicInteger(); // count = 0
static {
pool.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
}
// ...
}
{code}
According to Goetz et al. \[[Goetz 06|AA. Java References#Goetz 06]\]:
{quote}
A {{SynchronousQueue}} is not really a queue at all, but a mechanism for managing handoffs between threads. In order to put an element on the {{SynchronousQueue}}, another thread must already be waiting to accept the handoff. If no thread is waiting but the current pool size is less than the maximum, {{ThreadPoolExecutor}} creates a new thread; otherwise the task is rejected according to the saturation policy.
{quote}
According to the Java API class {{java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy}} documentation \[[API 06|AA. Java References#API 06]\], the {{CallerRunsPolicy}} class is:
{quote}
A handler for rejected tasks that runs the rejected task directly in the calling thread of the {{execute}} method, unless the executor has been shut down, in which case the task is discarded.
{quote}
In this compliant solution, tasks that have other tasks waiting to accept the handoff are added to the {{SynchronousQueue}} when the thread pool is full. For example, tasks corresponding to {{perTab()}} are added to the {{SynchronousQueue}} because the tasks corresponding to {{perProfile()}} are waiting to receive the handoff. Once the pool is full, additional tasks are rejected according to the saturation policy in effect. Because the {{CallerRunsPolicy}} is used to handle these rejected tasks, each rejected task is executed in the thread that attempted to submit it. When all the tasks corresponding to {{perTab()}} have finished executing, the next set of tasks corresponding to {{perProfile()}} are added to the {{SynchronousQueue}} because the handoff will be further used by tasks corresponding to {{perUser()}}. Consequently, all tasks are executed in bottom-up fashion.
The caller-runs policy allows graceful degradation of service when faced with many requests by distributing the workload from the thread pool to the work queue. Because the submitted tasks do not block for any reason other than waiting for other tasks to complete, the caller-runs policy guarantees that the current thread can handle multiple tasks sequentially. The caller-runs policy would not prevent thread starvation deadlock if the tasks were to block for some other reason, such as network I/O. Furthermore, because {{SynchronousQueue}} does not store tasks indefinitely for future execution, there is no unbounded queue growth, and all tasks are handled either by the current thread or by a thread in the thread pool.
This compliant solution is subject to the vagaries of the thread scheduler, which may not schedule the tasks optimally; however, it avoids the thread starvation deadlock.
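The behavior described above can be checked with a self-contained sketch that reduces {{BrowserManager}} to a recursive fork-join. The class {{CallerRunsDemo}} and its methods are hypothetical names introduced for illustration, and recursion replaces the reflective {{methodInvoker()}} dispatch; the executor configuration matches the one shown in the compliant solution.

```java
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demonstration class; a reduced stand-in for BrowserManager
final class CallerRunsDemo {

  // Runs a fork-join tree with the given fan-out and depth and returns
  // the number of leaf invocations that actually executed
  static int countLeafRuns(int maxThreads, int fanOut, int depth) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        0, maxThreads, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
    AtomicInteger count = new AtomicInteger();
    try {
      fork(pool, fanOut, depth, count);
    } finally {
      pool.shutdown();
    }
    return count.get();
  }

  private static void fork(final ThreadPoolExecutor pool, final int fanOut,
                           final int depth, final AtomicInteger count) {
    if (depth == 0) {
      count.incrementAndGet(); // Leaf work, analogous to doSomething()
      return;
    }
    Callable<Object> task = new Callable<Object>() {
      @Override public Object call() {
        fork(pool, fanOut, depth - 1, count);
        return null;
      }
    };
    try {
      // Tasks that find no free thread are run by the submitting thread
      pool.invokeAll(Collections.nCopies(fanOut, task));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Reset interrupted status
    }
  }
}
```

With a fan-out of 5 and three levels below the root, mirroring {{perUser()}}, {{perProfile()}}, and {{perTab()}} with {{numberOfTimes}} set to 5, {{countLeafRuns(10, 5, 3)}} returns 125. Every task either receives a pool thread or runs in its submitter, so no task waits in a queue behind a blocked parent.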
h2. Risk Assessment
Executing interdependent tasks in a thread pool can lead to denial of service.
|| Rule || Severity || Likelihood || Remediation Cost || Priority || Level ||
| CON30-J | low | probable | medium | {color:green}{*}P4{*}{color} | {color:green}{*}L3{*}{color} |
h3. Automated Detection
TODO
h3. Related Vulnerabilities
Search for vulnerabilities resulting from the violation of this rule on the [CERT website|https://www.kb.cert.org/vulnotes/bymetric?searchview&query=FIELD+KEYWORDS+contains+FIO38-J].
h2. References
\[[API 06|AA. Java References#API 06]\]
\[[Gafter 06|AA. Java References#Gafter 06]\] [A Thread Pool Puzzler|http://gafter.blogspot.com/2006/11/thread-pool-puzzler.html]
\[[Goetz 06|AA. Java References#Goetz 06]\] 8.3.2 "Managing queued tasks", 8.3.3 "Saturation Policies", 5.3.3 Deques and work stealing