From 4bf6469fe3869ffe7fd462fbb86690b6c4d27816 Mon Sep 17 00:00:00 2001 From: Jim Huang Date: Tue, 6 Apr 2021 19:41:24 +0800 Subject: [PATCH] Add concurrent programs: picosh, httpd, ringbuffer, mbus --- README.md | 13 + httpd/Makefile | 8 + httpd/httpd.c | 520 +++++++++++++++++++++++++++++++++++++ httpd/resources/index.html | 9 + mbus/Makefile | 9 + mbus/README.md | 79 ++++++ mbus/mbus.c | 250 ++++++++++++++++++ picosh/Makefile | 10 + picosh/README.md | 16 ++ picosh/picosh.c | 135 ++++++++++ ringbuffer/Makefile | 8 + ringbuffer/ringbuffer.c | 385 +++++++++++++++++++++++++++ 12 files changed, 1442 insertions(+) create mode 100644 README.md create mode 100644 httpd/Makefile create mode 100644 httpd/httpd.c create mode 100644 httpd/resources/index.html create mode 100644 mbus/Makefile create mode 100644 mbus/README.md create mode 100644 mbus/mbus.c create mode 100644 picosh/Makefile create mode 100644 picosh/README.md create mode 100644 picosh/picosh.c create mode 100644 ringbuffer/Makefile create mode 100644 ringbuffer/ringbuffer.c diff --git a/README.md b/README.md new file mode 100644 index 0000000..2944d7e --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# Complementary Programs for course "Linux Kernel Internals" + +## Project Listing +- [picosh](picosh/): A minimalist UNIX shell. +- [httpd](httpd/): A multi-threaded web server. +- [ringbuffer](ringbuffer/): A lock-less ring buffer. +- [mbus](mbus/): A concurrent message bus. + +## License + +The above projects are released under the BSD 2 clause license. +Use of this source code is governed by a BSD-style license that can be found +in the LICENSE file. diff --git a/httpd/Makefile b/httpd/Makefile new file mode 100644 index 0000000..b70a6c3 --- /dev/null +++ b/httpd/Makefile @@ -0,0 +1,8 @@ +httpd: httpd.c + gcc -Wall -Wextra -o httpd httpd.c -lpthread + +clean: + rm -f httpd + +indent: + clang-format -i httpd.c diff --git a/httpd/httpd.c b/httpd/httpd.c new file mode 100644 index 0000000..8998c3a --- /dev/null +++ b/httpd/httpd.c @@ -0,0 +1,520 @@ +/* System parameters */ +const int PORT = 9000; +#define BACKLOG 1024 +#define N_THREADS 24 * sysconf(_SC_NPROCESSORS_ONLN) +#define MAXMSG 1024 + +/* File parameters */ +const char *DOCUMENT_ROOT; +#define MAXPATH 1024 + +#include + +typedef struct __node { + int fd; + struct __node *next; +} node_t; + +typedef struct { + node_t *head, *tail; + pthread_mutex_t *head_lock, *tail_lock; /* guards head and tail */ + pthread_cond_t *non_empty; + int size; /* only used for connection timeout heuristic */ +} queue_t; + +#include +#include +#include + +/* Simple two-lock concurrent queue described in the paper: + * http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf + * The queue uses a head lock to protect concurrent dequeues and a tail lock + * to protect concurrent enqueues. Enqueue always succeeds, and their + * dequeue method may do nothing and return false if it sees that the queue + * may be empty. + */ +void queue_init(queue_t *q) +{ + node_t *dummy; + pthread_mutex_t *head_lock, *tail_lock; + pthread_cond_t *non_empty; + + if (!(dummy = malloc(sizeof(node_t)))) /* Out of memory */ + goto exit; + if (!(head_lock = malloc(sizeof(pthread_mutex_t)))) /* Out of memory */ + goto cleanup_dummy; + if (!(tail_lock = malloc(sizeof(pthread_mutex_t)))) /* Out of memory */ + goto cleanup_head_lock; + if (!(non_empty = malloc(sizeof(pthread_cond_t)))) /* Out of memory */ + goto cleanup_tail_lock; + if (pthread_mutex_init(head_lock, NULL) || + pthread_mutex_init(tail_lock, NULL)) /* Fail to initialize mutex */ + goto cleanup_non_empty; + + dummy->next = NULL; + q->head = (q->tail = dummy); + q->head_lock = head_lock, q->tail_lock = tail_lock; + q->non_empty = non_empty; + pthread_cond_init(q->non_empty, NULL); + + q->size = 0; + return; + +cleanup_non_empty: + free(non_empty); +cleanup_tail_lock: + free(tail_lock); +cleanup_head_lock: + free(head_lock); +cleanup_dummy: + free(dummy); +exit: + exit(1); +} + +static void enqueue(queue_t *q, int fd) +{ + /* Construct new node */ + node_t *node = malloc(sizeof(node_t)); + node->fd = fd, node->next = NULL; + + pthread_mutex_lock(q->tail_lock); + /* Add node to end of queue */ + q->tail->next = node; + q->tail = node; + q->size++; + + /* Wake any sleeping worker threads */ + pthread_cond_signal(q->non_empty); + pthread_mutex_unlock(q->tail_lock); +} + +static void dequeue(queue_t *q, int *fd) +{ + node_t *old_head; + pthread_mutex_lock(q->head_lock); + /* Wait until signaled that queue is non_empty. + * Need while loop in case a new thread manages to steal the queue + * element after the waiting thread is signaled, but before it can + * re-acquire head_lock. + */ + while (!q->head->next) /* i.e. q is empty */ + pthread_cond_wait(q->non_empty, q->head_lock); + + /* Store dequeued value and update dummy head */ + old_head = q->head; + *fd = old_head->next->fd; + q->head = q->head->next; + q->size--; + pthread_mutex_unlock(q->head_lock); + free(old_head); +} + +typedef int status_t; +enum { + STATUS_OK = 200, + STATUS_BAD_REQUEST = 400, + STATUS_FORBIDDEN = 403, + STATUS_NOT_FOUND = 404, + STATUS_REQUEST_TIMEOUT = 408, + STATUS_REQUEST_TOO_LARGE = 413, + STATUS_SERVER_ERROR = 500, +}; + +typedef enum { GET, HEAD } http_method_t; + +typedef enum { + APPLICATION, + AUDIO, + IMAGE, + MESSAGE, + MULTIPART, + TEXT, + VIDEO +} content_type_t; + +typedef struct { + http_method_t method; + char path[MAXPATH]; + content_type_t type; + int protocol_version; +} http_request_t; + +#include + +/* A collection of useful functions for parsing HTTP messages. + * See function parse_request for high-level control flow and work upward. + */ + +/* TRY_CATCH and TRY_CATCH_S are private macros that "throw" appropriate + * status codes whenever a parsing method encounters an error. By wrapping + * every parsing method call in a TRY_CATCH, errors may be piped up to the + * original parse_request call. The second TRY_CATCH_S macro is for specially + * translating the error outputs of the string.h function strsep into the + * BAD_REQUEST (400) status code. + */ +#define TRY_CATCH(STATEMENT) \ + do { \ + status_t s = (STATEMENT); \ + if (s != STATUS_OK) \ + return s; \ + } while (0) + +#define TRY_CATCH_S(STATEMENT) \ + do { \ + if (!(STATEMENT)) \ + return STATUS_BAD_REQUEST; \ + } while (0) + +static const char *type_to_str(const content_type_t type) +{ + switch (type) { + case APPLICATION: + return "application"; + case AUDIO: + return "audio"; + case IMAGE: + return "image"; + case MESSAGE: + return "message"; + case MULTIPART: + return "multipart"; + case TEXT: + return "text"; + case VIDEO: + return "video"; + default: + return NULL; + } +} + +static const char *status_to_str(status_t status) +{ + switch (status) { + case STATUS_OK: + return "OK"; + case STATUS_BAD_REQUEST: + return "Bad Request"; + case STATUS_FORBIDDEN: + return "Forbidden"; + case STATUS_NOT_FOUND: + return "Not Found"; + case STATUS_REQUEST_TIMEOUT: + return "Request Timeout"; + case STATUS_REQUEST_TOO_LARGE: + return "Request Entity Too Large"; + case STATUS_SERVER_ERROR: + default: + return "Internal Server Error"; + } +} + +/* Private utility method that acts like strsep(s," \t"), but also advances + * s so that it skips any additional whitespace. + */ +static char *strsep_whitespace(char **s) +{ + char *ret = strsep(s, " \t"); + while (*s && (**s == ' ' || **s == '\t')) + (*s)++; /* extra whitespace */ + return ret; +} + +/* Same as strsep_whitespace, but for newlines. */ +static char *strsep_newline(char **s) +{ + char *ret; + char *r = strchr(*s, '\r'); + char *n = strchr(*s, '\n'); + + if (!r || n < r) + ret = strsep(s, "\n"); + else { + ret = strsep(s, "\r"); + (*s)++; /* advance past the trailing \n */ + } + return ret; +} + +static status_t parse_method(char *token, http_request_t *request) +{ + if (strcmp(token, "GET") == 0) + request->method = GET; + else if (strcmp(token, "HEAD") == 0) + request->method = HEAD; + else + return STATUS_BAD_REQUEST; + return STATUS_OK; +} + +static status_t parse_path(char *token, http_request_t *request) +{ + if (strcmp(token, "/") == 0 || strcmp(token, "/index.html") == 0) { + snprintf(request->path, MAXPATH, "%s/index.html", DOCUMENT_ROOT); + request->type = TEXT; + } else /* FIXME: handle images files and other resources */ + return STATUS_NOT_FOUND; + return STATUS_OK; +} + +static status_t parse_protocol_version(char *token, http_request_t *request) +{ + if (!strcmp(token, "HTTP/1.0")) + request->protocol_version = 0; + else if (!strcmp(token, "HTTP/1.1")) + request->protocol_version = 1; + else + return STATUS_BAD_REQUEST; + return STATUS_OK; +} + +static status_t parse_initial_line(char *line, http_request_t *request) +{ + char *token; + TRY_CATCH_S(token = strsep_whitespace(&line)); + TRY_CATCH(parse_method(token, request)); + TRY_CATCH_S(token = strsep_whitespace(&line)); + TRY_CATCH(parse_path(token, request)); + TRY_CATCH_S(token = strsep_whitespace(&line)); + TRY_CATCH(parse_protocol_version(token, request)); + + return STATUS_OK; +} + +/* FIXME: Currently ignores any request headers */ +static status_t parse_header(char *line, http_request_t *request) +{ + (void) line, (void) request; + return STATUS_OK; +} + +static status_t parse_request(char *msg, http_request_t *request) +{ + char *line; + TRY_CATCH_S(line = strsep_newline(&msg)); + TRY_CATCH(parse_initial_line(line, request)); + while ((line = strsep_newline(&msg)) != NULL && *line != '\0') + TRY_CATCH(parse_header(line, request)); + + return STATUS_OK; +} + +#include +#include +#include +#include +#include +#include + +/* Some wrapper functions for listening socket initalization, + * where we may simply exit if we cannot set up the socket. + */ + +static int socket_(int domain, int type, int protocol) +{ + int sockfd; + if ((sockfd = socket(domain, type, protocol)) < 0) { + fprintf(stderr, "Socket error!\n"); + exit(1); + } + return sockfd; +} + +static int bind_(int socket, + const struct sockaddr *address, + socklen_t address_len) +{ + int ret; + if ((ret = bind(socket, address, address_len)) < 0) { + perror("bind_"); + exit(1); + } + return ret; +} + +static int listen_(int socket, int backlog) +{ + int ret; + if ((ret = listen(socket, backlog)) < 0) { + fprintf(stderr, "Listen error!\n"); + exit(1); + } + return ret; +} + +/* Initialize listening socket */ +static int listening_socket() +{ + struct sockaddr_in serveraddr; + memset(&serveraddr, 0, sizeof(serveraddr)); + serveraddr.sin_family = AF_INET; + serveraddr.sin_port = htons(PORT); + serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); + + int listenfd = socket_(AF_INET, SOCK_STREAM, 0); + bind_(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)); + listen_(listenfd, BACKLOG); + return listenfd; +} + +static void *worker_routine(void *arg) +{ + pthread_detach(pthread_self()); + + int connfd, file, len, recv_bytes; + char msg[MAXMSG], buf[1024]; + status_t status; + http_request_t *request = malloc(sizeof(http_request_t)); + queue_t *q = (queue_t *) arg; + struct stat st; + + while (1) { + loopstart: + dequeue(q, &connfd); + memset(msg, 0, MAXMSG); + recv_bytes = 0; + + /* Loop until full HTTP msg is received */ + while (strstr(strndup(msg, recv_bytes), "\r\n\r\n") == NULL && + strstr(strndup(msg, recv_bytes), "\n\n") == NULL && + recv_bytes < MAXMSG) { + if ((len = recv(connfd, msg + recv_bytes, MAXMSG - recv_bytes, + 0)) <= 0) { + /* If client has closed, then close and move on */ + if (len == 0) { + close(connfd); + goto loopstart; + } + /* If timeout or error, skip parsing and send appropriate + * error message + */ + if (errno == EWOULDBLOCK) { + status = STATUS_REQUEST_TIMEOUT; + } else { + status = STATUS_SERVER_ERROR; + perror("recv"); + } + goto send; + } + recv_bytes += len; + } + + /* Parse (complete) message */ + status = parse_request(msg, request); + + send: + /* Send initial line */ + len = sprintf(msg, "HTTP/1.%d %d %s\r\n", request->protocol_version, + status, status_to_str(status)); + send(connfd, msg, len, 0); + + /* Send header lines */ + time_t now; + time(&now); + len = strftime(buf, 1024, "Date: %a, %d %b %Y %H:%M:%S GMT\r\n", + gmtime(&now)); + send(connfd, buf, len, 0); + if (status == STATUS_OK && request->method == GET) { + stat(request->path, &st); + len = sprintf(msg, "Content-Length: %d\r\n", (int) st.st_size); + send(connfd, msg, len, 0); + len = sprintf(msg, "Content-Type: %s\r\n", + type_to_str(request->type)); + send(connfd, msg, len, 0); + } + send(connfd, "\r\n", 2, 0); + + /* If request was well-formed GET, then send file */ + if (status == STATUS_OK && request->method == GET) { + if ((file = open(request->path, O_RDONLY)) < 0) + perror("open"); + while ((len = read(file, msg, MAXMSG)) > 0) + if (send(connfd, msg, len, 0) < 0) + perror("sending file"); + close(file); + } + + /* If HTTP/1.0 or recv error, close connection. */ + if (request->protocol_version == 0 || status != STATUS_OK) + close(connfd); + else /* Otherwise, keep connection alive and re-enqueue */ + enqueue(q, connfd); + } + return NULL; +} + +struct greeter_args { + int listfd; + queue_t *q; +}; + +void *greeter_routine(void *arg) +{ + struct greeter_args *ga = (struct greeter_args *) arg; + int listfd = ga->listfd; + queue_t *q = ga->q; + + struct sockaddr_in clientaddr; + struct timeval timeout; + + /* Accept connections, set their timeouts, and enqueue them */ + while (1) { + socklen_t clientlen = sizeof(clientaddr); + int connfd = + accept(listfd, (struct sockaddr *) &clientaddr, &clientlen); + if (connfd < 0) { + perror("accept"); + continue; + } + + /* Basic heuristic for timeout based on queue length. + * Minimum timeout 10s + another second for every 50 connections on the + * queue. + */ + int n = q->size; + timeout.tv_sec = 10; + if (n > 0) + timeout.tv_sec += n / 50; + setsockopt(connfd, SOL_SOCKET, SO_RCVTIMEO, (void *) &timeout, + sizeof(timeout)); + enqueue(q, connfd); + } +} + +int main() +{ + queue_t *connections; + pthread_t workers[N_THREADS / 2], greeters[N_THREADS / 2]; + + /* Get current working directory */ + char cwd[1024]; + const char *RESOURCES = "/resources"; + if (getcwd(cwd, sizeof(cwd) - sizeof(RESOURCES)) == NULL) + perror("getcwd"); + + /* Assign document root */ + DOCUMENT_ROOT = strcat(cwd, RESOURCES); + + /* Initalize connections queue */ + connections = malloc(sizeof(queue_t)); + queue_init(connections); + + /* Initialize listening socket */ + int listfd = listening_socket(); + + /* Package arguments for greeter threads */ + struct greeter_args ga = {.listfd = listfd, .q = connections}; + + /* Spawn greeter threads. */ + for (int i = 0; i < N_THREADS / 2; i++) + pthread_create(&greeters[i], NULL, greeter_routine, (void *) (&ga)); + + /* Spawn worker threads. These will immediately block until signaled by + * main server thread pushes connections onto the queue and signals. + */ + for (int i = 0; i < N_THREADS / 2; i++) + pthread_create(&workers[i], NULL, worker_routine, (void *) connections); + + pthread_exit(NULL); + return 0; +} diff --git a/httpd/resources/index.html b/httpd/resources/index.html new file mode 100644 index 0000000..c5e11bd --- /dev/null +++ b/httpd/resources/index.html @@ -0,0 +1,9 @@ + + + + This is the title of the webpage! + + +

