From baac8d196cf72a6264f865ceb7fd93f0d903609b Mon Sep 17 00:00:00 2001 From: Jeremy Allison Date: Thu, 7 May 2020 18:35:14 -0700 Subject: [PATCH] WIP: io_uring short read fix. Signed-off-by: Jeremy Allison --- source3/modules/vfs_io_uring.c | 183 ++++++++++++++++++++++++++++++++- 1 file changed, 181 insertions(+), 2 deletions(-) diff --git a/source3/modules/vfs_io_uring.c b/source3/modules/vfs_io_uring.c index 378e48d112f..3a53a955a11 100644 --- a/source3/modules/vfs_io_uring.c +++ b/source3/modules/vfs_io_uring.c @@ -32,15 +32,24 @@ struct vfs_io_uring_request; struct vfs_io_uring_config { struct io_uring uring; + struct tevent_context *ev; struct tevent_fd *fde; + struct tevent_immediate *im; struct vfs_io_uring_request *queue; struct vfs_io_uring_request *pending; }; +enum vfs_io_uring_opcode { + VFS_IO_URING_PREAD, + VFS_IO_URING_PWRITE, + VFS_IO_URING_FSYNC +}; + struct vfs_io_uring_request { struct vfs_io_uring_request *prev, *next; struct vfs_io_uring_request **list_head; struct vfs_io_uring_config *config; + enum vfs_io_uring_opcode opcode; struct tevent_req *req; void *state; struct io_uring_sqe sqe; @@ -147,6 +156,10 @@ static void vfs_io_uring_fd_handler(struct tevent_context *ev, uint16_t flags, void *private_data); +static void vfs_io_uring_reschedule_run_handler(struct tevent_context *ctx, + struct tevent_immediate *im, + void *private_data); + static int vfs_io_uring_connect(vfs_handle_struct *handle, const char *service, const char *user) { @@ -216,9 +229,56 @@ static int vfs_io_uring_connect(vfs_handle_struct *handle, const char *service, return -1; } + config->ev = handle->conn->sconn->ev_ctx; + + config->im = tevent_create_immediate(config); + if (config->im == NULL) { + ret = errno; + SMB_VFS_NEXT_DISCONNECT(handle); + errno = ret; + return -1; + } return 0; } +static bool is_io_uring_pread_complete(struct vfs_io_uring_request *cur, + const struct io_uring_cqe *cqe); + +#if 0 +static bool is_io_uring_pwrite_complete(struct vfs_io_uring_request 
*cur, + const struct io_uring_cqe *cqe); +#endif + +/* + * Return true if the request is finished and can be completed: any + * opcode other than pread, or a pread that returned an error, hit EOF, + * or has now read everything that was requested. Return false only for + * a short pread that still has bytes left to read. + */ + +static bool vfs_io_uring_complete(struct vfs_io_uring_request *cur, + const struct io_uring_cqe *cqe) +{ + if (cur->opcode == VFS_IO_URING_PREAD) { + return is_io_uring_pread_complete(cur, cqe); + } +#if 0 + } else if (cur->opcode == VFS_IO_URING_PWRITE) { + return is_io_uring_pwrite_complete(cur, cqe); + } +#endif + return true; +} + +static void vfs_io_uring_pread_op_reschedule(struct vfs_io_uring_config *config, + struct vfs_io_uring_request *cur); + +static void vfs_io_uring_op_reschedule(struct vfs_io_uring_config *config, + struct vfs_io_uring_request *cur) +{ + if (cur->opcode == VFS_IO_URING_PREAD) { + vfs_io_uring_pread_op_reschedule(config, cur); + } +} + static void vfs_io_uring_queue_run(struct vfs_io_uring_config *config) { struct vfs_io_uring_request *cur = NULL, *next = NULL; @@ -228,6 +288,7 @@ static void vfs_io_uring_queue_run(struct vfs_io_uring_config *config) struct timespec start_time; struct timespec end_time; int ret; + bool need_resubmit = false; PROFILE_TIMESTAMP(&start_time); @@ -269,11 +330,34 @@ static void vfs_io_uring_queue_run(struct vfs_io_uring_config *config) io_uring_for_each_cqe(&config->uring, cqhead, cqe) { cur = (struct vfs_io_uring_request *)io_uring_cqe_get_data(cqe); - vfs_io_uring_finish_req(cur, cqe, end_time, __location__); + if (vfs_io_uring_complete(cur, cqe)) { + /* Done. */ + vfs_io_uring_finish_req(cur, cqe, end_time, __location__); + } else { + /* Take us off the pending list. */ + DLIST_REMOVE(config->pending, cur); + cur->list_head = NULL; + + vfs_io_uring_op_reschedule(config, cur); + + /* Put us back on the queued list. 
*/ + DLIST_ADD_END(config->queue, cur); + cur->list_head = &config->queue; + + need_resubmit = true; + } nr++; } io_uring_cq_advance(&config->uring, nr); + + if (need_resubmit) { + /* Schedule vfs_io_uring_reschedule_run_handler(), which calls vfs_io_uring_queue_run() again now that requests are back on config->queue. This is essentially the same as GOTO top, but cleaner :-). */ + tevent_schedule_immediate(config->im, + config->ev, + vfs_io_uring_reschedule_run_handler, + config); + } } static void vfs_io_uring_fd_handler(struct tevent_context *ev, @@ -291,9 +375,25 @@ static void vfs_io_uring_fd_handler(struct tevent_context *ev, vfs_io_uring_queue_run(config); } +static void vfs_io_uring_reschedule_run_handler(struct tevent_context *ctx, + struct tevent_immediate *im, + void *private_data) +{ + struct vfs_io_uring_config *config = (struct vfs_io_uring_config *) + private_data; + + vfs_io_uring_queue_run(config); +} + struct vfs_io_uring_pread_state { struct vfs_io_uring_request ur; struct iovec iov; + /* We have to remember the original values in case of short read. total_read accumulates the bytes read so far and is what vfs_io_uring_pread_recv() returns on success. */ + struct files_struct *fsp; + void *data; + size_t count; + ssize_t total_read; + off_t offset; }; static struct tevent_req *vfs_io_uring_pread_send(struct vfs_handle_struct *handle, @@ -319,6 +419,11 @@ static struct tevent_req *vfs_io_uring_pread_send(struct vfs_handle_struct *hand state->ur.config = config; state->ur.req = req; state->ur.state = state; + state->ur.opcode = VFS_IO_URING_PREAD; + state->fsp = fsp; + state->data = data; + state->count = n; + state->offset = offset; SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread, profile_p, state->ur.profile_bytes, n); @@ -326,6 +431,15 @@ static struct tevent_req *vfs_io_uring_pread_send(struct vfs_handle_struct *hand state->iov.iov_base = (void *)data; state->iov.iov_len = n; + +#if 0 +/* JRATEST: disabled test hack, requests 4096 bytes fewer than n so the result is short of state->count. */ + if (n > 4096) { + state->iov.iov_len -= 4096; + } +/* JRATEST */ +#endif + io_uring_prep_readv(&state->ur.sqe, fsp->fh->fd, &state->iov, 1, @@ -364,13 +478,76 @@ static ssize_t vfs_io_uring_pread_recv(struct tevent_req *req, ret = -1; } else { vfs_aio_state->error = 0; - ret = 
state->ur.cqe.res; + ret = state->total_read; } tevent_req_received(req); return ret; } +/* Returns false if more to read (short read). Returns true on error, on EOF, or once total_read has reached the requested count. Updates the total_read count when the cqe reports bytes read. */ + +static bool is_io_uring_pread_complete(struct vfs_io_uring_request *cur, + const struct io_uring_cqe *cqe) +{ + struct tevent_req *req = talloc_get_type_abort(cur->req, + struct tevent_req); + struct vfs_io_uring_pread_state *state = tevent_req_data( + req, struct vfs_io_uring_pread_state); + + if (cqe->res < 0) { + /* Error. Deal with it as normal. */ + return true; + } + + if (cqe->res == 0) { + /* EOF. Deal with it as normal; total_read keeps whatever earlier reads added. */ + return true; + } + + state->total_read += cqe->res; + + if (state->total_read < state->count ) { + /* More needed. NOTE(review): the (unsigned int) casts below can truncate values wider than unsigned int; this only affects the debug message. */ + DBG_DEBUG("short read, %p wanted %u at offset %u, got %u\n", + state->iov.iov_base, + (unsigned int)state->count, + (unsigned int)state->offset, + (unsigned int)state->total_read); + return false; + } + return true; +} + +/* Short read. Adjust the buffer, length and offset and re-prepare the sqe for the remaining bytes; the caller puts the request back on the queue. */ +static void vfs_io_uring_pread_op_reschedule(struct vfs_io_uring_config *config, + struct vfs_io_uring_request *cur) +{ + struct tevent_req *req = talloc_get_type_abort(cur->req, + struct tevent_req); + struct vfs_io_uring_pread_state *state = tevent_req_data( + req, struct vfs_io_uring_pread_state); + off_t offset; + uint8_t *new_base = (uint8_t *)state->data + state->total_read; + + /* Set up the parameters for the remaining IO. 
*/ + state->iov.iov_base = (void *)new_base; + state->iov.iov_len = state->count - state->total_read; + offset = state->offset + state->total_read; + + DBG_DEBUG("reschedule, %p get %u, at offset %u\n", + state->iov.iov_base, + (unsigned int)state->iov.iov_len, + (unsigned int)offset); + + io_uring_prep_readv(&cur->sqe, + state->fsp->fh->fd, + &state->iov, + 1, + offset); + io_uring_sqe_set_data(&cur->sqe, cur); +} + struct vfs_io_uring_pwrite_state { struct vfs_io_uring_request ur; struct iovec iov; @@ -399,6 +576,7 @@ static struct tevent_req *vfs_io_uring_pwrite_send(struct vfs_handle_struct *han state->ur.config = config; state->ur.req = req; state->ur.state = state; + state->ur.opcode = VFS_IO_URING_PWRITE; SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p, state->ur.profile_bytes, n); @@ -476,6 +654,7 @@ static struct tevent_req *vfs_io_uring_fsync_send(struct vfs_handle_struct *hand state->ur.config = config; state->ur.req = req; state->ur.state = state; + state->ur.opcode = VFS_IO_URING_FSYNC; SMBPROFILE_BYTES_ASYNC_START(syscall_asys_fsync, profile_p, state->ur.profile_bytes, 0); -- 2.20.1