Le multithreading

Depuis C++11, le langage dispose d'outils et d'abstractions pour la programmation sur plusieurs threads, basés sur son memory model. Cependant, la programmation avec plusieurs threads demande des précautions supplémentaires : on doit savoir quelles données vont être lues ou écrites en même temps pour éviter les data races.

Il s'agit d'un bug arrivant lorsqu'un thread lit une valeur pendant qu'un autre écrit à la même valeur, ou lorsque deux threads écrivent à la même valeur. C'est un bug très difficile à détecter, car il arrive de façon aléatoire et ses conséquences sont imprévisibles. Il faut donc garantir à l'avance que le programme ne permette aucun data race.

std::thread

std::thread représente un thread que l'on créé en passant le function object à exécuter. On va chercher à minimiser le nombre de threads utilisés car leur création et le fait de changer de thread courant coûte cher au système d'exploitation.

Par exemple, si l'on a une tâche qui peut être parallélisée un grand nombre de fois, on va utiliser autant de threads que le nombre maximal de threads pouvant être exécutés en même temps :

// Exécute 'f()' le nombre donné, en parallèle.
template <class F>
void run_nth_times(F const& f, int count) {

    // Retourne le nombre maximal de threads pouvant s'exécuter en même temps.
    int const nbThreads = std::thread::hardware_concurrency();
    
    std::vector<std::thread> threads;
    
    for (int n = 0; n < nbThreads; ++n) {
        int begin =   (n * count) / nbThreads;
        int end = ((n+1) * count) / nbThreads;
        
        threads.emplace_back(std::thread{ [=]
            for (int i = begin; i < end; ++i) f();
        });
    }
    
    for (auto& t : threads) t.join();
}

Afin de ne pas recréer de nouveaux threads à chaque appel de la fonction, on va généralement utiliser une thread pool, qui contient des threads déjà créés pour les faire travailler.

std::this_thread met à disposition des fonctions propres au thread courant, comme std::sleep(duration) ou son identifiant, pour utiliser ses fonctions natives selon le système d'exploitation (comme pour changer la taille de leur pile).

Les threads peuvent être en attente active (si ils exécutent des instructions) ou passive (si ils dorment). Dans ce dernier cas, ils prennent moins de ressources mais prennent plus longtemps pour s'endormir ou se réveiller.

Par défaut, détruire un std::thread va appeler std::terminate(). On peut alors appeler soit thread.join(), qui va bloquer le thread courant jusqu'à ce que le thread ciblé ait fini, soit thread.detach(), qui va indiquer au thread de se continuer même si std::thread est détruit.

Si une exception est lancée et pas catch à l'intérieur d'un thread, elle va appeler std::terminate(). L'exception doit être propagée à un autre thread manuellement en la récupérant dans un std::exception_ptr.

Des variables peuvent être recopiées dans chaque thread : on les marque alors avec la portée thread_local.

Mutexes, locks et std::condition_variable

Afin d'éviter simplement les problèmes de data race, on peut utiliser des std::mutex pour garantir qu'un seul thread à la fois exécute une partie de code (appelée section critique). Cela se fait en appelant mutex.lock() au début de la section critique puis mutex.unlock(): Si le mutex est libre, le thread continue son exécution et bloque le mutex. Sinon, le mutex va bloquer le thread jusqu'à ce qu'il soit libre.

Des objets permettant d'appeler unlock() automatiquement sont préférés : par défaut, on construit un std::guard_lock avec le mutex (ce qui va appeler mutex.lock()) et le destructeur du lock appellera mutex.unlock().

std::mutex mutex;

int counter = 0;

int increment_counter() {
    // Le mutex est verrouillé : il ne laisse passer qu'un thread à la fois.
    auto lock = std::guard_lock{ mutex };
    
    return counter++;
    
    // Le mutex est libéré : un autre thread peut exécuter la fonction.
}

std::guard_lock est un type template : il peut agir sur n'importe quel type ayant les fonctions lock() et unlock().

Afin de gagner en flexibilité, on dispose de std::condition_variable qui permet de bloquer des threads (qui vont dormir) et d'en notifier (qui vont se réveiller pour continuer leur exécution). Voici un exemple d'utilisation. std::unique_lock est utilisé à la place de std::guard_lock pour permettre de libérer l'objet lock avant son destructeur.

Lors de l'utilisation de locks, il faut faire attention aux problèmes de deadlock, qui arrivent lorsque un thread A attend une ressource d'un thread B, et que le thread B attend aussi une ressource de A (aucun des deux ne progressera).

Il peut aussi y avoir des problèmes de famine (ou starvation), lorsqu'un thread qui peut être lancé reste suspendu, ainsi que les problèmes de contention, qui désignent le ralentissement d'un programme losrque plusieurs threads entrent en concurrence.

