...
This noncompliant code example uses five threads that are intended to execute sequentially according to the step level assigned to each thread when it is created (serialized processing). The `currentStep` variable holds the current step level and is incremented when each thread finishes its processing. The finishing thread then signals another thread so that the next step can execute. Each thread waits until its step level is ready, and the `wait()` call is wrapped inside a `while` loop, in compliance with CON54-CPP. Wrap functions that can spuriously wake up in a loop.
```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex;
std::condition_variable cond;

void run_step(size_t myStep) {
  static size_t currentStep = 0;
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << myStep << " has the lock" << std::endl;

  while (currentStep != myStep) {
    std::cout << "Thread " << myStep << " is sleeping..." << std::endl;
    cond.wait(lk);
    std::cout << "Thread " << myStep << " woke up" << std::endl;
  }

  // Do processing...
  std::cout << "Thread " << myStep << " is processing..." << std::endl;
  currentStep++;

  // Signal awaiting task.
  cond.notify_one();

  std::cout << "Thread " << myStep << " is exiting..." << std::endl;
}

int main() {
  constexpr size_t numThreads = 5;
  std::thread threads[numThreads];

  // Create threads.
  for (size_t i = 0; i < numThreads; ++i) {
    threads[i] = std::thread(run_step, i);
  }

  // Wait for all threads to complete.
  for (size_t i = numThreads; i != 0; --i) {
    threads[i - 1].join();
  }
}
```
In this example, all threads share a single condition variable, but each thread has its own distinct condition predicate because each thread requires `currentStep` to have a different value before proceeding. When the condition variable is signaled with `notify_one()`, any one of the waiting threads can wake up. If, by chance, the notified thread is not the thread with the next step value, that thread finds its predicate false and waits again. No additional notifications can occur, and eventually the pool of available threads is exhausted. The following table illustrates a possible scenario in which the liveness property is violated.
...
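Time | Thread # (`myStep`) | `currentStep` | Action |
---|---|---|---|
0 | 3 | 0 | Thread 3 executes first time: predicate is false, so it calls `wait()` |
1 | 2 | 0 | Thread 2 executes first time: predicate is false, so it calls `wait()` |
2 | 4 | 0 | Thread 4 executes first time: predicate is false, so it calls `wait()` |
3 | 0 | 0 | Thread 0 executes first time: predicate is true, so it increments `currentStep` and calls `notify_one()` |
4 | 1 | 1 | Thread 1 executes first time: predicate is true, so it increments `currentStep` and calls `notify_one()` |
5 | 3 | 2 | Thread 3 wakes up (scheduler choice): predicate is false, so it calls `wait()` again |
6 | - | - | Thread exhaustion! No more threads are left to run, and a condition variable signal is needed to wake up the others |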
...
```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex;
std::condition_variable cond;

void run_step(size_t myStep) {
  static size_t currentStep = 0;
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << myStep << " has the lock" << std::endl;

  while (currentStep != myStep) {
    std::cout << "Thread " << myStep << " is sleeping..." << std::endl;
    cond.wait(lk);
    std::cout << "Thread " << myStep << " woke up" << std::endl;
  }

  // Do processing ...
  std::cout << "Thread " << myStep << " is processing..." << std::endl;
  currentStep++;

  // Signal ALL waiting tasks.
  cond.notify_all();

  std::cout << "Thread " << myStep << " is exiting..." << std::endl;
}

// main() is unchanged from the noncompliant code example.
```
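The explicit `while` loop around `wait()` can also be written with the predicate overload of `std::condition_variable::wait`, which re-checks the predicate after every wakeup. The following is a minimal sketch of that variant, not part of the original solution. The names `mtx`, `completionOrder`, and `run_all` are hypothetical, `currentStep` is moved to namespace scope so that the lambda can read it, and the logging is replaced by a recorded completion order so that the result can be checked.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;                             // hypothetical name; guards the state below
std::condition_variable cond;
std::size_t currentStep = 0;                // guarded by mtx
std::vector<std::size_t> completionOrder;   // hypothetical log, guarded by mtx

void run_step(std::size_t myStep) {
  std::unique_lock<std::mutex> lk(mtx);

  // Equivalent to: while (currentStep != myStep) { cond.wait(lk); }
  cond.wait(lk, [myStep] { return currentStep == myStep; });

  // The predicate holds here, and the lock is held again.
  assert(currentStep == myStep);

  // Do processing (here: record which step ran).
  completionOrder.push_back(myStep);
  ++currentStep;

  // Wake ALL waiters; each one re-checks its own predicate.
  cond.notify_all();
}

// Hypothetical driver: runs numThreads steps and returns the order in
// which they were processed.
std::vector<std::size_t> run_all(std::size_t numThreads) {
  {
    std::lock_guard<std::mutex> lk(mtx);
    currentStep = 0;
    completionOrder.clear();
  }

  std::vector<std::thread> threads;
  threads.reserve(numThreads);

  // Start the threads in reverse step order so that later steps are
  // likely to reach wait() before their turn comes.
  for (std::size_t i = numThreads; i != 0; --i) {
    threads.emplace_back(run_step, i - 1);
  }

  for (auto &t : threads) {
    t.join();
  }

  // All worker threads have been joined, so reading without the lock is safe.
  return completionOrder;
}
```

Calling `run_all(5)` should return the steps in ascending order regardless of how the scheduler interleaves the threads. That ordering depends on `notify_all()`. The predicate overload only tidies the wait loop, so replacing `notify_all()` with `notify_one()` in this sketch would bring back the liveness problem described above.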
...
```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

constexpr size_t numThreads = 5;

std::mutex mutex;
std::condition_variable cond[numThreads];

void run_step(size_t myStep) {
  static size_t currentStep = 0;
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << myStep << " has the lock" << std::endl;

  while (currentStep != myStep) {
    std::cout << "Thread " << myStep << " is sleeping..." << std::endl;
    cond[myStep].wait(lk);
    std::cout << "Thread " << myStep << " woke up" << std::endl;
  }

  // Do processing ...
  std::cout << "Thread " << myStep << " is processing..." << std::endl;
  currentStep++;

  // Signal next step thread.
  if ((myStep + 1) < numThreads) {
    cond[myStep + 1].notify_one();
  }

  std::cout << "Thread " << myStep << " is exiting..." << std::endl;
}

int main() {
  std::thread threads[numThreads];

  // Create threads.
  for (size_t i = 0; i < numThreads; ++i) {
    threads[i] = std::thread(run_step, i);
  }

  // Wait for all threads to complete.
  for (size_t i = numThreads; i != 0; --i) {
    threads[i - 1].join();
  }
  return 0;
}
```
...