Versions Compared

Comment: Cleaning up some code examples

...

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex;
std::condition_variable cond;

void run_step(size_t my_step) {
  static size_t current_step = 0;
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << my_step << " has the lock" << std::endl;

  while (current_step != my_step) {
    std::cout << "Thread " << my_step << " is sleeping..." << std::endl;
    cond.wait(lk);
    std::cout << "Thread " << my_step << " woke up" << std::endl;
  }

  // Do processing ...
  std::cout << "Thread " << my_step << " is processing..." << std::endl;
  current_step++;

  // Signal awaiting task.
  cond.notify_one();

  std::cout << "Thread " << my_step << " is exiting..." << std::endl;
}

int main(void) {
  constexpr size_t NumThreads = 5;
  std::thread threads[NumThreads];

  // Create threads.
  for (size_t i = 0; i < NumThreads; ++i) {
    threads[i] = std::thread(run_step, i);
  }

  // Wait for all threads to complete.
  for (size_t i = NumThreads; i != 0; --i) {
    threads[i - 1].join();
  }

  return 0;
}

In this example, all threads share a single condition variable. Each thread has its own distinct condition predicate because each thread requires current_step to have a different value before proceeding. When the condition variable is signaled with notify_one(), only one waiting thread wakes up, and it can be any of them. The following table illustrates a possible scenario in which the liveness property is violated. If, by chance, the notified thread is not the thread with the next step value, that thread waits again without notifying any other thread. No additional notifications can occur, and eventually every remaining thread is blocked in wait().

...

| Time | Thread # (my_step) | current_step | Action |
|------|--------------------|--------------|--------|
| 0 | 3 | 0 | Thread 3 executes first time: predicate is false -> wait() |
| 1 | 2 | 0 | Thread 2 executes first time: predicate is false -> wait() |
| 2 | 4 | 0 | Thread 4 executes first time: predicate is false -> wait() |
| 3 | 0 | 0 | Thread 0 executes first time: predicate is true -> current_step++; notify_one() |
| 4 | 1 | 1 | Thread 1 executes first time: predicate is true -> current_step++; notify_one() |
| 5 | 3 | 2 | Thread 3 wakes up (scheduler choice): predicate is false -> wait() |
| 6 | | | Thread exhaustion! No more threads to run, and a condition variable signal is needed to wake up the others |

...

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex;
std::condition_variable cond;

void run_step(size_t my_step) {
  static size_t current_step = 0;
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << my_step << " has the lock" << std::endl;

  while (current_step != my_step) {
    std::cout << "Thread " << my_step << " is sleeping..." << std::endl;
    cond.wait(lk);
    std::cout << "Thread " << my_step << " woke up" << std::endl;
  }

  // Do processing ...
  std::cout << "Thread " << my_step << " is processing..." << std::endl;
  current_step++;

  // Signal ALL waiting tasks.
  cond.notify_all();

  std::cout << "Thread " << my_step << " is exiting..." << std::endl;
}

...

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

constexpr size_t NumThreads = 5;

std::mutex mutex;
std::condition_variable cond[NumThreads];

void run_step(size_t my_step) {
  static size_t current_step = 0;
  std::unique_lock<std::mutex> lk(mutex);

  std::cout << "Thread " << my_step << " has the lock" << std::endl;

  while (current_step != my_step) {
    std::cout << "Thread " << my_step << " is sleeping..." << std::endl;
    cond[my_step].wait(lk);
    std::cout << "Thread " << my_step << " woke up" << std::endl;
  }

  // Do processing ...
  std::cout << "Thread " << my_step << " is processing..." << std::endl;
  current_step++;

  // Signal next step thread.
  if ((my_step + 1) < NumThreads) {
    cond[my_step + 1].notify_one();
  }

  std::cout << "Thread " << my_step << " is exiting..." << std::endl;
}

int main(void) {
  std::thread threads[NumThreads];

  // Create threads.
  for (size_t i = 0; i < NumThreads; ++i) {
    threads[i] = std::thread(run_step, i);
  }

  // Wait for all threads to complete.
  for (size_t i = NumThreads; i != 0; --i) {
    threads[i - 1].join();
  }

  return 0;
}

Risk Assessment

...