...
Code Block | ||||
---|---|---|---|---|
| ||||
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

enum { NTHREADS = 5 };

std::mutex mutex;
std::condition_variable cond;

/*
 * NONCOMPLIANT example (kept intentionally): every thread waits on the same
 * condition variable but for a different value of current_step, and only a
 * single waiter is notified after each step.
 *
 * my_step: the step number this thread must wait for before processing.
 */
void run_step(size_t my_step) {
  static size_t current_step = 0;
  /* lk holds the mutex for the whole function, except while blocked in wait() */
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << my_step << " has the lock" << std::endl;
  while (current_step != my_step) {
    std::cout << "Thread " << my_step << " is sleeping..." << std::endl;
    cond.wait(lk);
    std::cout << "Thread " << my_step << " woke up" << std::endl;
  }
  /* Do processing ... */
  std::cout << "Thread " << my_step << " is processing..." << std::endl;
  current_step++;

  /* Signal awaiting task */
  cond.notify_one();

  std::cout << "Thread " << my_step << " is exiting..." << std::endl;
}

int main(void) {
  std::thread threads[NTHREADS];

  /* Create threads */
  for (size_t i = 0; i < NTHREADS; ++i) {
    threads[i] = std::thread(run_step, i);
  }

  /* Wait for all threads to complete */
  for (size_t i = NTHREADS; i != 0; --i) {
    threads[i-1].join();
  }
  return 0;
}
In this example, all threads share a condition variable. Each thread has its own distinct condition predicate because each thread requires current_step
to have a different value before proceeding. When the condition variable is signaled, any of the waiting threads can wake up. The following table illustrates a possible scenario in which the liveness property is violated. If, by chance, the notified thread is not the thread with the next step value, that thread will wait again. No additional notifications can occur, and eventually the pool of available threads will be exhausted.
Deadlock: Out-of-Sequence Step Value
Time | Thread # (my_step) | current_step | Action |
---|---|---|---|
0 | 3 | 0 | Thread 3 executes first time: predicate is FALSE → wait() |
1 | 2 | 0 | Thread 2 executes first time: predicate is FALSE → wait() |
2 | 4 | 0 | Thread 4 executes first time: predicate is FALSE → wait() |
3 | 0 | 0 | Thread 0 executes first time: predicate is TRUE → current_step++; notify_one() |
4 | 1 | 1 | Thread 1 executes first time: predicate is TRUE → current_step++; notify_one() |
5 | 3 | 2 | Thread 3 wakes up (scheduler choice): predicate is FALSE → wait() |
6 | — | — | Thread exhaustion! No more threads to run, and a condition variable signal is needed to wake up the others |
This noncompliant code example violates the liveness property.
Compliant Solution (notify_all())
This compliant solution uses notify_all()
to signal all waiting threads instead of a single random thread. Only the run_step()
thread code from the noncompliant code example is modified, as follows:
Code Block | ||||
---|---|---|---|---|
| ||||
#include <stdio.h>
#include <threads.h>

mtx_t mutex;
cnd_t cond;

/*
 * Thread entry point: blocks until the shared step counter equals this
 * thread's step, does its processing, advances the counter, and then wakes
 * ALL waiters so that each one re-tests its own predicate.
 *
 * t: pointer to a size_t holding this thread's step number.
 * Returns 0.
 *
 * cnd_wait()/cnd_broadcast() are the <threads.h> calls for waiting on and
 * broadcasting a condition variable (the C11 counterparts of the C++
 * wait()/notify_all() members).
 */
