Zig for C programmers - async/await - Part 3

Basic C implementation of an Event Loop

In part 2 of this series we saw how to implement a basic Thread Pool. One problem remained: when several threads reported values to the standard output, nothing synchronized them, so their output could overlap. I mentioned that the solution would be to use an Event Loop.

An Event Loop in a UI framework is a critical component responsible for managing and handling various events, such as user input (like mouse clicks or keyboard presses), system notifications, and other asynchronous operations. It ensures that events are processed sequentially, one at a time, so that they never overlap.

In asynchronous programming (that is, processing done by other threads), we can send the output generated by the worker threads back to the Event Loop and so sequence the processing of those outputs, again avoiding any overlap. The point is that, because the Event Loop runs on a single thread, we can use it as a synchronization point whenever we need to sequence the processing of multiple sources of information.
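The idea can be sketched in isolation, without any of the Thread Pool machinery. In the hypothetical example below (Mailbox, produce, consume_all and run_demo are names invented for this illustration, not part of the series' code), several producer threads only post "a result is ready" notifications, and a single consumer thread handles them. The consumer's own state is a plain int with no lock around it, which is safe only because a single thread ever touches it.

```c
#include <pthread.h>
#include <stddef.h>

/* All names in this sketch are hypothetical. */
enum { PRODUCERS = 4, PER_PRODUCER = 1000 };

typedef struct Mailbox {
        pthread_mutex_t lock;
        pthread_cond_t  not_empty;
        int pending;   /* results posted but not yet handled */
} Mailbox;

static Mailbox box = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };

/* Worker thread: posts results to the mailbox instead of handling them. */
static void* produce(void* arg) {
        (void)arg;
        for (int i = 0; i < PER_PRODUCER; ++i) {
                pthread_mutex_lock(&box.lock);
                box.pending++;
                pthread_cond_signal(&box.not_empty);
                pthread_mutex_unlock(&box.lock);
        }
        return NULL;
}

/* "Event loop": the only place where results are handled. */
static int consume_all(int expected) {
        int handled = 0;   /* plain int, no lock: only this thread touches it */
        while (handled < expected) {
                pthread_mutex_lock(&box.lock);
                while (box.pending == 0)
                        pthread_cond_wait(&box.not_empty, &box.lock);
                box.pending--;
                pthread_mutex_unlock(&box.lock);

                handled++;   /* stands in for printing one result */
        }
        return handled;
}

/* Starts the producers, drains the mailbox on the calling thread and
 * returns the number of results that were handled. */
int run_demo(void) {
        pthread_t threads[PRODUCERS];
        for (int i = 0; i < PRODUCERS; ++i)
                pthread_create(&threads[i], NULL, produce, NULL);

        int handled = consume_all(PRODUCERS * PER_PRODUCER);

        for (int i = 0; i < PRODUCERS; ++i)
                pthread_join(threads[i], NULL);
        return handled;
}
```

Only the mailbox itself is shared and locked. Everything that happens after a result has been taken out of it runs on one thread, so the handling of two results can never interleave.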

One of the most famous Event Loop implementations is libuv, the library that implements the event-driven asynchronous I/O model in Node.js.

In libuv, every task is linked to a pair of callbacks: work_cb, which is executed by the worker thread that processes the task, and after_work_cb, which is executed by the Event Loop thread when it picks up the task from the done queue.

We can follow a similar approach of using a pair of callbacks for each event or task to be executed. But instead of using an input queue and a done queue, we can merge both of them together to simplify the implementation. So the user input events will be mixed with the done tasks in the same queue. This is just a simplification to keep the code shorter and easier to read in this example.

Starting from our basic Thread Pool implementation, we can develop a basic Event Loop on top of it as follows. It provides run and stop functions to start and stop the Event Loop, but the key functions are submit and async.

typedef void (*TaskFn)(void*);

typedef struct Task {
        TaskFn cb;
        TaskFn done_cb;
        void*  data;
} Task;

typedef struct EventLoop EventLoop;

EventLoop* new_EventLoop();
void delete_EventLoop(EventLoop* l);

void EventLoop_submit(EventLoop* l, Task task);
void EventLoop_async(EventLoop* l, Task task);
void EventLoop_run(EventLoop* l);
void EventLoop_stop(EventLoop* l);

The submit function inserts a task into the Event Loop's queue (the merged input and done queue), for the Event Loop thread to process it. Both the user input thread and the worker threads use this function to insert tasks into that queue.

The async function, in contrast, inserts a task into the work queue, so that a worker thread picks it up for processing. This function is called from the Event Loop thread.
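The listings that follow rely on the SafeQueue and AtomicBool helpers from the earlier parts of this series, which are not repeated in this post. As a reminder of the interface they are assumed to have, here is a minimal sketch. It is not the code from part 2: the signatures are inferred from how the helpers are called in this post, and the ring buffer, the single mutex and the non-blocking push and pop (both return false instead of waiting) are assumptions made for illustration.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for SafeQueue: a fixed-capacity ring buffer of
 * equally sized elements, guarded by one mutex. */
typedef struct SafeQueue {
        pthread_mutex_t lock;
        char*  items;       /* capacity * elem_size bytes */
        size_t elem_size;
        size_t capacity;
        size_t head;        /* index of the oldest element */
        size_t count;       /* number of stored elements */
} SafeQueue;

SafeQueue* new_SafeQueue(size_t elem_size, size_t capacity) {
        SafeQueue* q = calloc(1, sizeof(SafeQueue));
        if (q == NULL) return NULL;
        q->items = calloc(capacity, elem_size);
        if (q->items == NULL) { free(q); return NULL; }
        q->elem_size = elem_size;
        q->capacity  = capacity;
        pthread_mutex_init(&q->lock, NULL);
        return q;
}

void delete_SafeQueue(SafeQueue* q) {
        pthread_mutex_destroy(&q->lock);
        free(q->items);
        free(q);
}

/* Copies *item into the queue. Returns false, without blocking, if full. */
bool SafeQueue_push(SafeQueue* q, const void* item) {
        bool ok = false;
        pthread_mutex_lock(&q->lock);
        if (q->count < q->capacity) {
                size_t tail = (q->head + q->count) % q->capacity;
                memcpy(q->items + tail * q->elem_size, item, q->elem_size);
                q->count++;
                ok = true;
        }
        pthread_mutex_unlock(&q->lock);
        return ok;
}

/* Copies the oldest element into *out. Returns false if the queue is empty. */
bool SafeQueue_pop(SafeQueue* q, void* out) {
        bool ok = false;
        pthread_mutex_lock(&q->lock);
        if (q->count > 0) {
                memcpy(out, q->items + q->head * q->elem_size, q->elem_size);
                q->head = (q->head + 1) % q->capacity;
                q->count--;
                ok = true;
        }
        pthread_mutex_unlock(&q->lock);
        return ok;
}

/* Hypothetical stand-in for AtomicBool, built on C11 atomics. */
typedef struct AtomicBool { atomic_bool value; } AtomicBool;

AtomicBool* new_AtomicBool(void) {
        AtomicBool* b = calloc(1, sizeof(AtomicBool));
        if (b != NULL) atomic_init(&b->value, false);
        return b;
}

void delete_AtomicBool(AtomicBool* b) { free(b); }
bool AtomicBool_get(AtomicBool* b) { return atomic_load(&b->value); }
void AtomicBool_set(AtomicBool* b, bool v) { atomic_store(&b->value, v); }
```

With a non-blocking pop like this one, the `while (AtomicBool_get(...))` loops in the Thread Pool and the Event Loop spin while their queue is empty. That keeps the example short and lets the running flag be rechecked on every iteration, at the cost of burning CPU.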

typedef struct ThreadPool {
        SafeQueue*    queue;
        EventLoop*    loop;
        pthread_t*    threads;
        size_t        nthreads;
        AtomicBool*   running;
} ThreadPool;

static
void* ThreadPool_run(void* arg) {
        ThreadPool* p = (ThreadPool*) arg;

        while (AtomicBool_get(p->running)) {
                Task task;
                if (SafeQueue_pop(p->queue, &task)) {
                        task.cb(task.data);
                        EventLoop_submit(p->loop, task);
                }
        }

        return NULL;
}

static
void ThreadPool_start(ThreadPool* p) {
        AtomicBool_set(p->running, true);

        for (size_t i=0; i<p->nthreads; ++i) {
                pthread_create(&p->threads[i], NULL, ThreadPool_run, p);
        }
}

static
ThreadPool* new_ThreadPool(EventLoop* loop) {
        ThreadPool* p = calloc(1, sizeof(ThreadPool));

        p->nthreads = NUM_THREADS;
        p->queue    = new_SafeQueue(sizeof(Task), QUEUE_SIZE);
        p->threads  = calloc(p->nthreads, sizeof(pthread_t));
        p->loop = loop;
        p->running = new_AtomicBool();

        ThreadPool_start(p);

        return p;
}

static
void delete_ThreadPool(ThreadPool* p) {
        AtomicBool_set(p->running, false);

        for (size_t i=0; i<p->nthreads; ++i) {
                pthread_join(p->threads[i], NULL);
        }
        free(p->threads);

        delete_SafeQueue(p->queue);
        delete_AtomicBool(p->running);

        free(p);
}

static
void ThreadPool_submit(ThreadPool* p, Task task) {
        while(!SafeQueue_push(p->queue, &task));
}

struct EventLoop {
        ThreadPool* pool;
        SafeQueue*  queue;
        AtomicBool* running;
};

EventLoop* new_EventLoop() {
        EventLoop* l = calloc(1, sizeof(EventLoop));

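        // Create the loop's own queue and flag first, so that they already
        // exist when new_ThreadPool starts the worker threads.
        l->queue = new_SafeQueue(sizeof(Task), QUEUE_SIZE);
        l->running = new_AtomicBool();
        l->pool = new_ThreadPool(l);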

        return l;
}

void delete_EventLoop(EventLoop* l) {
        delete_ThreadPool(l->pool);
        delete_SafeQueue(l->queue);
        delete_AtomicBool(l->running);
        free(l);
}

void EventLoop_run(EventLoop* l) {
        AtomicBool_set(l->running, true);

        while (AtomicBool_get(l->running)) {
                Task task;
                if (SafeQueue_pop(l->queue, &task)) {
                        task.done_cb(task.data);
                }
        }
}

void EventLoop_submit(EventLoop* l, Task task) {
        while (!SafeQueue_push(l->queue, &task));
}

void EventLoop_async(EventLoop* l, Task task) {
        ThreadPool_submit(l->pool, task);
}

void EventLoop_stop(EventLoop* l) {
        AtomicBool_set(l->running, false);
}

We can now code a simple command-line example that exercises our implementation. It accepts a "download" command that simulates a long-running job, which we delegate to the worker threads for asynchronous processing. Meanwhile, if the user enters any other command (apart from "quit"), the application responds with a "Hello" message, just to confirm that the Event Loop keeps processing user input events while the long-running tasks are handled by the worker threads.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

#include "loop.h"

#define STR 128

static
EventLoop* loop;

void get_image(void* data) {
        char* result = (char*)data;

        int r = rand();
        char buffer[16];
        snprintf(buffer, sizeof(buffer), "%d", r);
        strcat(result, buffer);
}

void done_get_image(void* arg) {
        printf("[%lu] BEGIN done_get_image ", pthread_self());

        sleep(1);
        char* img = (char*)arg;
        printf("%s ", img);
        free(img);

        printf("[%lu] END done_get_image\n", pthread_self());
}

void download_img(void* arg) {
        char* str = calloc(STR, sizeof(char));
        printf("[%lu] download image\n", pthread_self());
        EventLoop_async(loop, (Task){
                        .cb = get_image,
                        .done_cb = done_get_image,
                        .data = str
                        });
}

void say_hello(void* arg) {
        printf("[%lu] Hello\n", pthread_self());
}

void* run_cli(void* arg) {
        char* line = NULL;
        size_t len = 0;
        ssize_t read;
        while ((read = getline(&line, &len, stdin)) != -1) {
                if (read>2 && strncmp("quit", line, read-1) == 0) {
                        EventLoop_stop(loop);
                        break;
                } else if (read>2 && strncmp("download", line, read-1) == 0) {
                        EventLoop_submit(loop, (Task){
                                        .cb = NULL,
                                        .done_cb = download_img,
                                        .data = NULL
                                        });
                } else {
                        EventLoop_submit(loop, (Task){
                                        .cb = NULL,
                                        .done_cb = say_hello,
                                        .data = NULL
                                        });
                }
        }
        if (line != NULL) {
                free(line);
        }

        return NULL;
}

void test_eventLoop() {
        loop = new_EventLoop();

        pthread_t t;
        pthread_create(&t, NULL, run_cli, NULL);

        EventLoop_run(loop);

        delete_EventLoop(loop);

        pthread_join(t, NULL);
}

int main() {
        srand(time(NULL));

        test_eventLoop();

        return 0;
}
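One detail of run_cli is worth isolating before looking at the output. Because the comparison length is read-1 (the typed characters without the trailing newline), any prefix of at least two characters matches a command, so typing just "do" is enough to trigger a download. The sketch below reproduces the same comparisons in a standalone function; Command and classify are hypothetical names used only for this illustration.

```c
#include <string.h>
#include <sys/types.h>   /* ssize_t */

/* Hypothetical names: run_cli performs these comparisons inline. */
typedef enum { CMD_QUIT, CMD_DOWNLOAD, CMD_HELLO } Command;

/* `line` is the buffer filled by getline(), including the trailing
 * newline, and `read` is its length, exactly as in run_cli. */
Command classify(const char* line, ssize_t read) {
        /* read-1 drops the newline, so only the typed characters are
         * compared against the start of the command name. */
        if (read > 2 && strncmp("quit", line, (size_t)(read - 1)) == 0)
                return CMD_QUIT;
        if (read > 2 && strncmp("download", line, (size_t)(read - 1)) == 0)
                return CMD_DOWNLOAD;
        /* Everything else, including an empty line, gets a "Hello". */
        return CMD_HELLO;
}
```

For example, classify("do\n", 3) yields CMD_DOWNLOAD, while an empty line ("\n", length 1) or a single letter such as "d\n" falls through to CMD_HELLO.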

In the previous post, the text generated by different worker threads could overlap in the output. Here that cannot happen: the output of one task never overlaps another, because all of them are processed by the single Event Loop thread. We have thus managed to sequence the processing of the tasks' outputs.

[139901546895168] Hello

[139901546895168] Hello
do
[139901546895168] download image
do
[139901546895168] BEGIN done_get_image 1087146321 [139901546895168] END done_get_image
[139901546895168] download image
[139901546895168] BEGIN done_get_image 925167204 [139901546895168] END done_get_image

[139901546895168] Hello

However, as you can tell from this ugly code, we have run into something similar to the famous callback hell of the JavaScript world. Chaining asynchronous calls with this strategy is really cumbersome.

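// Forward declaration: the second callback is defined further below
void done_get_image_2(void* arg);

void done_get_image_1(void* arg) {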
        char* img = (char*)arg;
        printf("[%lu] %s\n", pthread_self(), img);
        free(img);

        // We have to chain another async call from this first callback
        char* str = calloc(STR, sizeof(char));
        printf("[%lu] download image 2\n", pthread_self());
        EventLoop_async(loop, (Task){
                        .cb = get_image,
                        .done_cb = done_get_image_2,
                        .data = str
                        });
}

It is also very awkward to keep track of dynamically allocated memory in languages that do not support garbage collection.

void done_get_image_2(void* arg) {
        char* img = (char*)arg;
        printf("[%lu] %s\n", pthread_self(), img);

        // It is really difficult to track where this memory was allocated
        free(img);
}

This is where the async/await ergonomics come in handy. With them we can turn the chain of callbacks into sequential code that reads as if it were synchronous, which makes the logic of the program much easier to follow. It also makes the deallocation of dynamic memory easier to handle, which is especially valuable in languages like C or Zig, where there is no garbage collection. We will see that in the last part of this series, where we will implement the async/await logic using all the pieces that we have been developing so far.