There are certain problems associated with the incorrect use of the Executor
interface. For instance, Bounded thread pools allow the programmer to specify an upper limit on the number of threads that can concurrently execute in a thread pool. Programs must not use threads from a bounded thread pool to execute tasks that depend on the completion of other tasks should not execute in the same thread pool. A task that submits another task to a single threaded Executor
remains blocked until the results are received whereas the second task may have dependencies on the first task. This constitutes a deadlock.
Noncompliant Code Example
Wiki Markup |
---|
This noncompliant code example shows a _thread starvation deadlock_. This situation not only occurs in single threaded Executors, but also in those with large Thread Pools. This can happen when all the threads executing in the pool are blocked on tasks that are waiting on the queue. A blocking operation within a subtask can also lead to unbounded queue growth. \[[Goetz 06|AA. Java References#Goetz 06]\] |
A form of deadlock called thread-starvation deadlock arises when all the threads executing in the pool are blocked on tasks that are waiting on an internal queue for an available thread in which to execute. Thread-starvation deadlock occurs when currently executing tasks submit other tasks to a thread pool and wait for them to complete and the thread pool lacks the capacity to accommodate all the tasks at once.
This problem can be confusing because the program can function correctly when fewer threads are needed. The issue can be mitigated, in some cases, by choosing a larger pool size. However, determining a suitable size may be difficult or even impossible.
Similarly, threads in a thread pool may fail to be recycled when two executing tasks each require the other to complete before they can terminate. A blocking operation within a subtask can also lead to unbounded queue growth [Goetz 2006].
Noncompliant Code Example (Interdependent Subtasks)
This noncompliant code example is vulnerable to thread-starvation deadlock. It consists of the ValidationService
class, which performs various input validation tasks such as checking whether a user-supplied field exists in a back-end database.
The fieldAggregator()
method accepts a variable number of String
arguments and creates a task corresponding to each argument to enable concurrent processing. The task performs input validation using the ValidateInput
class.
In turn, the ValidateInput
class attempts to sanitize the input by creating a subtask for each request using the SanitizeInput
class. All tasks are executed in the same thread pool. The fieldAggregator()
method blocks until all the tasks have finished executing and, when all results are available, returns the aggregated results as a StringBuilder
object to the caller.
Code Block | ||
---|---|---|
| ||
public final class ValidationService {
private final ExecutorService pool;
public ValidationService(int poolSize) {
pool = Executors.newFixedThreadPool(poolSize);
}
public void shutdown() {
pool.shutdown();
}
public StringBuilder fieldAggregator(String... inputs)
throws InterruptedException, ExecutionException {
StringBuilder sb = new StringBuilder();
// Stores the results
Future<String>[] results = new Future[inputs.length];
// Submits the tasks to thread pool
for (int i = 0; i < inputs.length; i++) {
results[i] = pool.submit(
new ValidateInput<String>(inputs[i], pool));
}
for (int i = 0; i < inputs.length; i++) { // Aggregates the results
sb.append(results[i].get());
}
return sb;
}
}
public final class ValidateInput<V> implements Callable<V> | ||
Code Block | ||
| ||
// Field password is defined in class InitialHandshake class NetworkServer extends InitialHandshake implements Runnable { private final ServerSocketV serverSocketinput; private final ExecutorService pool; public NetworkServerValidateInput(intV portinput, intExecutorService poolSizepool) throws IOException { serverSocketthis.input = new ServerSocket(port)input; this.pool = Executors.newFixedThreadPool(poolSize)pool; } @Override public voidV runcall() throws Exception { // If validation tryfails, {throw an exception here // Interdependent tasksSubtask Future<V> future = pool.submit(new SanitizeInputSanitizeInput<V>(passwordinput)); return // password is defined in class InitialHandshake pool.submit(new CustomHandshake(password)); // for e.g. client puzzles pool.execute(new Handle(serverSocket.accept())); // Handle connection } catch (IOException ex) { pool.shutdown(); } (V) future.get(); } } public final class SanitizeInput<V> implements Callable<V> { private final V input; SanitizeInput(V input) { this.input = input; } @Override public V call() throws Exception { // Sanitize input and return return (V) input; } } |
In this noncompliant code example, the SanitizeInput
task depends upon the CustomHandshake
task for the value of password
whereas the latter depends on the former to return a password
that has been correctly sanitized.
Compliant Solution
Assume, for example, that the pool size is set to 6. The ValidationService.fieldAggregator()
method is invoked to validate six arguments; consequently, it submits six tasks to the thread pool. Each task submits a corresponding subtask to sanitize the input. The SanitizeInput
subtasks must execute before the original six tasks can return their results. However, this is impossible because all six threads in the thread pool are blocked. Furthermore, the shutdown()
method cannot shut down the thread pool when it contains active tasks.
Thread-starvation deadlock can also occur when a single-threaded Executor
is used, for example, when the caller creates several subtasks and waits for the results.
Compliant Solution (No Interdependent Tasks)
This compliant solution modifies the ValidateInput<V>
class so that the SanitizeInput
tasks are executed in the same threads as the ValidateInput
tasks rather than in separate threads. Consequently, the ValidateInput
and SanitizeInput
tasks are independent, which eliminates their need to wait for each other to complete. The SanitizeInput
class has also been modified to omit implementation of the Callable
interfaceThis compliant solution recommends executing the interdependent tasks as a single task within the Executor
. In other cases, where the subtasks do not require concurrency safeguards, the subtasks can be moved outside the threaded region that is required to be executed by the Executor
.
Code Block | ||
---|---|---|
| ||
class NetworkServer extends InitialHandshake implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkServer(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void run() { try { // Execute interdependent subtasks as a single combined task within this block public final class ValidationService { // ... public StringBuilder fieldAggregator(String... inputs) throws InterruptedException, ExecutionException { // ... for (int i = 0; i < inputs.length; i++) { // Don't pass-in thread pool results[i] = pool.submit(new ValidateInput<String>(inputs[i])); } // ... } } // Does not use same thread pool public final class ValidateInput<V> implements Callable<V> { private final V input; ValidateInput(V input) { this.input = input; } @Override public V call() throws Exception { // TasksIf SanitizeInput() and CustomHandshake() are performed together in Handle()validation fails, throw an exception here return pool.execute((V) new HandleSanitizeInput(serverSocket).acceptsanitize(input))); } } public final class SanitizeInput<V> { // Handle connectionNo longer a Callable task public SanitizeInput() {} public catchV sanitize(IOExceptionV exinput) { // pool.shutdown();Sanitize input and return } return input; } } |
Always try to submit independent tasks to the Executor
. Thread Thread-starvation issues can be partially mitigated by choosing a large thread pool size. However, an untrusted caller can still overwhelm the system by supplying more inputs (see TPS00-J. Use thread pools to enable graceful degradation of service during traffic bursts).
Note that operations that have further constraints, such as the total number of database connections or total ResultSets
ResultSet
objects open at a particular time, impose an upper bound on the usable thread pool size, as each thread continues to block until the resource becomes available. The other rules of fair concurrency, such as not running time consuming tasks, also apply. When this is not possible, expecting to obtain real time result guarantees from the execution of tasks is conceivably, an unreasonable target.
Wiki Markup |
---|
Sometimes, a {{private static}} {{ThreadLocal}} variable is used per thread to maintain local state. When using thread pools, {{ThreadLocal}} variable should be used only if their lifetime is shorter than that of the corresponding task \[[Goetz 06|AA. Java References#Goetz 06]\]. Moreover, such variables should not be used as a communication mechanism between tasks. |
Noncompliant Code Example
.
Private static ThreadLocal
variables may be used to maintain local state in each thread. When using thread pools, the lifetime of ThreadLocal
variables should be bounded by the corresponding task [Goetz 2006]. Furthermore, programs must not use these variables to communicate between tasks. There are additional constraints in the use of ThreadLocal
variables in thread pools (see TPS04-J. Ensure ThreadLocal variables are reinitialized when using thread pools for more information).
Noncompliant Code Example (Subtasks)
This noncompliant code example contains a series of subtasks that execute in a shared thread pool [Gafter 2006]. The BrowserManager
class calls perUser()
, which starts tasks that invoke perProfile()
. The perProfile()
method starts tasks that invoke perTab()
, and in turn, perTab
starts tasks that invoke doSomething()
. BrowserManager
then waits for the tasks to finish. The threads are allowed to invoke doSomething()
in any order, provided that count
correctly records the number of methods executed. This noncompliant code example (based on \[[Gafter 06|AA. Java References#Gafter 06]\]) shows a {{BrowserManager}} class that has several methods that use a fork-join mechanism, that is, they start threads and wait for them to finish. The methods are called in the sequence {{perUser()}}, {{perProfile}} and {{perTab()}}. The method {{methodInvoker}} spawns several instances of the specified runnable depending on the value of the variable {{numberOfTimes}}. A fixed sized thread pool is used to execute the enumerated threads generated at different levels. Wiki Markup
Code Block | ||
---|---|---|
| ||
public final class BrowserManager { private final ExecutorService pool = Executors.newFixedThreadPool(10); private final int numberOfTimes; private static volatile int AtomicInteger count = new AtomicInteger(); // count = 0; public BrowserManager(int n) { this.numberOfTimes = n; } public void perUser() { methodInvoker(numberOfTimes, "perProfile"); pool.shutdown(); } public void perProfile() { methodInvoker(numberOfTimes, "perTab"); } public void perTab() { methodInvoker(numberOfTimes, "doSomething"); } public void doSomething() { System.out.println(++count.getAndIncrement()); } public void methodInvoker(int n, final String method) { final BrowserManager fmmanager = this; RunnableCallable<Object> runcallable = new RunnableCallable<Object>() { @Override public voidObject runcall() throws Exception { try { Method meth = fmmanager.getClass().getMethod(method); meth.invoke(fm); } catch (Throwable t) { // Forward to exception reporterreturn meth.invoke(manager); } } }; Collection<Callable<Object>> ccollection = new ArrayList<Callable<Object>>(); for(int i=0;i < n; i++) { c.add(Executors.callable(run)); } Collection<Future<Object>> futures = null Collections.nCopies(n, callable); try { Collection<Future<Object>> futures = pool.invokeAll(ccollection); } catch (InterruptedException e) { // Forward to handler Thread.currentThread().interrupt(); // ForwardReset tointerrupted handler status } // ... } public static void main(String[] args) { BrowserManager manager = new BrowserManager(5); manager.perUser(); } } |
Contrary to what is expectedUnfortunately, this program does not print the total count, that is, the number of times doSomething()
is invoked. This is because it is susceptible to a thread-starvation deadlock because the size of the thread pool (10) does not allow either thread from perTab()
to invoke the doSomething()
method. The output of the program varies for different values of numberOfTimes
and the thread pool size. Note that different threads are allowed to invoke doSomething()
in different order; we are concerned only with the maximum value of count
to determine how many times the method executed.
...
. For example, if each of the five perUser
tasks spawns five perProfile
tasks, where each perProfile
task spawns a perTab
task, the thread pool will be exhausted, and perTab()
will be unable to allocate any additional threads to invoke the doSomething()
method.
Compliant Solution (CallerRunsPolicy
)
This compliant solution selects and schedules tasks for scheduling and avoids the thread starvation deadlock. Every level (worker) must have a double ended queue where all sub-tasks are queued. Each level removes the most recently generated sub-task from the queue so that it can process it. When there are no more threads left to process, the current level runs the least-recently created sub-task of another level by picking and removing it from that level's queue. This compliant solution sets the {{CallerRuns}} policy on a {{ThreadPoolExecutor}}, and uses a synchronous queue \[[Gafter 06|AA. Java References#Gafter 06]\]execution, avoiding thread-starvation deadlock. It sets the Wiki Markup CallerRunsPolicy
on a ThreadPoolExecutor
and uses a SynchronousQueue
[Gafter 2006]. The policy dictates that when the thread pool runs out of available threads, any subsequent tasks will run in the thread that submitted the tasks.
Code Block | ||
---|---|---|
| ||
public final class BrowserManager { private final static ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); private final int numberOfTimes; private static volatile int AtomicInteger count = new AtomicInteger(); // count = 0; static { pool.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy()); } // ... } |
According to Goetz and colleagues [Goetz 2006]:
A
SynchronousQueue
is not really a queue at all, but a mechanism for managing handoffs between threads. In order to
...
put an element on the
SynchronousQueue
, another thread must already be waiting to accept the handoff. If no thread is waiting, but the current pool size is less than the maximum,ThreadPoolExecutor
creates a new thread; otherwise, the task is rejected according to the saturation policy.
According to the Java API [API 2014], the CallerRunsPolicy
class is
a A handler for rejected tasks that runs the rejected task directly in the calling thread of the
execute
method, unless the executor has been shut down, in which case, the task is discarded.
In this compliant solution, tasks that have other tasks waiting to accept the hand-off are added to the SynchronousQueue
when the thread pool is full. For example, tasks corresponding to perTab()
are added to the SynchronousQueue
because the tasks corresponding to perProfile()
are waiting to receive the hand-off. Once the pool is full, additional tasks are rejected according to the saturation policy in effect. Because the CallerRunsPolicy
is used to handle these rejected tasks, all the rejected tasks are executed in the main thread that started the initial tasks. When all the threads corresponding to perTab()
have finished executing, the next set of tasks corresponding to perProfile()
are added to the SynchronousQueue
because the hand-off is subsequently used by perUser()
tasks.
The CallerRunsPolicy
allows graceful degradation of service when faced with many requests by distributing the workload from the thread pool to the work queue. Because the submitted tasks cannot block for any reason other than waiting for other tasks to complete, the policy guarantees that the current thread can handle multiple tasks sequentially. The policy would fail to prevent thread-starvation deadlock if the tasks were to block for some other reason, such as network I/O. Furthermore, this approach avoids unbounded queue growth because SynchronousQueue
avoids storing tasks indefinitely for future execution, and all tasks are handled either by the current thread or by a thread in the thread pool.
This compliant solution is subject to the vagaries of the thread scheduler, which may not optimally might schedule the tasks , howeversuboptimally. However, it avoids the thread-starvation deadlock.
Risk Assessment
Executing interdependent tasks in a thread pool can lead to denial of service.
Rule | Severity | Likelihood | Remediation Cost | Priority | Level |
---|---|---|---|---|---|
CON29TPS01-J | low Low | probable Probable | medium Medium | P4 | L3 |
Automated Detection
TODO
Related Vulnerabilities
Search for vulnerabilities resulting from the violation of this rule on the CERT website.
References
Wiki Markup |
---|
\[[API 06|AA. Java References#API 06]\]
\[[Gafter 06|AA. Java References#Gafter 06]\] [A Thread Pool Puzzler|http://gafter.blogspot.com/2006/11/thread-pool-puzzler.html] |
Bibliography
[API 2014] | |
Section 5.3.3, "Dequeues and Work Stealing" |
...
FIO36-J. Do not create multiple buffered wrappers on an InputStream 09. Input Output (FIO) 09. Input Output (FIO)