#define _GNU_SOURCE #include #include #include #include #include #define u8 unsigned char #define u32 unsigned int #define u64 unsigned long #define i32 int #define i64 long #define b32 u32 /* Size of a task's stack. 4mb is a bit large, probably should be decreased. There's still some code that depends on it being exactly 4mb though, so that will need to be updated first. */ #define STACK_SIZE 4194304l extern void switch_to_task(void *stack, i64 return_value); extern i64 switch_to_runtime(); struct Runtime { u64 *pending_tasks; u64 pending_task_count; u64 pending_task_cap; u64 pending_task_len; u64 pending_task_free_list; u64 uring_init; struct io_uring uring; void **free_tasks; u64 free_task_len; u64 free_task_cap; u64 next_stack_address; }; struct CpuState { u64 rsp; u64 rbx; u64 rbp; u64 r12; u64 r13; u64 r14; u64 r15; u64 runtime; // only used in task_state, not runtime_state }; /* The following are internal functions used by the async runtime. User-facing functions are defined below */ /* Get the base stack address of the current task. Must be called from a task. Getting the base of a normal thread's stack doesn't make any sense. */ u64 _async_task_stack() { u64 stack_pointer = (u64)__builtin_stack_address(); stack_pointer = stack_pointer & 0xffffffffff400000; return stack_pointer; } /* Get the runtime for the current task. Must be called from a task. Getting the runtime of a normal thread doesn't make any sense. */ struct Runtime* _get_runtime() { u64 stack = _async_task_stack(); stack = stack + STACK_SIZE - 8; return *((void**)stack); } /* Places current task on pending task list & returns an io request struct to prep an async syscall in. Call io_uring_submit and switch_to_runtime after setting up the uring request. 
*/
void _async_prep_syscall(struct io_uring **ring, struct io_uring_sqe **request) {
    struct Runtime *runtime = _get_runtime();

    /* Pick a pending_tasks slot to park this task in: reuse a freed slot if
       one exists, otherwise append. */
    u64 pending_slot_index = 0;
    if (0 != runtime->pending_task_free_list) {
        pending_slot_index = runtime->pending_task_free_list;
        /* A freed slot stores the index of the next free slot. */
        runtime->pending_task_free_list = runtime->pending_tasks[pending_slot_index];
    } else {
        /* Slot 0 must never be handed out: index 0 doubles as the free
           list's "empty" sentinel, so a completed task in slot 0 would be
           pushed onto the free list as 0 and silently lost. Reserve it. */
        if (0 == runtime->pending_task_len) {
            runtime->pending_task_len = 1;
        }
        if (runtime->pending_task_cap <= runtime->pending_task_len + 1) {
            u64 new_cap = runtime->pending_task_cap * 2;
            if (0 == new_cap) new_cap = 128;
            u64 new_cap_byte_count = new_cap * sizeof(void*);
            void *new_memory = 0;
            if (0 == runtime->pending_tasks) {
                new_memory = mmap(0, new_cap_byte_count, PROT_READ | PROT_WRITE,
                                  MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
            } else {
                /* MREMAP_MAYMOVE: with flags 0, mremap fails whenever the
                   mapping cannot grow in place, which the old code turned
                   into an assert failure. */
                new_memory = mremap(runtime->pending_tasks,
                                    runtime->pending_task_cap * sizeof(void*),
                                    new_cap_byte_count, MREMAP_MAYMOVE);
            }
            assert(MAP_FAILED != new_memory);
            runtime->pending_tasks = new_memory;
            runtime->pending_task_cap = new_cap;
        }
        pending_slot_index = runtime->pending_task_len;
        runtime->pending_task_len += 1;
    }
    /* The slot records which stack (i.e. which task) to resume on completion. */
    runtime->pending_tasks[pending_slot_index] = _async_task_stack();

    /* Lazily initialize the ring the first time any task performs i/o. */
    if (!runtime->uring_init) {
        i64 result = io_uring_queue_init(32, &runtime->uring, 0);
        assert(0 == result);
        runtime->uring_init = true;
    }

    runtime->pending_task_count += 1;
    *ring = &runtime->uring;
    *request = io_uring_get_sqe(&runtime->uring);
    assert(0 != *request); // NULL when the submission queue (32 entries) is full
    io_uring_sqe_set_data64(*request, pending_slot_index);
}

/* Spawn the async function 'entry' on 'runtime', passing it 'data' as an argument.
*/
void async_spawn(struct Runtime *runtime, void(*entry)(void*), void *data) {
    /* Ensure the reusable-stack vector has room for at least one more entry. */
    if (runtime->free_task_cap < 1 + runtime->free_task_len) {
        u64 new_cap = runtime->free_task_cap * 2;
        if (new_cap < 512) new_cap = 512;
        void *new_stack_vec;
        if (0 == runtime->free_tasks) {
            new_stack_vec = mmap(0, new_cap * sizeof(void*), PROT_READ | PROT_WRITE,
                                 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
        } else {
            /* The old size must be in bytes — the previous code passed the
               element count — and MREMAP_MAYMOVE is required so growth can
               relocate the mapping instead of failing in place. */
            new_stack_vec = mremap(runtime->free_tasks,
                                   runtime->free_task_cap * sizeof(void*),
                                   new_cap * sizeof(void*), MREMAP_MAYMOVE);
        }
        assert(MAP_FAILED != new_stack_vec);
        runtime->free_tasks = new_stack_vec;
        runtime->free_task_cap = new_cap;
    }

    /* No reusable stack available: map a fresh one at a STACK_SIZE-aligned
       address. The alignment is load-bearing — _async_task_stack() recovers
       the stack base by masking the stack pointer to STACK_SIZE alignment. */
    if (0 == runtime->free_task_len) {
        if (0 == runtime->next_stack_address) {
            runtime->next_stack_address = STACK_SIZE * 1000l;
        }
        void *stack_address = 0;
        while (0 == stack_address) {
            stack_address = mmap((void*)runtime->next_stack_address, STACK_SIZE,
                                 PROT_READ | PROT_WRITE,
                                 MAP_PRIVATE | MAP_ANONYMOUS | MAP_FIXED_NOREPLACE,
                                 -1, 0);
            if (MAP_FAILED == stack_address) {
                /* EEXIST just means this address is already taken; try the
                   next candidate. (The previous code asserted but then left
                   stack_address == MAP_FAILED, fell out of the loop, and
                   handed the task an invalid stack pointer.) */
                assert(EEXIST == errno);
                stack_address = 0;
            }
            runtime->next_stack_address += STACK_SIZE;
        }
        runtime->free_tasks[runtime->free_task_len] = stack_address;
        runtime->free_task_len += 1;
    }

    /* Pop a stack off the free list and build the task's initial CpuState at
       its very top; the runtime pointer in the last 8 bytes is what
       _get_runtime() reads back. */
    runtime->free_task_len -= 1;
    void *stack_memory = runtime->free_tasks[runtime->free_task_len];
    struct CpuState *state = (struct CpuState*)((u8*)stack_memory + STACK_SIZE - sizeof(struct CpuState));
    memset(state, 0, sizeof(struct CpuState));
    /* Initial rsp leaves room below the CpuState (exact offset is part of the
       assembly contract — do not change independently). The two words at rsp
       make 'entry' run first and "return" into switch_to_runtime. */
    state->rsp = (u64)stack_memory + STACK_SIZE - (2 * sizeof(struct CpuState)) - 32;
    state->runtime = (u64)runtime;
    *((u64*)state->rsp + 1) = (u64)switch_to_runtime;
    *((u64*)state->rsp + 0) = (u64)entry;
    switch_to_task(stack_memory, (u64)data);
}

/* Progress a single task on the runtime. If block is true, will block until a
   task can be continued, then return after progressing that task. If block is
   false, will return immediately if there are no tasks to continue. Will
   return immediately if there are no tasks in the runtime at all, even if
   block is true.
*/ void async_tick(struct Runtime *runtime, b32 block) { if (!runtime->uring_init) return; if (runtime->pending_task_count <= 0) return; struct io_uring_cqe *completion = 0; if (block) { i64 result = io_uring_wait_cqe(&runtime->uring, &completion); assert(0 == result); } else { int result = io_uring_peek_cqe(&runtime->uring, &completion); if (-EAGAIN == result) return; } u64 syscall_result = completion->res; u64 task_index = io_uring_cqe_get_data64(completion); assert(0 <= task_index && task_index < runtime->pending_task_len); assert(0 != runtime->pending_tasks[task_index]); io_uring_cqe_seen(&runtime->uring, completion); u64 task = runtime->pending_tasks[task_index]; runtime->pending_tasks[task_index] = runtime->pending_task_free_list; runtime->pending_task_free_list = task_index; runtime->pending_task_count -= 1; switch_to_task((void*)task, syscall_result); } /* Read at most len bytes from fd at offset into buffer. Defers to other tasks until complete. */ i64 async_read(u32 fd, u64 offset, u8 *buffer, u64 len) { struct io_uring *ring; struct io_uring_sqe *request; _async_prep_syscall(&ring, &request); io_uring_prep_read(request, fd, buffer, len, offset); io_uring_submit(ring); return switch_to_runtime(); } /* Usage demo */ void task_entry(void *data) { printf("Task fd: %d\n", data); u8 buffer[128] = {0}; // The task will suspend here until the async i/o completes i64 result = async_read((u64)data, 0, buffer, sizeof(buffer)); assert(0 <= result); printf("Task read: %d\n", result); printf("Task data:\n%*s\n", result, buffer); } void main() { struct Runtime runtime = {0}; i64 fd = open("./build.sh", O_RDONLY); assert(-1 != fd); printf("Parent: spawn\n"); async_spawn(&runtime, task_entry, (void*)fd); async_spawn(&runtime, task_entry, (void*)fd); printf("Parent: tick\n"); async_tick(&runtime, true); printf("Parent: done\n"); async_tick(&runtime, true); }