Zig for C programmers - async/await - Part 4

Basic async/await implementation

Introduction

Whoa! It's been a long road to get here, which says a lot about the complexity behind this technique. In this post we are going to glue together all the pieces that we developed in the previous posts to get a basic implementation of async/await. Although it is just a toy experiment meant to show how this technique can be implemented, it provides enough functionality to analyze different scenarios and check how it behaves. It even allows you to chain async calls, so that you can test nested asynchronous functions.

In this example we will use terms that go by other names in other programming languages. For example, the term Coroutine that we will be using roughly corresponds to Future (used in Dart) or Promise (used in JavaScript). And any function that receives a Coroutine parameter would be the equivalent of a function marked as async, like this JavaScript one:

async function loadJson(url) {
  let response = await fetch(url);
  if (response.status == 200) {
    return response.json();
  } else {
    throw new HttpError(response);
  }
}

The main difference with respect to the previous post is that we will store coroutines in the Thread Pool queue (the work queue in the previous post) and in the Event Loop queue (the done queue in the previous post). As coroutines are resumable, we don't need to carry a pair of callbacks around as we did before. The interface of the Event Loop (loop.h) now looks like this:

#ifndef __LOOP__
#define __LOOP__

#include "coroutine.h"

typedef struct EventLoop EventLoop;

EventLoop* new_EventLoop();
void delete_EventLoop(EventLoop* l);

void EventLoop_submit(EventLoop* l, CoroutineFn fn);
void EventLoop_run(EventLoop* l);
void EventLoop_stop(EventLoop* l);

typedef struct Task {
        CoroutineFn fn;
        Coroutine*  parent;
        void*       data;
} Task;
Coroutine* EventLoop_async(EventLoop* l, Task task);
void* EventLoop_await(EventLoop* l, Coroutine* co);

#endif

Once a coroutine is done, we can resume the execution of its parent coroutine. This leads us to the next big difference, this time with respect to the first post: we are going to extend the definition of coroutines to keep track of their parent coroutines. This technique will allow us to nest coroutine calls.

#ifndef __COROUTINE__
#define __COROUTINE__

#include <stdbool.h>
#include <ucontext.h>

typedef struct Coroutine Coroutine;
typedef void* (*CoroutineFn)(Coroutine*);

struct Coroutine {
    Coroutine*  parent;      // New field
    CoroutineFn fn;
    ucontext_t  caller_ctx;
    ucontext_t  callee_ctx;
    void*       yield_value;
    bool        finished;
};

Coroutine* new_Coroutine(CoroutineFn fn, Coroutine* parent);
void* Coroutine_resume(Coroutine* c);
void Coroutine_suspend(Coroutine* c, void* value);
void delete_Coroutine(Coroutine* c);

#endif
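
The header only declares the coroutine API; the implementation comes from the first post of the series and is not repeated here. As a reminder of the mechanics, here is a minimal sketch of how these four functions could be written on top of ucontext. It is an illustration under assumptions, not the code from that post: the STACK_SIZE value, the entry trampoline, allocating the stack in the same block as the coroutine, and storing the function's return value in yield_value are all choices made for this sketch, and generator_demo is a hypothetical driver. It targets Linux with glibc.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <ucontext.h>

typedef struct Coroutine Coroutine;
typedef void* (*CoroutineFn)(Coroutine*);

struct Coroutine {
    Coroutine*  parent;
    CoroutineFn fn;
    ucontext_t  caller_ctx;
    ucontext_t  callee_ctx;
    void*       yield_value;
    bool        finished;
};

#define STACK_SIZE (64 * 1024)  /* assumed size, not taken from the series */

/* makecontext only passes int arguments, so the pointer travels as two halves. */
static void entry(unsigned int hi, unsigned int lo) {
    Coroutine* c = (Coroutine*)(uintptr_t)(((uint64_t)hi << 32) | lo);
    c->yield_value = c->fn(c);  /* assumption: the return value is the final yield */
    c->finished = true;
    /* Returning from here activates uc_link, i.e. the most recent caller. */
}

Coroutine* new_Coroutine(CoroutineFn fn, Coroutine* parent) {
    /* One allocation: the coroutine header followed by its private stack. */
    Coroutine* c = calloc(1, sizeof *c + STACK_SIZE);
    assert(c != NULL);
    c->fn = fn;
    c->parent = parent;

    getcontext(&c->callee_ctx);
    c->callee_ctx.uc_stack.ss_sp = (char*)(c + 1);
    c->callee_ctx.uc_stack.ss_size = STACK_SIZE;
    c->callee_ctx.uc_link = &c->caller_ctx;

    uint64_t bits = (uint64_t)(uintptr_t)c;
    makecontext(&c->callee_ctx, (void (*)(void))entry, 2,
                (unsigned int)(bits >> 32), (unsigned int)(bits & 0xffffffffu));
    return c;
}

/* Switch into the coroutine until it suspends or finishes. */
void* Coroutine_resume(Coroutine* c) {
    if (!c->finished) {
        swapcontext(&c->caller_ctx, &c->callee_ctx);
    }
    return c->yield_value;
}

/* Called from inside the coroutine: publish a value and switch back. */
void Coroutine_suspend(Coroutine* c, void* value) {
    c->yield_value = value;
    swapcontext(&c->callee_ctx, &c->caller_ctx);
}

void delete_Coroutine(Coroutine* c) {
    free(c);  /* the stack lives in the same allocation */
}

/* Hypothetical coroutine: yields two values, then returns a third one. */
static void* count_to_three(Coroutine* co) {
    static int values[] = {1, 2, 3};
    Coroutine_suspend(co, &values[0]);
    Coroutine_suspend(co, &values[1]);
    return &values[2];
}

/* Hypothetical driver: resumes until the coroutine finishes, summing the values. */
int generator_demo(void) {
    Coroutine* co = new_Coroutine(count_to_three, NULL);
    int sum = 0;
    while (!co->finished) {
        sum += *(int*)Coroutine_resume(co);
    }
    delete_Coroutine(co);
    return sum;
}
```

Calling generator_demo() from main resumes the coroutine three times and returns 6. The parent field is not used by these functions at all: it is plain bookkeeping that the Event Loop and the Thread Pool read after a coroutine finishes.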

When we want to submit a new task to the Event Loop, we create a coroutine without a parent, and push it into the Event Loop queue.

void EventLoop_submit(EventLoop* l, CoroutineFn fn) {
        Coroutine* co = new_Coroutine(fn, NULL);
        EventLoop_resume(l, co);
}

That coroutine will be picked up by the Event Loop later on. The Event Loop will resume the coroutine, and after that it will check whether the coroutine has finished. If it has finished and has a parent, the Event Loop queues up the parent coroutine, so that execution climbs back up the chain of nested calls. If it has finished and has no parent, the Event Loop deletes the coroutine. A coroutine that has not finished is left alone: it is suspended, waiting for an asynchronous call to complete.

void EventLoop_run(EventLoop* l) {
        AtomicBool_set(l->running, true);

        while (AtomicBool_get(l->running)) {
                Coroutine* co = NULL;
                if (SafeQueue_pop(l->queue, &co)) {
                        Coroutine_resume(co);
                        if (co->finished) {
                                if (co->parent != NULL) {
                                        EventLoop_resume(l, co->parent);
                                } else {
                                        delete_Coroutine(co);
                                }
                        }
                }
        }
}
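
To see this bookkeeping in isolation, here is a small single-threaded model of the loop. It is only a sketch: Job, Queue, run_model and model_demo are hypothetical stand-ins for Coroutine, SafeQueue, EventLoop_run and the client code, and "resuming" a job just consumes one step instead of switching contexts. Unlike the real loop, the model stops as soon as the queue is empty.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct Job Job;
struct Job {
    Job*        parent;      /* NULL for a top-level job */
    const char* name;
    int         steps_left;  /* resumes needed before the job finishes */
    bool        finished;
    bool        deleted;     /* stands in for delete_Coroutine */
};

#define QUEUE_CAP 8
typedef struct {
    Job*   items[QUEUE_CAP];
    size_t head;
    size_t count;
} Queue;

static bool queue_push(Queue* q, Job* j) {
    if (q->count == QUEUE_CAP) return false;
    q->items[(q->head + q->count) % QUEUE_CAP] = j;
    q->count++;
    return true;
}

static bool queue_pop(Queue* q, Job** out) {
    if (q->count == 0) return false;
    *out = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    return true;
}

/* Model of Coroutine_resume: run one step, finish when none are left. */
static void job_resume(Job* j) {
    if (--j->steps_left == 0) j->finished = true;
}

/* Same decision tree as EventLoop_run; returns how many resumes happened. */
static int run_model(Queue* q) {
    int resumed = 0;
    Job* j = NULL;
    while (queue_pop(q, &j)) {
        job_resume(j);
        resumed++;
        printf("resumed %s\n", j->name);
        if (j->finished) {
            if (j->parent != NULL) {
                queue_push(q, j->parent);  /* climb one level up */
            } else {
                j->deleted = true;         /* top-level job: nothing left to wake */
            }
        }
        /* Not finished: the job is suspended and will be queued again later. */
    }
    return resumed;
}

int model_demo(void) {
    Job root  = { .parent = NULL,  .name = "root",  .steps_left = 2 };
    Job child = { .parent = &root, .name = "child", .steps_left = 1 };
    Queue q = {0};

    queue_push(&q, &root);   /* first resume: root starts and awaits its child */
    queue_push(&q, &child);  /* in the post the child runs on a worker thread */

    int resumed = run_model(&q);
    assert(root.deleted && !child.deleted);
    return resumed;
}
```

model_demo() performs three resumes, in the order root, child, root. The child is never marked as deleted by the loop, which matches the real implementation, where a finished child is deleted later by the await function of its parent.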

When a coroutine is resumed, its code gets executed. There, we can delegate the execution of an asynchronous call to the worker threads by calling the async function. We wrap the function that we want to execute, a reference to the current coroutine (as parent) and a reference to any input data into a Task object, and pass it as the argument of the async function. The async function then creates a new coroutine from the data in the Task object and pushes it into the Thread Pool queue.

void* download_images(Coroutine* co) {
        printf("[%lu] download image 1\n", pthread_self());
        string str1 = "";  // get_image appends with strcat, so start from an empty string
        Coroutine* c1 = EventLoop_async(loop, (Task){
                        .fn = get_image,
                        .parent = co,
                        .data = str1
                        });
        char* img1 = (char*)EventLoop_await(loop, c1);
        printf("[%lu] %s\n", pthread_self(), img1);

        printf("[%lu] download image 2\n", pthread_self());
        string str2 = "";  // same here: strcat needs a NUL-terminated buffer
        Coroutine* c2 = EventLoop_async(loop, (Task){
                        .fn = get_image,
                        .parent = co,
                        .data = str2
                        });
        char* img2 = (char*)EventLoop_await(loop, c2);
        printf("[%lu] %s\n", pthread_self(), img2);

        return 0;
}

Coroutine* EventLoop_async(EventLoop* l, Task task) {
        Coroutine* c = new_Coroutine(task.fn, task.parent);
        c->yield_value = task.data;
        ThreadPool_submit(l->pool, c);
        return c;
}

The Worker Threads behave similarly to the Event Loop. They pick up a coroutine from the Thread Pool queue and resume it. After that, they check whether the coroutine has finished. If it has, they queue up its parent coroutine into the Event Loop queue.

void* ThreadPool_run(void* arg) {
        ThreadPool* p = (ThreadPool*) arg;

        while (AtomicBool_get(p->running)) {
                Coroutine* co;
                if (SafeQueue_pop(p->queue, &co)) {
                        Coroutine_resume(co);
                        if (co->finished) {
                                EventLoop_resume(p->loop, co->parent);
                        }
                }
        }

        return NULL;
}

After triggering an asynchronous call from a coroutine, we can wait for its result using the await function. It suspends the parent coroutine. When the execution flow comes back, the asynchronous coroutine has finished, so await reads its yield value (the result), deletes that coroutine, and returns the result.

void* EventLoop_await(EventLoop* l, Coroutine* co) {
        Coroutine_suspend(co->parent, co->parent->yield_value);
        void* result = co->yield_value;
        delete_Coroutine(co);
        return result;
}
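
The round trip is easier to follow without threads in the way. The sketch below replays it on a single thread: async_call and await_call mirror EventLoop_async and EventLoop_await, while the pending slot and the loop in await_demo are hypothetical stand-ins for the Thread Pool queue, a worker thread and the Event Loop. The coroutine helpers are a condensed, assumed ucontext implementation (Linux with glibc), not the code from the first post.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <ucontext.h>

typedef struct Coroutine Coroutine;
typedef void* (*CoroutineFn)(Coroutine*);
struct Coroutine {
    Coroutine*  parent;
    CoroutineFn fn;
    ucontext_t  caller_ctx, callee_ctx;
    void*       yield_value;
    bool        finished;
};

/* --- Assumed, condensed coroutine helpers --- */
#define STACK_SIZE (64 * 1024)

static void entry(unsigned int hi, unsigned int lo) {
    Coroutine* c = (Coroutine*)(uintptr_t)(((uint64_t)hi << 32) | lo);
    c->yield_value = c->fn(c);  /* the return value becomes the result */
    c->finished = true;         /* returning jumps to uc_link (the caller) */
}

static Coroutine* new_Coroutine(CoroutineFn fn, Coroutine* parent) {
    Coroutine* c = calloc(1, sizeof *c + STACK_SIZE);  /* header + stack */
    assert(c != NULL);
    c->fn = fn;
    c->parent = parent;
    getcontext(&c->callee_ctx);
    c->callee_ctx.uc_stack.ss_sp = (char*)(c + 1);
    c->callee_ctx.uc_stack.ss_size = STACK_SIZE;
    c->callee_ctx.uc_link = &c->caller_ctx;
    uint64_t bits = (uint64_t)(uintptr_t)c;
    makecontext(&c->callee_ctx, (void (*)(void))entry, 2,
                (unsigned int)(bits >> 32), (unsigned int)(bits & 0xffffffffu));
    return c;
}

static void Coroutine_resume(Coroutine* c) {
    swapcontext(&c->caller_ctx, &c->callee_ctx);
}

static void Coroutine_suspend(Coroutine* c, void* value) {
    c->yield_value = value;
    swapcontext(&c->callee_ctx, &c->caller_ctx);
}

/* --- The await round trip --- */
static Coroutine* pending = NULL;  /* one-slot stand-in for the Thread Pool queue */

static Coroutine* async_call(CoroutineFn fn, Coroutine* parent, void* data) {
    Coroutine* c = new_Coroutine(fn, parent);
    c->yield_value = data;  /* input travels in yield_value, as in the post */
    pending = c;
    return c;
}

static void* await_call(Coroutine* co) {
    Coroutine_suspend(co->parent, co->parent->yield_value);  /* parent sleeps here */
    void* result = co->yield_value;  /* we are back: the child has finished */
    free(co);                        /* delete the child coroutine */
    return result;
}

static void* make_label(Coroutine* co) {
    char* out = co->yield_value;
    strcat(out, "image id: 7");
    return out;
}

static char last_result[32];

static void* download(Coroutine* co) {
    char buf[32] = "";  /* lives on the coroutine's own stack */
    Coroutine* c = async_call(make_label, co, buf);
    char* img = await_call(c);
    strcpy(last_result, img);
    return NULL;
}

const char* await_demo(void) {
    Coroutine* root = new_Coroutine(download, NULL);
    Coroutine_resume(root);           /* runs until await_call suspends it */
    while (!root->finished) {
        Coroutine* child = pending;
        pending = NULL;
        assert(child != NULL);
        Coroutine_resume(child);      /* what a worker thread would do */
        if (child->finished) {
            Coroutine_resume(child->parent);  /* what the Event Loop would do */
        }
    }
    free(root);
    return last_result;
}
```

await_demo() returns the string "image id: 7". The buffer that the child writes into belongs to the suspended parent, and that is safe only because a suspended coroutine keeps its stack alive until it is resumed and finishes.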

Event Loop

Here is the complete code of the final Event Loop:

static
void EventLoop_resume(EventLoop* l, Coroutine* co);

typedef struct ThreadPool {
        SafeQueue*      queue;
        EventLoop*      loop;
        pthread_t*      threads;
        size_t          nthreads;
        AtomicBool*     running;
} ThreadPool;

static
void ThreadPool_submit(ThreadPool* p, Coroutine* co) {
        while(!SafeQueue_push(p->queue, &co));
}

static
void* ThreadPool_run(void* arg) {
        ThreadPool* p = (ThreadPool*) arg;

        while (AtomicBool_get(p->running)) {
                Coroutine* co;
                if (SafeQueue_pop(p->queue, &co)) {
                        Coroutine_resume(co);
                        if (co->finished) {
                                EventLoop_resume(p->loop, co->parent);
                        }
                }
        }

        return NULL;
}

static
void ThreadPool_start(ThreadPool* p) {
        AtomicBool_set(p->running, true);

        for (size_t i=0; i<p->nthreads; ++i) {
                pthread_create(&p->threads[i], NULL, ThreadPool_run, p);
        }
}

static
ThreadPool* new_ThreadPool(EventLoop* loop) {
        ThreadPool* p = calloc(1, sizeof(ThreadPool));

        p->nthreads = NUM_THREADS;
        p->queue    = new_SafeQueue(sizeof(Coroutine*), QUEUE_SIZE);
        p->threads  = calloc(p->nthreads, sizeof(pthread_t));
        p->loop = loop;
        p->running = new_AtomicBool();

        ThreadPool_start(p);

        return p;
}

static
void delete_ThreadPool(ThreadPool* p) {
        AtomicBool_set(p->running, false);

        for (size_t i=0; i<p->nthreads; ++i) {
                pthread_join(p->threads[i], NULL);
        }
        free(p->threads);

        delete_SafeQueue(p->queue);
        delete_AtomicBool(p->running);

        free(p);
}

// *******************************************************

struct EventLoop {
        ThreadPool* pool;
        SafeQueue*  queue;
        AtomicBool* running;
};

EventLoop* new_EventLoop() {
        EventLoop* l = calloc(1, sizeof(EventLoop));

        l->pool = new_ThreadPool(l);
        l->queue = new_SafeQueue(sizeof(Coroutine*), QUEUE_SIZE);
        l->running = new_AtomicBool();

        return l;
}

void delete_EventLoop(EventLoop* l) {
        delete_ThreadPool(l->pool);
        delete_SafeQueue(l->queue);
        delete_AtomicBool(l->running);
        free(l);
}

static
void EventLoop_resume(EventLoop* l, Coroutine* co) {
        while (!SafeQueue_push(l->queue, &co));
}

void EventLoop_run(EventLoop* l) {
        AtomicBool_set(l->running, true);

        while (AtomicBool_get(l->running)) {
                Coroutine* co = NULL;
                if (SafeQueue_pop(l->queue, &co)) {
                        Coroutine_resume(co);
                        if (co->finished) {
                                if (co->parent != NULL) {
                                        EventLoop_resume(l, co->parent);
                                } else {
                                        delete_Coroutine(co);
                                }
                        }
                }
        }
}

void EventLoop_stop(EventLoop* l) {
        AtomicBool_set(l->running, false);
}

void EventLoop_submit(EventLoop* l, CoroutineFn fn) {
        Coroutine* co = new_Coroutine(fn, NULL);
        EventLoop_resume(l, co);
}

Coroutine* EventLoop_async(EventLoop* l, Task task) {
        Coroutine* c = new_Coroutine(task.fn, task.parent);
        c->yield_value = task.data;
        ThreadPool_submit(l->pool, c);
        return c;
}

void* EventLoop_await(EventLoop* l, Coroutine* co) {
        Coroutine_suspend(co->parent, co->parent->yield_value);
        void* result = co->yield_value;
        delete_Coroutine(co);
        return result;
}

Example

And here is the code of the example:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

#include "loop.h"
#include "coroutine.h"

#define STR 128
typedef char string[STR];

static
EventLoop* loop;

void* get_image(Coroutine* co) {
        sleep(1);
        int r = rand();

        char buffer[16];
        sprintf(buffer, "%d", r);

        char* result = (char*)co->yield_value;
        strcat(result, "image id: ");
        strcat(result, buffer);

        return result;
}

void* download_images(Coroutine* co) {
        printf("[%lu] download image 1\n", pthread_self());
        string str1 = "";  // get_image appends with strcat, so start from an empty string
        Coroutine* c1 = EventLoop_async(loop, (Task){
                        .fn = get_image,
                        .parent = co,
                        .data = str1
                        });
        char* img1 = (char*)EventLoop_await(loop, c1);
        printf("[%lu] %s\n", pthread_self(), img1);

        printf("[%lu] download image 2\n", pthread_self());
        string str2 = "";  // same here: strcat needs a NUL-terminated buffer
        Coroutine* c2 = EventLoop_async(loop, (Task){
                        .fn = get_image,
                        .parent = co,
                        .data = str2
                        });
        char* img2 = (char*)EventLoop_await(loop, c2);
        printf("[%lu] %s\n", pthread_self(), img2);

        return 0;
}

void* say_hello(Coroutine* co) {
        printf("[%lu] Hello\n", pthread_self());
        return NULL;
}

void* run_cli(void* arg) {
        char* line = NULL;
        size_t len = 0;
        ssize_t read;
        while ((read = getline(&line, &len, stdin)) != -1) {
                if (read>2 && strncmp("quit", line, read-1) == 0) {
                        EventLoop_stop(loop);
                        break;
                } else if (read>2 && strncmp("download", line, read-1) == 0) {
                        EventLoop_submit(loop, download_images);
                } else {
                        EventLoop_submit(loop, say_hello);
                }
        }
        if (line != NULL) {
                free(line);
        }

        return NULL;
}

void test_eventLoop() {
        loop = new_EventLoop();

        pthread_t t;
        pthread_create(&t, NULL, run_cli, NULL);

        EventLoop_run(loop);

        delete_EventLoop(loop);

        pthread_join(t, NULL);
}

int main() {
        srand(time(NULL));

        test_eventLoop();

        return 0;
}

And the corresponding output:

[139925417781056] Hello
download
[139925417781056] download image 1
[139925417781056] image id: 648455896
[139925417781056] download image 2
[139925417781056] image id: 39422855

Conclusion

As you can see, the client code is much easier to read using async/await. It also helps us to manage memory. In this example the buffers are fixed-size arrays that live on the coroutine's stack, but we could also use dynamic memory allocation, and it would be easy to match allocation and deallocation, as both of them could happen in the same function.

And that's all. I hope you have enjoyed this journey through the internals of a toy async/await implementation. It definitely helped me understand how its logic works, which is not very straightforward the first time you come across this pattern. Thank you very much if you made it to the end of this series. I would very much appreciate any comments or suggestions that you may have about it. See you in the next post!