This is an example paragraph. Anything in the body tag will appear on the page, just like this p tag and its contents.

+ + diff --git a/mbus/Makefile b/mbus/Makefile new file mode 100644 index 0000000..725d108 --- /dev/null +++ b/mbus/Makefile @@ -0,0 +1,9 @@ +all: + $(CC) -Wall -Wextra -o mbus mbus.c -lpthread -latomic \ + -Og -g3 -fsanitize=thread + +clean: + rm -f mbus + +indent: + clang-format -i mbus.c diff --git a/mbus/README.md b/mbus/README.md new file mode 100644 index 0000000..f0cc04f --- /dev/null +++ b/mbus/README.md @@ -0,0 +1,79 @@ +# mbus: concurrent message bus + +## Introduction + +`mbus` implements a concurrent data structure. Independent clients can register +their callback functions against this bus to receive messages directed to them. +Any user with a reference to the bus can send messages to a registered client +by just knowing the ID of the destination client. Users can also communicate +with all of these clients at once through broadcast messaging. + +Even though separate threads can register their own callbacks independently, +when Thread 1 sends Thread 2 a message, Thread 2's callback function is +executed in Thread 1's context. The only way to execute Thread 2's callback +function in that same thread would be to use some sort of low-level +interrupt, which might break the callee's execution flow. + +The following diagram illustrates this concept. +``` + +----------+ +-----------------+ + | Thread 1 | | callbackThread2 | + +----+-----+ +------------+----+ + | ^ | +bus_send(bus, | | +--- queue_push(ctx->queue, msg) + idThread2, | | + msg) | | + | +---------+ callbackThread2(ctxThread2, msg) + v | + +-----------------+-------------------+ + | mbus | + +-------------------------------------+ + ^ : : + | bus_register(bus, idThread2, + | : : callbackThread2, ctxThread2) + +-----+------+ : + | Thread 2 +--------------> queue_pop(ctx->queue) + +------------+ loops over + : : : + : : : Time + -------*----*------*-----------------------------------------------> +``` + +First, Thread 2 registers its callback function with `bus_register`. This +callback function is definitely user-defined, meaning that the user controls +what gets done when a message is sent to Thread 2. After some time, Thread 1 +needs to send a message to Thread 2, which is done through `bus_send`. This +redirects Thread 1's execution flow to Thread 2's callback function; as +mentioned above, this gets executed in Thread 1's context, meaning that the +callback function must handle the actual message passing to Thread 2. + +The simplest case is to push this message to Thread 2's task queue, but the +user could do something else, like changing a shared variable to signal +a condition. + +`mbus` essentially works as a synchronized callback table. This allows for +a very simple implementation while having an extendable usability. +By providing a generic callback API, clients can define specific behavior +to be invoked when a message is sent to them. + +## APIs + +`bus_send` has two modes of operation, individual and broadcast. Individual +calls a single client's registered callback. Broadcast does the same, but +for every registered client. + +If its callback is set (meaning that it is a registered client), it will be +called with the client's context and the message. The client's context is +a pointer to some opaque data that will be passed to the callback; it is +usually owned by whomever registered that callback. Reading the client is +atomic, meaning that there are no race conditions in which that client is +copied from memory at the same time as that client's information being +freed; in other words, `mbus` are guaranteed that right after calling +`__atomic_load`, client contains valid data. + +`bus_unregister` deletes a client's entry in the `bus->clients` array. +This way, no more messages can be sent to it, since the callback will be +set to `NULL`. + +The store is atomic, meaning that it cannot happen at the same time as the +load seen in `bus_send`. diff --git a/mbus/mbus.c b/mbus/mbus.c new file mode 100644 index 0000000..768e736 --- /dev/null +++ b/mbus/mbus.c @@ -0,0 +1,250 @@ +#include +#include +#include +#include +#include + +#define BUS_DEFAULT_CLIENTS 128 +#define BUS_MAX_CLIENTS UINT_MAX + +typedef unsigned int bus_client_id_t; +typedef void (*bus_client_cb_t)(void *ctx, void *msg); + +/* FIXME: rewrite with */ +#define CAS(dst, expected, value) \ + __atomic_compare_exchange(dst, expected, value, 0, __ATOMIC_SEQ_CST, \ + __ATOMIC_SEQ_CST) + +typedef struct { + bool registered; + unsigned int refcnt; + bus_client_cb_t callback; + void *ctx; +} bus_client_t; + +typedef struct { + bus_client_t *clients; + const unsigned int n_clients; +} bus_t; + +/* + * Allocate a new bus. If @n_clients is non-zero, it allocates space for + * specific number of clients; otherwise, it uses BUS_DEFAULT_CLIENTS. + * @n_clients can not be greater than BUS_MAX_CLIENTS. Returns true on success. + */ +bool __attribute__((warn_unused_result)) +bus_new(bus_t **bus, unsigned int n_clients) +{ + if (n_clients > BUS_MAX_CLIENTS) + return false; + + bus_t *b; + if (!(b = malloc(sizeof(bus_t)))) + return false; + + /* Initialize bus struct */ + *(unsigned int *) &b->n_clients = + !n_clients ? BUS_DEFAULT_CLIENTS : n_clients; + if (!(b->clients = calloc(b->n_clients, sizeof(bus_client_t)))) { + free(b); + return false; + } + + *bus = b; + return true; +} + +/* + * Register a new client with the specified @id. + * The ID must satisfy 0 <= ID < n_clients and not be in use; otherwise the + * function would fail. Whenever a message is sent to this client, @callback + * will be called. The first argument for @callback is the the user-supplied + * context, @ctx (can be ommitted by passing NULL). The second argument for + * @callback will be the received message. Returns true on success. + */ +bool __attribute__((warn_unused_result, nonnull(1))) +bus_register(bus_t *bus, + bus_client_id_t id, + bus_client_cb_t callback, + void *ctx) +{ + if (id >= bus->n_clients) + return false; + + bus_client_t null_client = {0}; + bus_client_t new_client = { + .registered = true, + .callback = callback, + .ctx = ctx, + .refcnt = 0, + }; + + return (bool) CAS(&(bus->clients[id]), &null_client, &new_client); +} + +/* + * Attempt to call a client's callback function to send a message. + * Might fail if such client gets unregistered while attempting to send message. + */ +static bool execute_client_callback(bus_client_t *client, void *msg) +{ + /* Load the client with which we are attempting to communicate. */ + bus_client_t local_client; + __atomic_load(client, &local_client, __ATOMIC_SEQ_CST); + + /* Loop until reference count isupdated or client becomes unregistered */ + while (local_client.registered) { + /* The expected reference count is the current one + 1 */ + bus_client_t new_client = local_client; + ++(new_client.refcnt); + + /* If CAS succeeds, the client had the expected reference count, and + * we updated it successfully. If CAS fails, the client was updated + * recently. The actual value is copied to local_client. + */ + if (CAS(client, &local_client, &new_client)) { + /* Send a message and decrease the reference count back */ + local_client.callback(local_client.ctx, msg); + __atomic_fetch_sub(&(client->refcnt), 1, __ATOMIC_SEQ_CST); + return true; + } + } + + /* Client was not registered or got unregistered while we attempted to send + * a message + */ + return false; +} + +/* + * If @broadcast is set to false, it sends a message to the client with the + * specified @id. If @broadcast is set to true, the message is sent to every + * registered client, and the supplied ID is ignored. Returns true on success. + */ +bool __attribute__((warn_unused_result, nonnull(1))) +bus_send(bus_t *bus, bus_client_id_t id, void *msg, bool broadcast) +{ + if (broadcast) { + for (id = 0; id < bus->n_clients; ++id) + execute_client_callback(&(bus->clients[id]), msg); + return true; + } + if (id >= bus->n_clients) + return false; + return execute_client_callback(&(bus->clients[id]), msg); +} + +/* + * Unregister the client with the specified @id. No additional can be made + * to the specified client. Returns true on success. + */ +bool __attribute__((warn_unused_result, nonnull(1))) +bus_unregister(bus_t *bus, bus_client_id_t id) +{ + if (id >= bus->n_clients) + return false; + + /* Load the client we are attempting to unregister */ + bus_client_t local_client, null_client = {0}; + __atomic_load(&(bus->clients[id]), &local_client, __ATOMIC_SEQ_CST); + + /* It was already unregistered */ + if (!local_client.registered) + return false; + + do { + local_client.refcnt = 0; /* the expected reference count */ + + /* If CAS succeeds, the client had refcnt = 0 and got unregistered. + * If CAS does not succeed, the value of the client gets copied into + * local_client. + */ + if (CAS(&(bus->clients[id]), &local_client, &null_client)) + return true; + } while (local_client.registered); + + /* Someone else unregistered this client */ + return true; +} + +/* Free the bus object */ +void bus_free(bus_t *bus) +{ + if (!bus) + return; + free(bus->clients); + free(bus); +} + +#include +#include +#include + +#define NUM_THREADS 4 + +/* Data passed to each thread */ +typedef struct { + bus_t *bus; + unsigned int id; +} thread_data_t; + +/* Function to be called by the bus for each new message */ +static void bus_callback(void *_ctx, void *_msg) +{ + unsigned int ctx = *(unsigned int *) _ctx, msg = *(unsigned int *) _msg; + printf("Callback for thread %u received: %u\n", ctx, msg); +} + +/* This funcion will be spawned NUM_THREADS times as a separate thread. */ +static void *thread_func(void *_data) +{ + thread_data_t *data = (thread_data_t *) _data; + bus_client_id_t dest = (data->id + 1) % NUM_THREADS; + + /* Register our callback */ + if (!bus_register(data->bus, data->id, &bus_callback, &(data->id))) { + perror("bus_register"); + return NULL; + } + printf("Registered callback from thread %u\n", data->id); + + /* Loop until the destination is registered from a separate thread */ + while (!bus_send(data->bus, dest, &(data->id), false)) + ; + + if (bus_unregister(data->bus, dest)) + return NULL; + + return NULL; +} + +int main() +{ + pthread_t threads[NUM_THREADS]; + thread_data_t ctx[NUM_THREADS]; + + bus_t *bus; + if (!bus_new(&bus, 0)) { + perror("bus_new"); + exit(EXIT_FAILURE); + } + + /* Launch threads, each with their own context containing a reference to the + * bus and their ID + */ + for (int i = 0; i < NUM_THREADS; ++i) { + ctx[i].bus = bus, ctx[i].id = i; + if (pthread_create(&threads[i], NULL, thread_func, &ctx[i])) + perror("pthread_create"); + } + + /* Wait until completion */ + for (int i = 0; i < NUM_THREADS; ++i) { + if (pthread_join(threads[i], NULL)) + perror("pthread_join"); + } + + bus_free(bus); + + return 0; +} diff --git a/picosh/Makefile b/picosh/Makefile new file mode 100644 index 0000000..f91e48f --- /dev/null +++ b/picosh/Makefile @@ -0,0 +1,10 @@ +picosh: picosh.o + $(CC) $^ -Wall -Wextra -std=c99 -o $@ + +clean: + rm -f picosh.o picosh + +indent: + clang-format -i picosh.c + +.PHONY: clean diff --git a/picosh/README.md b/picosh/README.md new file mode 100644 index 0000000..d0789e0 --- /dev/null +++ b/picosh/README.md @@ -0,0 +1,16 @@ +# picosh + +This is a tiny UNIX shell, implemented in C. + +The shell supports: +* Simple commands, i.e. `vim`, `echo hello world` etc. +* Pipelines, i.e. `ls | wc -l'. +* File redirection, i.e. `echo hello > x` and `cat < x | grep hello`. + +However, it does not support: +* `>>` append operator. +* `2>` or `2>&1` or anything more complex. +* `&`, although that should be trivial to add. +* Globs, variables, conditionals, loops, functions and it will never be a proper POSIX shell. + +Only a toy. Use and explore at your own risk. However, PRs are welcome for bugfixes, or if the additional functionality would not increase the complexity. diff --git a/picosh/picosh.c b/picosh/picosh.c new file mode 100644 index 0000000..0f1623d --- /dev/null +++ b/picosh/picosh.c @@ -0,0 +1,135 @@ +#include +#include +#include +#include +#include +#include + +/* Display prompt */ +static void prompt() +{ + write(2, "$ ", 2); +} + +/* Display error message, optionally - exit */ +static void fatal(int retval, int leave) +{ + if (retval >= 0) + return; + write(2, "?\n", 2); + if (leave) + exit(1); +} + +/* Helper functions to detect token class */ +static inline int is_delim(int c) +{ + return c == 0 || c == '|'; +} + +static inline int is_redir(int c) +{ + return c == '>' || c == '<'; +} + +static inline int is_blank(int c) +{ + return c == ' ' || c == '\t' || c == '\n'; +} + +static int is_special(int c) +{ + return is_delim(c) || is_redir(c) || is_blank(c); +} + +/* Recursively run right-most part of the command line printing output to the + * file descriptor @t + */ +static void run(char *c, int t) +{ + char *redir_stdin = NULL, *redir_stdout = NULL; + int pipefds[2] = {0, 0}, outfd = 0; + char *v[99] = {0}; + char **u = &v[98]; /* end of words */ + for (;;) { + c--; + if (is_delim(*c)) /* if NULL (start of string) or pipe: break */ + break; + if (!is_special(*c)) { + c++; /* Copy word of regular chars into previous u */ + *c = 0; /* null-terminator */ + for (; !is_special(*--c);) + ; + *--u = ++c; + } + if (is_redir(*c)) { /* If < or > */ + if (*c == '<') + redir_stdin = *u; + else + redir_stdout = *u; + if ((u - v) != 98) + u++; + } + } + if ((u - v) == 98) /* empty input */ + return; + + if (!strcmp(*u, "cd")) { /* built-in command: cd */ + fatal(chdir(u[1]), 0); + return; /* actually, should run() again */ + } + + if (*c) { + pipe(pipefds); + outfd = pipefds[1]; /* write end of the pipe */ + } + + pid_t pid = fork(); + if (pid) { /* Parent or error */ + fatal(pid, 1); + if (outfd) { + run(c, outfd); /* parse the rest of the cmdline */ + close(outfd); /* close output fd */ + close(pipefds[0]); /* close read end of the pipe */ + } + wait(0); + return; + } + + if (outfd) { + dup2(pipefds[0], 0); /* dup read fd to stdin */ + close(pipefds[0]); /* close read fd */ + close(outfd); /* close output */ + } + + if (redir_stdin) { + close(0); /* replace stdin with redir_stdin */ + fatal(open(redir_stdin, 0), 1); + } + + if (t) { + dup2(t, 1); /* replace stdout with t */ + close(t); + } + + if (redir_stdout) { + close(1); + fatal(creat(redir_stdout, 438), 1); /* replace stdout with redir */ + } + fatal(execvp(*u, u), 1); +} + +int main() +{ + while (1) { + prompt(); + char buf[512] = {0}; /* input buffer */ + char *c = buf; + if (!fgets(c + 1, sizeof(buf) - 1, stdin)) + exit(0); + for (; *++c;) /* skip to end of line */ + ; + run(c, 0); + } + return 0; +} diff --git a/ringbuffer/Makefile b/ringbuffer/Makefile new file mode 100644 index 0000000..ec41f45 --- /dev/null +++ b/ringbuffer/Makefile @@ -0,0 +1,8 @@ +all: + gcc -Wall -Wextra -o ringbuffer ringbuffer.c + +clean: + rm -f ringbuffer + +indent: + clang-format -i ringbuffer.c diff --git a/ringbuffer/ringbuffer.c b/ringbuffer/ringbuffer.c new file mode 100644 index 0000000..2829fc5 --- /dev/null +++ b/ringbuffer/ringbuffer.c @@ -0,0 +1,385 @@ +/** + * Ring buffer is a fixed-size queue, implemented as a table of + * pointers. Head and tail pointers are modified atomically, allowing + * concurrent access to it. It has the following features: + * - FIFO (First In First Out) + * - Maximum size is fixed; the pointers are stored in a table. + * - Lockless implementation. + * + * The ring buffer implementation is not preemptable. + */ + +#include +#include +#include +#include +#include +#include + +/* typically 64 bytes on x86/x64 CPUs */ +#define CACHE_LINE_SIZE 64 + +#ifndef __compiler_barrier +#define __compiler_barrier() \ + do { \ + asm volatile("" : : : "memory"); \ + } while (0) +#endif + +/** + * The producer and the consumer have a head and a tail index. The particularity + * of these index is that they are not between 0 and size(ring). These indexes + * are between 0 and 2^32, and we mask their value when we access the ring[] + * field. Thanks to this assumption, we can do subtractions between 2 index + * values in a modulo-32bit base: that is why the overflow of the indexes is not + * a problem. + */ +typedef struct { + struct { /** Ring producer status. */ + uint32_t watermark; /**< Maximum items before EDQUOT. */ + uint32_t size; /**< Size of ring. */ + uint32_t mask; /**< Mask (size - 1) of ring. */ + volatile uint32_t head, tail; /**< Producer head and tail. */ + } prod __attribute__((__aligned__(CACHE_LINE_SIZE))); + + struct { /** Ring consumer status. */ + uint32_t size; /**< Size of the ring. */ + uint32_t mask; /**< Mask (size - 1) of ring. */ + volatile uint32_t head, tail; /**< Consumer head and tail. */ + } cons __attribute__((__aligned__(CACHE_LINE_SIZE))); + + void *ring[] __attribute__((__aligned__(CACHE_LINE_SIZE))); +} ringbuf_t; + +/* true if x is a power of 2 */ +#define IS_POWEROF2(x) ((((x) -1) & (x)) == 0) +#define RING_SIZE_MASK (unsigned) (0x0fffffff) /**< Ring size mask */ +#define ALIGN_FLOOR(val, align) \ + (typeof(val))((val) & (~((typeof(val))((align) -1)))) + +/** + * Calculate the memory size needed for a ring buffer. + * + * This function returns the number of bytes needed for a ring buffer, given + * the number of elements in it. This value is the sum of the size of the + * structure ringbuf and the size of the memory needed by the objects pointers. + * The value is aligned to a cache line size. + * + * @param count + * The number of elements in the ring (must be a power of 2). + * @return + * - The memory size occupied by the ring on success. + * - -EINVAL if count is not a power of 2. + */ +ssize_t ringbuf_get_memsize(const unsigned count) +{ + /* Requested size is invalid, must be power of 2, and do not exceed the + * size limit RING_SIZE_MASK. + */ + if ((!IS_POWEROF2(count)) || (count > RING_SIZE_MASK)) + return -EINVAL; + + ssize_t sz = sizeof(ringbuf_t) + count * sizeof(void *); + sz = ALIGN_FLOOR(sz, CACHE_LINE_SIZE); + return sz; +} + +/** + * Initialize a ring buffer. + * + * The ring size is set to *count*, which must be a power of two. Water + * marking is disabled by default. The real usable ring size is (count - 1) + * instead of (count) to differentiate a free ring from an empty ring. + * + * @param r + * The pointer to the ring structure followed by the objects table. + * @param count + * The number of elements in the ring (must be a power of 2). + * @return + * 0 on success, or a negative value on error. + */ +int ringbuf_init(ringbuf_t *r, const unsigned count) +{ + memset(r, 0, sizeof(*r)); + r->prod.watermark = count, r->prod.size = r->cons.size = count; + r->prod.mask = r->cons.mask = count - 1; + r->prod.head = r->cons.head = 0, r->prod.tail = r->cons.tail = 0; + + return 0; +} + +/** + * Create a ring buffer. + * + * The real usable ring size is (count - 1) instead of (count) to + * differentiate a free ring from an empty ring. + * + * @param count + * The size of the ring (must be a power of 2). + * @return + * On success, the pointer to the new allocated ring. NULL on error with + * errno set appropriately. Possible errno values include: + * - EINVAL - count provided is not a power of 2 + * - ENOSPC - the maximum number of memzones has already been allocated + * - EEXIST - a memzone with the same name already exists + * - ENOMEM - no appropriate memory area found in which to create memzone + */ +ringbuf_t *ringbuf_create(const unsigned count) +{ + ssize_t ring_size = ringbuf_get_memsize(count); + if (ring_size < 0) + return NULL; + + ringbuf_t *r = malloc(ring_size); + if (r) + ringbuf_init(r, count); + return r; +} + +/** + * Release all memory used by the ring. + * + * @param r + * Ring to free + */ +void ringbuf_free(ringbuf_t *r) +{ + free(r); +} + +/* The actual enqueue of pointers on the ring. + * Placed here since identical code needed in both single- and multi- producer + * enqueue functions. + */ +#define ENQUEUE_PTRS() \ + do { \ + const uint32_t size = r->prod.size; \ + uint32_t i, idx = prod_head & mask; \ + if (idx + n < size) { \ + for (i = 0; i < (n & ((~(unsigned) 0x3))); i += 4, idx += 4) { \ + r->ring[idx] = obj_table[i]; \ + r->ring[idx + 1] = obj_table[i + 1]; \ + r->ring[idx + 2] = obj_table[i + 2]; \ + r->ring[idx + 3] = obj_table[i + 3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + r->ring[idx++] = obj_table[i++]; \ + case 2: \ + r->ring[idx++] = obj_table[i++]; \ + case 1: \ + r->ring[idx++] = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + r->ring[idx] = obj_table[i]; \ + for (idx = 0; i < n; i++, idx++) \ + r->ring[idx] = obj_table[i]; \ + } \ + } while (0) + +/* The actual copy of pointers on the ring to obj_table. + * Placed here since identical code needed in both single- and multi- consumer + * dequeue functions. + */ +#define DEQUEUE_PTRS() \ + do { \ + uint32_t idx = cons_head & mask; \ + uint32_t i, size = r->cons.size; \ + if (idx + n < size) { \ + for (i = 0; i < (n & (~(unsigned) 0x3)); i += 4, idx += 4) { \ + obj_table[i] = r->ring[idx]; \ + obj_table[i + 1] = r->ring[idx + 1]; \ + obj_table[i + 2] = r->ring[idx + 2]; \ + obj_table[i + 3] = r->ring[idx + 3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + obj_table[i++] = r->ring[idx++]; \ + case 2: \ + obj_table[i++] = r->ring[idx++]; \ + case 1: \ + obj_table[i++] = r->ring[idx++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = r->ring[idx]; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = r->ring[idx]; \ + } \ + } while (0) + +/** + * @internal Enqueue several objects on a ring buffer (NOT multi-producers + * safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueue. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + */ +static inline int ringbuffer_sp_do_enqueue(ringbuf_t *r, + void *const *obj_table, + const unsigned n) +{ + uint32_t mask = r->prod.mask; + uint32_t prod_head = r->prod.head, cons_tail = r->cons.tail; + /* The subtraction is done between two unsigned 32-bits value (the result + * is always modulo 32 bits even if we have prod_head > cons_tail). So + * @free_entries is always between 0 and size(ring) - 1. + */ + uint32_t free_entries = mask + cons_tail - prod_head; + + /* check that we have enough room in ring */ + if ((n > free_entries)) + return -ENOBUFS; + + uint32_t prod_next = prod_head + n; + r->prod.head = prod_next; + + /* write entries in ring */ + ENQUEUE_PTRS(); + __compiler_barrier(); + + r->prod.tail = prod_next; + + /* if we exceed the watermark */ + return ((mask + 1) - free_entries + n) > r->prod.watermark ? -EDQUOT : 0; +} + +/** + * @internal Dequeue several objects from a ring buffer (NOT multi-consumers + * safe). When the request objects are more than the available objects, only + * dequeue the actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + */ +static inline int ringbuffer_sc_do_dequeue(ringbuf_t *r, + void **obj_table, + const unsigned n) +{ + uint32_t mask = r->prod.mask; + uint32_t cons_head = r->cons.head, prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32-bits value (the result + * is always modulo 32 bits even if we have cons_head > prod_tail). So + * @entries is always between 0 and size(ring) - 1. + */ + uint32_t entries = prod_tail - cons_head; + + if (n > entries) + return -ENOENT; + + uint32_t cons_next = cons_head + n; + r->cons.head = cons_next; + + /* copy in table */ + DEQUEUE_PTRS(); + __compiler_barrier(); + + r->cons.tail = cons_next; + return 0; +} + +/** + * Enqueue one object on a ring buffer (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static inline int ringbuf_sp_enqueue(ringbuf_t *r, void *obj) +{ + return ringbuffer_sp_do_enqueue(r, &obj, 1); +} + +/** + * Dequeue one object from a ring buffer (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +static inline int ringbuf_sc_dequeue(ringbuf_t *r, void **obj_p) +{ + return ringbuffer_sc_do_dequeue(r, obj_p, 1); +} + +/** + * Test if a ring buffer is full. + * + * @param r + * A pointer to the ring structure. + * @return + * - true: The ring is full. + * - false: The ring is not full. + */ +static inline bool ringbuf_is_full(const ringbuf_t *r) +{ + uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail; + return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0; +} + +/** + * Test if a ring buffer is empty. + * + * @param r + * A pointer to the ring structure. + * @return + * - true: The ring is empty. + * - false: The ring is not empty. + */ +static inline bool ringbuf_is_empty(const ringbuf_t *r) +{ + uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail; + return cons_tail == prod_tail; +} + +#include + +int main(void) +{ + ringbuf_t *r = ringbuf_create((1 << 6)); + if (!r) { + printf("Fail to create ring buffer.\n"); + return -1; + } + + for (int i = 0; !ringbuf_is_full(r); i++) + ringbuf_sp_enqueue(r, *(void **) &i); + + for (int i = 0; !ringbuf_is_empty(r); i++) { + void *obj; + ringbuf_sc_dequeue(r, &obj); + assert(i == *(int *) &obj); + } + + ringbuf_free(r); + return 0; +}