/* * gcc -Wall -O2 -D_GNU_SOURCE -o ucontext-cp ucontext-cp.c -luring */ #define _POSIX_C_SOURCE 199309L #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "liburing.h" #define QD 64 #define BS 1024 #define SIGSTKSZ 8192 typedef struct { struct io_uring *ring; unsigned char stack_buf[SIGSTKSZ]; ucontext_t ctx_main, ctx_fnew; } async_context; typedef struct { async_context *pctx; int *psuccess; int *pfailure; int infd; int outfd; } arguments_bundle; #define DEFINE_AWAIT_OP(operation) \ static ssize_t await_##operation( \ async_context *pctx, \ int fd, \ const struct iovec *ioves, \ unsigned int nr_vecs, \ off_t offset) \ { \ struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring); \ struct io_uring_cqe *cqe; \ \ if (!sqe) \ return -1; \ \ io_uring_prep_##operation(sqe, fd, ioves, nr_vecs, offset); \ io_uring_sqe_set_data(sqe, pctx); \ swapcontext(&pctx->ctx_fnew, &pctx->ctx_main); \ io_uring_peek_cqe(pctx->ring, &cqe); \ assert(cqe); \ io_uring_cqe_seen(pctx->ring, cqe); \ \ return cqe->res; \ } DEFINE_AWAIT_OP(readv) DEFINE_AWAIT_OP(writev) #undef DEFINE_AWAIT_OP int await_poll(async_context *pctx, int fd, short poll_mask) { struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring); struct io_uring_cqe *cqe; if (!sqe) return -1; io_uring_prep_poll_add(sqe, fd, poll_mask); io_uring_sqe_set_data(sqe, pctx); swapcontext(&pctx->ctx_fnew, &pctx->ctx_main); io_uring_peek_cqe(pctx->ring, &cqe); assert(cqe); io_uring_cqe_seen(pctx->ring, cqe); return cqe->res; } int await_delay(async_context *pctx, time_t seconds) { struct itimerspec exp = { .it_interval = {}, .it_value = { seconds, 0 }, }; int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); if (tfd < 0) { perror("timerfd_create"); return -1; } if (timerfd_settime(tfd, 0, &exp, NULL)) { perror("timerfd_settime"); close(tfd); return -1; } int ret = await_poll(pctx, tfd, POLLIN); assert(ret == POLLIN); close(tfd); return 0; } static int setup_context(async_context *pctx, struct io_uring *ring) { int ret; pctx->ring = ring; ret = getcontext(&pctx->ctx_fnew); if (ret < 0) { perror("getcontext"); return -1; } pctx->ctx_fnew.uc_stack.ss_sp = &pctx->stack_buf; pctx->ctx_fnew.uc_stack.ss_size = sizeof(pctx->stack_buf); pctx->ctx_fnew.uc_link = &pctx->ctx_main; return 0; } static int copy_file(async_context *pctx, int infd, int outfd, struct iovec* piov) { off_t offset = 0; for (;;) { ssize_t bytes_read; printf("%d->%d: readv %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset); if ((bytes_read = await_readv(pctx, infd, piov, 1, offset)) < 0) { perror("await_readv"); return 1; } if (bytes_read == 0) return 0; piov->iov_len = bytes_read; printf("%d->%d: writev %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset); if (await_writev(pctx, outfd, piov, 1, offset) != bytes_read) { perror("await_writev"); return 1; } if (bytes_read < BS) return 0; offset += bytes_read; printf("%d->%d: wait %ds\n", infd, outfd, 1); await_delay(pctx, 1); } } static void copy_file_wrapper(arguments_bundle *pbundle) { struct iovec iov = { .iov_base = malloc(BS), .iov_len = BS, }; async_context *pctx = pbundle->pctx; int ret = copy_file(pctx, pbundle->infd, pbundle->outfd, &iov); printf("%d->%d: done with ret code %d\n", pbundle->infd, pbundle->outfd, ret); if (ret == 0) { ++*pbundle->psuccess; } else { ++*pbundle->pfailure; } free(iov.iov_base); close(pbundle->infd); close(pbundle->outfd); free(pbundle->pctx); free(pbundle); swapcontext(&pctx->ctx_fnew, &pctx->ctx_main); } int main(int argc, char *argv[]) { struct io_uring ring; int i, req_count, ret; int success = 0, failure = 0; if (argc < 3) { fprintf(stderr, "%s: infile1 outfile1 [infile2 outfile2 [...]]\n", argv[0]); return 1; } ret = io_uring_queue_init(QD, &ring, 0); if (ret < 0) { fprintf(stderr, "queue_init: %s\n", strerror(-ret)); return -1; } req_count = (argc - 1) / 2; printf("copying %d files...\n", req_count); for (i = 1; i < argc; i += 2) { int infd, outfd; async_context *pctx = malloc(sizeof(*pctx)); if (!pctx || setup_context(pctx, &ring)) return 1; infd = open(argv[i], O_RDONLY); if (infd < 0) { perror("open infile"); return 1; } outfd = open(argv[i + 1], O_WRONLY | O_CREAT | O_TRUNC, 0644); if (outfd < 0) { perror("open outfile"); return 1; } arguments_bundle *pbundle = malloc(sizeof(*pbundle)); pbundle->pctx = pctx; pbundle->psuccess = &success; pbundle->pfailure = &failure; pbundle->infd = infd; pbundle->outfd = outfd; makecontext(&pctx->ctx_fnew, (void (*)(void)) copy_file_wrapper, 1, pbundle); if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) { perror("swapcontext"); return 1; } } /* event loop */ while (success + failure < req_count) { struct io_uring_cqe *cqe; /* usually be timed waiting */ ret = io_uring_submit_and_wait(&ring, 1); if (ret < 0) { fprintf(stderr, "submit_and_wait: %s\n", strerror(-ret)); return 1; } ret = io_uring_wait_cqe(&ring, &cqe); if (ret < 0) { fprintf(stderr, "wait_cqe: %s\n", strerror(-ret)); return 1; } async_context *pctx = io_uring_cqe_get_data(cqe); if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) { perror("swapcontext"); return 1; } } io_uring_queue_exit(&ring); printf("finished with %d success(es) and %d failure(s)\n", success, failure); return failure > 0; }