Les futures

Les std::future<T> sont des objets qui sont soit vides, soit remplies avec une valeur ou une erreur : elles représentent des valeurs fournies plus tard, souvent via un autre thread. A chaque future correspond une std::promise<T>. C'est cet objet qui créé la future et lui passe la valeur. Ces objets ne peuvent être utilisés qu'une fois.

// Fournit la valeur 42 depuis un autre thread.
void produce_value(std::promise<int>&& promise) {
    std::thread{[p = std::move(promise)] {
        p.set_future(42);
    }}.detach();
}

int main() {
    std::promise<int> promise;
    auto future = promise.get_future();
    produce_value(std::move(promise));
    
    // Appel bloquant : attend que la future soit remplie.
    int result = future.get();
    assert(!future.valid());
}

Une std::shared_future<T> peut être crée depuis une future, afin que plusieurs consommateurs puissent obtenir la valeur de la future.

std::async(f) appelle un function object de façon asynchrone, et retourne une future qui contiendra son résultat. L'éventuelle exception lancée par f est récupérée et propagée à travers la future.

La façon d'exécuter la fonction peut être spécifiée en passant un tag : std::launch::deferred va exécuter la fonction lorsque future.get() est appelé, et std::launch::async va lancer un nouveau thread pour exécuter f à l'intérieur. Par défaut, les deux tags sont spécifiés, ce qui laisse à std::async le choix entre les deux façons de faire.

void background_task();

// Attend un input de l'utilisateur, et exécute des tâches en attendant.
int main() {
    std::cout << "enter number\n";
    auto future = std::async(std::launch::async, [] {
        int value;
        // Appel bloquant attendant un input de l'utilisateur.
        std::cin >> value;
        return value;
    });
    using namespace std::chrono_literals;
    while (future.wait_for(10ms) != std::future_status::ready) {
        background_task();
    }
    std::cout << "input = " << future.get() << '\n';
}

std::packaged_task<Ret(Args...)> peut stocker un function object ayant une signature compatible avec Ret(Args...). Cette classe peut renvoyer la future qui sera remplie lors de son exécution. Cela permet à un exécuteur de ne pas s'occuper du résultat de la fonction, puisque la future va se compléter automatiquement.

template <class F>
void execute_task(F&& f);

int handle_argument(char const* arg);

// Appelle 'handle_argument' pour chaque argument.
// Utilise 'execute_task' pour appeler 'handle_argument' de façon asynchrone.
int main(int argc, char** argv) {
    std::packaged_task<int()> task;
    std::vector<std::future<int>> results;
    for (int i = 0; i < argc; ++i) {
        task = [argv[i]] {
            return handle_argument(argv[i]);
        };
        results.push_back(task.get_future());
        execute_task(std::move(task));
    }
    for (auto& r : results) {
        int error_code = r.get();
        if (error_code) return error_code;
    };
}

Les opérations atomiques