int run_step(void *t) {
  static size_t current_step = 0;
  size_t my_step = *(size_t *)t;

  if (thrd_success != mtx_lock(&mutex)) {
    /* Handle error */
  }

  printf("Thread %zu has the lock\n", my_step);
  while (current_step != my_step) {
    printf("Thread %zu is sleeping...\n", my_step);
    /* Releases the mutex while blocked and re-acquires it before returning */
    if (thrd_success != cnd_wait(&cond, &mutex)) {
      /* Handle error */
    }
    printf("Thread %zu woke up\n", my_step);
  }
  /* Do processing ... */
  printf("Thread %zu is processing...\n", my_step);
  current_step++;

  /* Signal ALL waiting tasks */
  if (thrd_success != cnd_broadcast(&cond)) {
    /* Handle error */
  }

  printf("Thread %zu is exiting...\n", my_step);

  if (thrd_success != mtx_unlock(&mutex)) {
    /* Handle error */
  }
  return 0;
}
Awakening all threads guarantees the liveness property because each thread will execute its condition predicate test, and exactly one will succeed and continue execution.
Compliant Solution (Using notify_one() with a Unique Condition Variable per Thread)
Another compliant solution is to use a unique condition variable for each thread (all associated with the same mutex). In this case, notify_one()
wakes up only the thread that is waiting on it. This solution is more efficient than using notify_all()
because only the desired thread is awakened.
return 0;
} |
In this example, all threads share a condition variable. Each thread has its own distinct condition predicate because each thread requires current_step
to have a different value before proceeding. When the condition variable is signaled, any of the waiting threads can wake up. The following table illustrates a possible scenario in which the liveness property is violated. If, by chance, the notified thread is not the thread with the next step value, that thread will wait again. No additional notifications can occur, and eventually the pool of available threads will be exhausted.
Deadlock: Out-of-Sequence Step Value
Time | Thread # (my_step) | current_step | Action |
---|---|---|---|
0 | 3 | 0 | Thread 3 executes first time: predicate is FALSE → wait() |
1 | 2 | 0 | Thread 2 executes first time: predicate is FALSE → wait() |
2 | 4 | 0 | Thread 4 executes first time: predicate is FALSE → wait() |
3 | 0 | 0 | Thread 0 executes first time: predicate is TRUE → current_step++; notify_one() |
4 | 1 | 1 | Thread 1 executes first time: predicate is TRUE → current_step++; notify_one() |
5 | 3 | 2 | Thread 3 wakes up (scheduler choice): predicate is FALSE → wait() |
6 | — | — | Thread exhaustion! No more threads to run, and a condition variable signal is needed to wake up the others |
This noncompliant code example violates the liveness property.
Compliant Solution (notify_all())
This compliant solution uses notify_all() to signal all waiting threads instead of a single random thread. Only the run_step() thread code from the noncompliant code example is modified, as follows:
Note that the condition predicate of the signaled thread must be true; otherwise, a deadlock will occur.
Code Block | ||||
---|---|---|---|---|
| ||||
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex;
std::condition_variable cond;

/*
 * Waits until the shared step counter equals my_step, does its processing,
 * advances the counter, and then wakes ALL waiting threads so each one
 * re-tests its own predicate.
 *
 * my_step: the step number this thread must wait for before processing.
 */
void run_step(size_t my_step) {
  static size_t current_step = 0;
  /* lk holds the mutex for the whole function, except while blocked in wait() */
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << my_step << " has the lock" << std::endl;
  while (current_step != my_step) {
    std::cout << "Thread " << my_step << " is sleeping..." << std::endl;
    cond.wait(lk);
    std::cout << "Thread " << my_step << " woke up" << std::endl;
  }
  /* Do processing ... */
  std::cout << "Thread " << my_step << " is processing..." << std::endl;
  current_step++;

  /* Signal ALL waiting tasks */
  cond.notify_all();

  std::cout << "Thread " << my_step << " is exiting..." << std::endl;
}
Awakening all threads guarantees the liveness property because each thread will execute its condition predicate test, and exactly one will succeed and continue execution.
Compliant Solution (Using notify_one() with a Unique Condition Variable per Thread)
Another compliant solution is to use a unique condition variable for each thread (all associated with the same mutex). In this case, notify_one()
wakes up only the thread that is waiting on it. This solution is more efficient than using notify_all()
because only the desired thread is awakened.
Note that the condition predicate of the signaled thread must be true; otherwise, a deadlock will occur.
Code Block | ||||
---|---|---|---|---|
| ||||
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

enum { NTHREADS = 5 };

std::mutex mutex;
/* One condition variable per step; all of them are used with the same mutex */
std::condition_variable cond[NTHREADS];

/*
 * Waits on this thread's own condition variable until the shared step counter
 * equals my_step, does its processing, advances the counter, and then
 * notifies only the condition variable of the next step.
 *
 * my_step: the step number this thread must wait for; also its index in cond.
 */
void run_step(size_t my_step) {
  static size_t current_step = 0;
  /* lk holds the mutex for the whole function, except while blocked in wait() */
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << my_step << " has the lock" << std::endl;
  while (current_step != my_step) {
    std::cout << "Thread " << my_step << " is sleeping..." << std::endl;
    cond[my_step].wait(lk);
    std::cout << "Thread " << my_step << " woke up" << std::endl;
  }
  /* Do processing ... */
  std::cout << "Thread " << my_step << " is processing..." << std::endl;
  current_step++;

  /* Signal next step thread; the last step has no successor to notify */
  if ((my_step + 1) < NTHREADS) {
    cond[my_step + 1].notify_one();
  }

  std::cout << "Thread " << my_step << " is exiting..." << std::endl;
}

int main(void) {
  std::thread threads[NTHREADS];

  /* Create threads */
  for (size_t i = 0; i < NTHREADS; ++i) {
    threads[i] = std::thread(run_step, i);
  }

  /* Wait for all threads to complete */
  for (size_t i = NTHREADS; i != 0; --i) {
    threads[i-1].join();
  }
  return 0;
}
Risk Assessment
...