# Le multithreading

Depuis C++11, le langage dispose d'outils et d'abstractions pour la programmation sur plusieurs threads, basés sur son [memory model](https://en.cppreference.com/w/cpp/language/memory_model). Cependant, la programmation avec plusieurs threads demande des précautions supplémentaires : on doit savoir quelles données vont être lues ou écrites en même temps pour éviter les [data races](https://en.wikipedia.org/wiki/Race_condition#Example).

Il s'agit d'un bug arrivant lorsqu'un thread lit une valeur pendant qu'un autre écrit à la même valeur, ou lorsque deux threads écrivent à la même valeur. C'est un bug très difficile à détecter, car il arrive de façon aléatoire et ses conséquences sont imprévisibles. Il faut donc garantir à l'avance que le programme ne permette aucun *data race*.

### std::thread

`std::thread` représente un thread que l'on créé en passant le *function object* à exécuter. On va chercher à minimiser le nombre de threads utilisés car leur création et le fait de changer de thread courant coûte cher au système d'exploitation.

Par exemple, si l'on a une tâche qui peut être parallélisée un grand nombre de fois, on va utiliser autant de threads que le nombre maximal de threads pouvant être exécutés en même temps :

```cpp
// Exécute 'f()' le nombre donné, en parallèle.
template <class F>
void run_nth_times(F const& f, int count) {

    // Retourne le nombre maximal de threads pouvant s'exécuter en même temps.
    int const nbThreads = std::thread::hardware_concurrency();
    
    std::vector<std::thread> threads;
    
    for (int n = 0; n < nbThreads; ++n) {
        int begin =   (n * count) / nbThreads;
        int end = ((n+1) * count) / nbThreads;
        
        threads.emplace_back(std::thread{ [=]
            for (int i = begin; i < end; ++i) f();
        });
    }
    
    for (auto& t : threads) t.join();
}
```

{% hint style="success" %}
Afin de ne pas recréer de nouveaux threads à chaque appel de la fonction, on va généralement utiliser une *thread pool*, qui contient des threads déjà créés pour les faire travailler.
{% endhint %}

`std::this_thread` met à disposition des fonctions propres au thread courant, comme `std::sleep(duration)` ou son identifiant, pour utiliser ses fonctions natives selon le système d'exploitation (comme pour changer la taille de leur pile).

{% hint style="info" %}
Les threads peuvent être en attente active (si ils exécutent des instructions) ou passive (si ils dorment). Dans ce dernier cas, ils prennent moins de ressources mais prennent plus longtemps pour s'endormir ou se réveiller.
{% endhint %}

Par défaut, détruire un `std::thread` va appeler `std::terminate()`. On peut alors appeler soit `thread.join()`, qui va bloquer le thread courant jusqu'à ce que le thread ciblé ait fini, soit `thread.detach()`, qui va indiquer au thread de se continuer même si `std::thread` est détruit.

Si une exception est lancée et pas *catch* à l'intérieur d'un thread, elle va appeler `std::terminate()`. L'exception doit être propagée à un autre thread manuellement en la récupérant dans un `std::exception_ptr`.

Des variables peuvent être recopiées dans chaque thread : on les marque alors avec la portée `thread_local`.

### Mutexes, locks et std::condition\_variable

Afin d'éviter simplement les problèmes de data race, on peut utiliser des `std::mutex` pour garantir qu'un seul thread à la fois exécute une partie de code (appelée *section critique*). Cela se fait en appelant `mutex.lock()` au début de la section critique puis `mutex.unlock()`: Si le mutex est libre, le thread continue son exécution et bloque le mutex. Sinon, le mutex va bloquer le thread jusqu'à ce qu'il soit libre.

Des objets permettant d'appeler `unlock()` automatiquement sont préférés : par défaut, on construit un `std::guard_lock` avec le mutex (ce qui va appeler `mutex.lock()`) et le destructeur du lock appellera `mutex.unlock()`.

```cpp
std::mutex mutex;

int counter = 0;

int increment_counter() {
    // Le mutex est verrouillé : il ne laisse passer qu'un thread à la fois.
    auto lock = std::guard_lock{ mutex };
    
    return counter++;
    
    // Le mutex est libéré : un autre thread peut exécuter la fonction.
}
```

{% hint style="info" %}
`std::guard_lock` est un type template : il peut agir sur n'importe quel type ayant les fonctions `lock()` et `unlock()`.
{% endhint %}

Afin de gagner en flexibilité, on dispose de `std::condition_variable` qui permet de bloquer des threads (qui vont dormir) et d'en notifier (qui vont se réveiller pour continuer leur exécution). [Voici un exemple d'utilisation](https://en.cppreference.com/w/cpp/thread/condition_variable). `std::unique_lock` est utilisé à la place de `std::guard_lock` pour permettre de libérer l'objet lock avant son destructeur.

Lors de l'utilisation de locks, il faut faire attention aux problèmes de *deadlock*, qui arrivent lorsque un thread A attend une ressource d'un thread B, et que le thread B attend aussi une ressource de A (aucun des deux ne progressera).

Il peut aussi y avoir des problèmes de famine (ou *starvation*), lorsqu'un thread qui peut être lancé reste suspendu, ainsi que les problèmes de *contention*, qui désignent le ralentissement d'un programme losrque plusieurs threads entrent en concurrence.

### Les futures

Les `std::future<T>` sont des objets qui sont soit vides, soit remplies avec une valeur ou une erreur : elles représentent des valeurs fournies plus tard, souvent via un autre thread. A chaque future correspond une `std::promise<T>`. C'est cet objet qui créé la future et lui passe la valeur. Ces objets ne peuvent être utilisés qu'une fois.

```cpp
// Fournit la valeur 42 depuis un autre thread.
void produce_value(std::promise<int>&& promise) {
    std::thread{[p = std::move(promise)] {
        p.set_future(42);
    }}.detach();
}

int main() {
    std::promise<int> promise;
    auto future = promise.get_future();
    produce_value(std::move(promise));
    
    // Appel bloquant : attend que la future soit remplie.
    int result = future.get();
    assert(!future.valid());
}
```

Une `std::shared_future<T>` peut être crée depuis une future, afin que plusieurs consommateurs puissent obtenir la valeur de la future.

`std::async(f)` appelle un *function object* de façon asynchrone, et retourne une future qui contiendra son résultat. L'éventuelle exception lancée par `f` est récupérée et propagée à travers la future.

La façon d'exécuter la fonction peut être spécifiée en passant un tag : `std::launch::deferred` va exécuter la fonction lorsque `future.get()` est appelé, et `std::launch::async` va lancer un nouveau thread pour exécuter f à l'intérieur.  Par défaut, les deux tags sont spécifiés, ce qui laisse à `std::async` le choix entre les deux façons de faire.

```cpp
void background_task();

// Attend un input de l'utilisateur, et exécute des tâches en attendant.
int main() {
    std::cout << "enter number\n";
    auto future = std::async(std::launch::async, [] {
        int value;
        // Appel bloquant attendant un input de l'utilisateur.
        std::cin >> value;
        return value;
    });
    using namespace std::chrono_literals;
    while (future.wait_for(10ms) != std::future_status::ready) {
        background_task();
    }
    std::cout << "input = " << future.get() << '\n';
}
```

`std::packaged_task<Ret(Args...)>` peut stocker un *function object* ayant une signature compatible avec `Ret(Args...)`. Cette classe peut renvoyer la future qui sera remplie lors de son exécution. Cela permet à un exécuteur de ne pas s'occuper du résultat de la fonction, puisque la future va se compléter automatiquement.

```cpp
template <class F>
void execute_task(F&& f);

int handle_argument(char const* arg);

// Appelle 'handle_argument' pour chaque argument.
// Utilise 'execute_task' pour appeler 'handle_argument' de façon asynchrone.
int main(int argc, char** argv) {
    std::packaged_task<int()> task;
    std::vector<std::future<int>> results;
    for (int i = 0; i < argc; ++i) {
        task = [argv[i]] {
            return handle_argument(argv[i]);
        };
        results.push_back(task.get_future());
        execute_task(std::move(task));
    }
    for (auto& r : results) {
        int error_code = r.get();
        if (error_code) return error_code;
    };
}
```

### Les opérations atomiques

Les opérations atomiques permettent de lire et de modifier les mêmes données sur plusieurs threads sans locks et sans déclencher de *data race*. Elles peuvent apporter des performances supplémentaires par rapport aux locks et certaines garanties (comme l'absence de *deadlocks*), mais demandent des précautions supplémentaires.

`std::atomic<T>` permet de manipuler `T` de façon atomique, si il est *TriviallyCopyable*. Cela s'effectue sans locks selon le type utilisé (sur une architecture 64bits, il faut que `sizeof(T) <= 8`). On peut notamment appeler les opérations suivantes :

* `T atomic.load()` pour lire la valeur
* `atomic.store(T)` pour changer la valeur
* `T atomic.exchange(T)` pour changer la valeur et lire l'ancienne

Les pointeurs, les types entiers (et les flottants en C+20) ont également des fonctions pour les additions, les soustractions et les opérations bits à bits (`and, xor, or`).

```cpp
std::atomic<int> counter = 0;
std::vector<std::future<void>> futures;

for (int i = 0; i < 3; ++i) {
    futures.push_back(std::async([&counter] {
        // ++counter appelle counter.fetch_add(1).
        for (int i = 0; i < 100; ++i) ++counter;
    }));
};
for (auto& f : futures) f.get();

// Appelle implicitement counter.load().
assert(counter == 300);
```

L'opération la plus puissante est `bool atomic.compare_exchange(T& expected, T desired)`. Cette fonction va comparer la variable avec `expected`, et renvoyer `true` si les valeurs sont égales. En cas de succès, la variable prend la valeur `desired`, et en cas d'échec, `expected` reçoit la valeur de la variable.

Elle existe en deux variantes : `compare_exchange_strong` et `compare_exchange_weak`. Cette dernière peut se comporter comme si `atomic != expected` alors qu'elles sont égales.

Cette instruction est généralement utilisée pour faire des changements locaux, et les rendre disponible selon une certaine condition :

```cpp
std::atomic<Node const*> current_node = new Node();

// Cette fonction actualise le node pointé par current_node.
// f modifie le node passé en paramètre.
// current_node reste accessible à n'importe quel instant.
template <class F>
void update_node(F&& f) {
    // Créé localement le nouveau node.
    auto expected = current_node.load();
    auto node = new Node(*current_node);
    f(*node);
    // Actualise current_node que si les modifications se basent bien sur le
    // dernier node. Sinon, recréé le nouveau node.
    while (!current_node.compare_exchange_weak(expected, node)) {
        delete node;
        node = new Node(*current_node);
        f(*node);
    }
    // Note : les nouveaux nodes ne sont à priori pas détruits.
}
```

[Différents niveaux de garanties existent](https://en.wikipedia.org/wiki/Non-blocking_algorithmhttps://en.wikipedia.org/wiki/Non-blocking_algorithm) pour le code multithread. En particulier, la meilleure garantie est que le code soit *wait-free*, ce qui assure que tous les threads finiront leurs tâches en un nombre fini d'étapes. Le code est *lock-free* si, parmi tous les threads, au moins un est assuré de progresser.

La fonction `update_node` ci-dessus n'est pas *wait-free* : en effet, elle peut tourner en boucle indéfiniment. Cependant, si le thread bloque, c'est qu'un autre thread a modifié `current_node` et a donc fait des progrès : la fonction est donc *lock-free*.

Par contre, l'accès en lecture à current\_node est *wait-free* : puisque le pointeur atomique est toujours valide, il suffit d'appeler `current_node.load()` (qui s'exécute en un nombre fini d'étapes).

### Pour aller plus loin

Plusieurs threads doivent éviter d'écrire et de lire souvent dans la même *cache line*, car elle devra être invalidée à chaque écriture pour tous les autres cœurs que celui qui l'a modifiée : c'est le problème de [false sharing](https://en.wikipedia.org/wiki/False_sharing).

On peut spécifier comment les opérations atomiques peuvent être réorganisées (avec le compilateur ou le processeur) avec un paramètre supplémentaire ([`std::memory_order`](https://en.cppreference.com/w/cpp/atomic/memory_order)) : on parle d'opérations atomiques "relaxées". Cela permet de gagner en performances sur certaines plateformes, mais demande une bonne compréhension et une vérification formelle de l'algorithme.

En C++20, on aura les coroutines incluses dans le langage, avec la possibilité de spécifier comment elles fonctionnent : il s'agit de fonctions que l'on peut rappeler. Elles peuvent remplacer l'utilisation de plusieurs threads pour avec du code asynchrone, ou être utilisées avec plusieurs threads (si un thread appelle la fonction et qu'un autre la finit plus tard). [Les coroutines sont déjà disponibles dans Boost](https://www.boost.org/doc/libs/1_67_0/libs/coroutine2/doc/html/index.html).

[Les executors](http://open-std.org/JTC1/SC22/WG21/docs/papers/2017/p0443r1.html) vont également être introduits en C++20, qui désignent une façon d'exécuter une tâche donnée.

[Boost.Asio](https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio.html) définit un modèle d'exécution asynchrone (basé sur les threads et les coroutines) qui sert notamment à utiliser les entrées sorties asynchrones du système d'exploitation courant (comme pour envoyer et recevoir des données avec TCP ou UDP).

`compare_exchange` ne suffit pas à résoudre tous les problèmes de façon *lock-free*. Notamment, la création d'une liste chaînée *lock-free* est sujette au [problème ABA](https://en.wikipedia.org/wiki/ABA_problem). On peut alors spécifier au compilateur de requérir l'instruction *DCAS* (*double compare\_and\_swap*), pour utiliser `compare_exchange` sur des types deux fois plus grands.

[La mémoire transactionnelle](https://en.cppreference.com/w/cpp/language/transactional_memory), disponible sur certaines architectures, permet d'exécuter plusieurs instructions de façon atomique : cela permet d'avoir des propriétés *lock-free* de manière simplifiée,  en écrivant des opérations dans un même bloc.

### Challenges

(\*\*) Créer une thread pool : elle stocke des threads qu'elle réveille lorsque des tâches à exécuter sont passées. Exemple :

```cpp
// Créé la thread pool avec son nombre de threads.
thread_pool threads(2);

for (int i = 1; i <= 4; ++i) {
    threads.enqueue([i] {
        using namespace std::chrono_literals;
        std::this_thread::sleep(1s);
        std::cout << "task " << i << " finished\n";
    });
}
// Une seconde passe ...
// task 1 finished
// task 2 finished
// Une seconde passe ...
// task 3 finished
// task 4 finished
```

(\*\*) Créer un ring buffer *lock-free* : il doit permettre à un thread d'y ajouter des valeurs et à un autre d'y retirer des valeurs. Réfléchir aux changements nécessaires si l'on veut permettre plusieurs producteurs ou plusieurs consommateurs simultanés. Exemple :

```cpp
// Créé le ring buffer avec sa capacité.
ring_buffer<std::string> buffer(128);

auto producer = std::async([&] {
    buffer.emplace("hello ");
    buffer.emplace("world");
});
auto consumer = std::aync([&] {
    std::cout << buffer.pop() << buffer.pop();
});
producer.get();
consumer.get();
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://cours-cpp.gitbook.io/resources/abstractions/la-concurrence.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