Les opérations atomiques permettent de lire et de modifier les mêmes données sur plusieurs threads sans locks et sans déclencher de data race. Elles peuvent apporter des performances supplémentaires par rapport aux locks et certaines garanties (comme l'absence de deadlocks), mais demandent des précautions supplémentaires.

std::atomic<T> permet de manipuler T de façon atomique, si il est TriviallyCopyable. Cela s'effectue sans locks selon le type utilisé (sur une architecture 64bits, il faut que sizeof(T) <= 8). On peut notamment appeler les opérations suivantes :

  • T atomic.load() pour lire la valeur

  • atomic.store(T) pour changer la valeur

  • T atomic.exchange(T) pour changer la valeur et lire l'ancienne

Les pointeurs, les types entiers (et les flottants en C+20) ont également des fonctions pour les additions, les soustractions et les opérations bits à bits (and, xor, or).

std::atomic<int> counter = 0;
std::vector<std::future<void>> futures;

for (int i = 0; i < 3; ++i) {
    futures.push_back(std::async([&counter] {
        // ++counter appelle counter.fetch_add(1).
        for (int i = 0; i < 100; ++i) ++counter;
    }));
};
for (auto& f : futures) f.get();

// Appelle implicitement counter.load().
assert(counter == 300);

L'opération la plus puissante est bool atomic.compare_exchange(T& expected, T desired). Cette fonction va comparer la variable avec expected, et renvoyer true si les valeurs sont égales. En cas de succès, la variable prend la valeur desired, et en cas d'échec, expected reçoit la valeur de la variable.

Elle existe en deux variantes : compare_exchange_strong et compare_exchange_weak. Cette dernière peut se comporter comme si atomic != expected alors qu'elles sont égales.

Cette instruction est généralement utilisée pour faire des changements locaux, et les rendre disponible selon une certaine condition :

std::atomic<Node const*> current_node = new Node();

// Cette fonction actualise le node pointé par current_node.
// f modifie le node passé en paramètre.
// current_node reste accessible à n'importe quel instant.
template <class F>
void update_node(F&& f) {
    // Créé localement le nouveau node.
    auto expected = current_node.load();
    auto node = new Node(*current_node);
    f(*node);
    // Actualise current_node que si les modifications se basent bien sur le
    // dernier node. Sinon, recréé le nouveau node.
    while (!current_node.compare_exchange_weak(expected, node)) {
        delete node;
        node = new Node(*current_node);
        f(*node);
    }
    // Note : les nouveaux nodes ne sont à priori pas détruits.
}

Différents niveaux de garanties existent pour le code multithread. En particulier, la meilleure garantie est que le code soit wait-free, ce qui assure que tous les threads finiront leurs tâches en un nombre fini d'étapes. Le code est lock-free si, parmi tous les threads, au moins un est assuré de progresser.

La fonction update_node ci-dessus n'est pas wait-free : en effet, elle peut tourner en boucle indéfiniment. Cependant, si le thread bloque, c'est qu'un autre thread a modifié current_node et a donc fait des progrès : la fonction est donc lock-free.

Par contre, l'accès en lecture à current_node est wait-free : puisque le pointeur atomique est toujours valide, il suffit d'appeler current_node.load() (qui s'exécute en un nombre fini d'étapes).

Pour aller plus loin

Plusieurs threads doivent éviter d'écrire et de lire souvent dans la même cache line, car elle devra être invalidée à chaque écriture pour tous les autres cœurs que celui qui l'a modifiée : c'est le problème de false sharing.

On peut spécifier comment les opérations atomiques peuvent être réorganisées (avec le compilateur ou le processeur) avec un paramètre supplémentaire (std::memory_order) : on parle d'opérations atomiques "relaxées". Cela permet de gagner en performances sur certaines plateformes, mais demande une bonne compréhension et une vérification formelle de l'algorithme.

En C++20, on aura les coroutines incluses dans le langage, avec la possibilité de spécifier comment elles fonctionnent : il s'agit de fonctions que l'on peut rappeler. Elles peuvent remplacer l'utilisation de plusieurs threads pour avec du code asynchrone, ou être utilisées avec plusieurs threads (si un thread appelle la fonction et qu'un autre la finit plus tard). Les coroutines sont déjà disponibles dans Boost.

Les executors vont également être introduits en C++20, qui désignent une façon d'exécuter une tâche donnée.

Boost.Asio définit un modèle d'exécution asynchrone (basé sur les threads et les coroutines) qui sert notamment à utiliser les entrées sorties asynchrones du système d'exploitation courant (comme pour envoyer et recevoir des données avec TCP ou UDP).

compare_exchange ne suffit pas à résoudre tous les problèmes de façon lock-free. Notamment, la création d'une liste chaînée lock-free est sujette au problème ABA. On peut alors spécifier au compilateur de requérir l'instruction DCAS (double compare_and_swap), pour utiliser compare_exchange sur des types deux fois plus grands.

La mémoire transactionnelle, disponible sur certaines architectures, permet d'exécuter plusieurs instructions de façon atomique : cela permet d'avoir des propriétés lock-free de manière simplifiée, en écrivant des opérations dans un même bloc.

Challenges

(**) Créer une thread pool : elle stocke des threads qu'elle réveille lorsque des tâches à exécuter sont passées. Exemple :

// Créé la thread pool avec son nombre de threads.
thread_pool threads(2);

for (int i = 1; i <= 4; ++i) {
    threads.enqueue([i] {
        using namespace std::chrono_literals;
        std::this_thread::sleep(1s);
        std::cout << "task " << i << " finished\n";
    });
}
// Une seconde passe ...
// task 1 finished
// task 2 finished
// Une seconde passe ...
// task 3 finished
// task 4 finished

(**) Créer un ring buffer lock-free : il doit permettre à un thread d'y ajouter des valeurs et à un autre d'y retirer des valeurs. Réfléchir aux changements nécessaires si l'on veut permettre plusieurs producteurs ou plusieurs consommateurs simultanés. Exemple :

// Créé le ring buffer avec sa capacité.
ring_buffer<std::string> buffer(128);

auto producer = std::async([&] {
    buffer.emplace("hello ");
    buffer.emplace("world");
});
auto consumer = std::aync([&] {
    std::cout << buffer.pop() << buffer.pop();
});
producer.get();
consumer.get();

Last updated