From 7acf891ba2a18aec8e36cdc6eeb81dd02d87a40a Mon Sep 17 00:00:00 2001 From: Poornima G Date: Wed, 24 Jul 2019 15:15:33 +0530 Subject: [PATCH] vfs_glusterfs: Use pthreadpool for scheduling aio operations BUG: https://bugzilla.samba.org/show_bug.cgi?id=14098 Signed-off-by: Poornima G Reviewed-by: Guenther Deschner Reviewed-by: Jeremy Allison Autobuild-User(master): Jeremy Allison Autobuild-Date(master): Fri Aug 23 18:40:08 UTC 2019 on sn-devel-184 (cherry picked from commit d8863dd8cb74bb0534457ca930a71e77c367d994) --- source3/modules/vfs_glusterfs.c | 562 +++++++++++++++++--------------- 1 file changed, 294 insertions(+), 268 deletions(-) diff --git a/source3/modules/vfs_glusterfs.c b/source3/modules/vfs_glusterfs.c index 483d28397f8..afb34b4b47c 100644 --- a/source3/modules/vfs_glusterfs.c +++ b/source3/modules/vfs_glusterfs.c @@ -45,14 +45,11 @@ #include "lib/util/sys_rw.h" #include "smbprofile.h" #include "modules/posixacl_xattr.h" +#include "lib/pthreadpool/pthreadpool_tevent.h" #define DEFAULT_VOLFILE_SERVER "localhost" #define GLUSTER_NAME_MAX 255 -static int read_fd = -1; -static int write_fd = -1; -static struct tevent_fd *aio_read_event = NULL; - /** * Helper to convert struct stat to struct stat_ex. */ @@ -713,326 +710,283 @@ static ssize_t vfs_gluster_pread(struct vfs_handle_struct *handle, return ret; } -struct glusterfs_aio_state; - -struct glusterfs_aio_wrapper { - struct glusterfs_aio_state *state; -}; - -struct glusterfs_aio_state { +struct vfs_gluster_pread_state { ssize_t ret; - struct tevent_req *req; - bool cancelled; + glfs_fd_t *fd; + void *buf; + size_t count; + off_t offset; + struct vfs_aio_state vfs_aio_state; - struct timespec start; SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes); }; -static int aio_wrapper_destructor(struct glusterfs_aio_wrapper *wrap) -{ - if (wrap->state != NULL) { - wrap->state->cancelled = true; - } - - return 0; -} +static void vfs_gluster_pread_do(void *private_data); +static void vfs_gluster_pread_done(struct tevent_req *subreq); +static int vfs_gluster_pread_state_destructor(struct vfs_gluster_pread_state *state); -/* - * This function is the callback that will be called on glusterfs - * threads once the async IO submitted is complete. To notify - * Samba of the completion we use a pipe based queue. - */ -#ifdef HAVE_GFAPI_VER_7_6 -static void aio_glusterfs_done(glfs_fd_t *fd, ssize_t ret, - struct glfs_stat *prestat, - struct glfs_stat *poststat, - void *data) -#else -static void aio_glusterfs_done(glfs_fd_t *fd, ssize_t ret, void *data) -#endif +static struct tevent_req *vfs_gluster_pread_send(struct vfs_handle_struct + *handle, TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + files_struct *fsp, + void *data, size_t n, + off_t offset) { - struct glusterfs_aio_state *state = NULL; - int sts = 0; - struct timespec end; - - state = (struct glusterfs_aio_state *)data; + struct vfs_gluster_pread_state *state; + struct tevent_req *req, *subreq; - PROFILE_TIMESTAMP(&end); + glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp); + if (glfd == NULL) { + DBG_ERR("Failed to fetch gluster fd\n"); + return NULL; + } - if (ret < 0) { - state->ret = -1; - state->vfs_aio_state.error = errno; - } else { - state->ret = ret; + req = tevent_req_create(mem_ctx, &state, struct vfs_gluster_pread_state); + if (req == NULL) { + return NULL; } - state->vfs_aio_state.duration = nsec_time_diff(&end, &state->start); - SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes); + state->ret = -1; + state->fd = glfd; + state->buf = data; + state->count = n; + state->offset = offset; - /* - * Write the state pointer to glusterfs_aio_state to the - * pipe, so we can call tevent_req_done() from the main thread, - * because tevent_req_done() is not designed to be executed in - * the multithread environment, so tevent_req_done() must be - * executed from the smbd main thread. - * - * write(2) on pipes with sizes under _POSIX_PIPE_BUF - * in size is atomic, without this, the use op pipes in this - * code would not work. - * - * sys_write is a thin enough wrapper around write(2) - * that we can trust it here. - */ + SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread, profile_p, + state->profile_bytes, n); + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); - sts = sys_write(write_fd, &state, sizeof(struct glusterfs_aio_state *)); - if (sts < 0) { - DEBUG(0,("\nWrite to pipe failed (%s)", strerror(errno))); + subreq = pthreadpool_tevent_job_send( + state, ev, handle->conn->sconn->pool, + vfs_gluster_pread_do, state); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); } + tevent_req_set_callback(subreq, vfs_gluster_pread_done, req); + + talloc_set_destructor(state, vfs_gluster_pread_state_destructor); - return; + return req; } -/* - * Read each req off the pipe and process it. - */ -static void aio_tevent_fd_done(struct tevent_context *event_ctx, - struct tevent_fd *fde, - uint16_t flags, void *data) +static void vfs_gluster_pread_do(void *private_data) { - struct tevent_req *req = NULL; - struct glusterfs_aio_state *state = NULL; - int sts = 0; + struct vfs_gluster_pread_state *state = talloc_get_type_abort( + private_data, struct vfs_gluster_pread_state); + struct timespec start_time; + struct timespec end_time; - /* - * read(2) on pipes is atomic if the needed data is available - * in the pipe, per SUS and POSIX. Because we always write - * to the pipe in sizeof(struct tevent_req *) chunks, we can - * always read in those chunks, atomically. - * - * sys_read is a thin enough wrapper around read(2) that we - * can trust it here. - */ + SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes); - sts = sys_read(read_fd, &state, sizeof(struct glusterfs_aio_state *)); + PROFILE_TIMESTAMP(&start_time); - if (sts < 0) { - DEBUG(0,("\nRead from pipe failed (%s)", strerror(errno))); - } + do { +#ifdef HAVE_GFAPI_VER_7_6 + state->ret = glfs_pread(state->fd, state->buf, state->count, + state->offset, 0, NULL); +#else + state->ret = glfs_pread(state->fd, state->buf, state->count, + state->offset, 0); +#endif + } while ((state->ret == -1) && (errno == EINTR)); - /* if we've cancelled the op, there is no req, so just clean up. */ - if (state->cancelled == true) { - TALLOC_FREE(state); - return; + if (state->ret == -1) { + state->vfs_aio_state.error = errno; } - req = state->req; + PROFILE_TIMESTAMP(&end_time); - if (req) { - tevent_req_done(req); - } - return; + state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time); + + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); } -static bool init_gluster_aio(struct vfs_handle_struct *handle) +static int vfs_gluster_pread_state_destructor(struct vfs_gluster_pread_state *state) { - int fds[2]; - int ret = -1; + return -1; +} - if (read_fd != -1) { +static void vfs_gluster_pread_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct vfs_gluster_pread_state *state = tevent_req_data( + req, struct vfs_gluster_pread_state); + int ret; + + ret = pthreadpool_tevent_job_recv(subreq); + TALLOC_FREE(subreq); + SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes); + talloc_set_destructor(state, NULL); + if (ret != 0) { + if (ret != EAGAIN) { + tevent_req_error(req, ret); + return; + } /* - * Already initialized. + * If we get EAGAIN from pthreadpool_tevent_job_recv() this + * means the lower level pthreadpool failed to create a new + * thread. Fallback to sync processing in that case to allow + * some progress for the client. */ - return true; - } - - ret = pipe(fds); - if (ret == -1) { - goto fail; - } - - read_fd = fds[0]; - write_fd = fds[1]; - - aio_read_event = tevent_add_fd(handle->conn->sconn->ev_ctx, - NULL, - read_fd, - TEVENT_FD_READ, - aio_tevent_fd_done, - NULL); - if (aio_read_event == NULL) { - goto fail; + vfs_gluster_pread_do(state); } - return true; -fail: - TALLOC_FREE(aio_read_event); - if (read_fd != -1) { - close(read_fd); - close(write_fd); - read_fd = -1; - write_fd = -1; - } - return false; + tevent_req_done(req); } -static struct glusterfs_aio_state *aio_state_create(TALLOC_CTX *mem_ctx) +static ssize_t vfs_gluster_pread_recv(struct tevent_req *req, + struct vfs_aio_state *vfs_aio_state) { - struct tevent_req *req = NULL; - struct glusterfs_aio_state *state = NULL; - struct glusterfs_aio_wrapper *wrapper = NULL; + struct vfs_gluster_pread_state *state = tevent_req_data( + req, struct vfs_gluster_pread_state); - req = tevent_req_create(mem_ctx, &wrapper, struct glusterfs_aio_wrapper); - - if (req == NULL) { - return NULL; + if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) { + return -1; } - state = talloc_zero(NULL, struct glusterfs_aio_state); - - if (state == NULL) { - TALLOC_FREE(req); - return NULL; - } + *vfs_aio_state = state->vfs_aio_state; + return state->ret; +} - talloc_set_destructor(wrapper, aio_wrapper_destructor); - state->cancelled = false; - state->req = req; +struct vfs_gluster_pwrite_state { + ssize_t ret; + glfs_fd_t *fd; + const void *buf; + size_t count; + off_t offset; - wrapper->state = state; + struct vfs_aio_state vfs_aio_state; + SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes); +}; - return state; -} +static void vfs_gluster_pwrite_do(void *private_data); +static void vfs_gluster_pwrite_done(struct tevent_req *subreq); +static int vfs_gluster_pwrite_state_destructor(struct vfs_gluster_pwrite_state *state); -static struct tevent_req *vfs_gluster_pread_send(struct vfs_handle_struct +static struct tevent_req *vfs_gluster_pwrite_send(struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx, struct tevent_context *ev, files_struct *fsp, - void *data, size_t n, + const void *data, size_t n, off_t offset) { - struct glusterfs_aio_state *state = NULL; - struct tevent_req *req = NULL; - int ret = 0; - glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp); + struct tevent_req *req, *subreq; + struct vfs_gluster_pwrite_state *state; + glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp); if (glfd == NULL) { DBG_ERR("Failed to fetch gluster fd\n"); return NULL; } - state = aio_state_create(mem_ctx); - - if (state == NULL) { + req = tevent_req_create(mem_ctx, &state, struct vfs_gluster_pwrite_state); + if (req == NULL) { return NULL; } - req = state->req; + state->ret = -1; + state->fd = glfd; + state->buf = data; + state->count = n; + state->offset = offset; - if (!init_gluster_aio(handle)) { - tevent_req_error(req, EIO); - return tevent_req_post(req, ev); - } - - /* - * aio_glusterfs_done and aio_tevent_fd_done() - * use the raw tevent context. We need to use - * tevent_req_defer_callback() in order to - * use the event context we're started with. - */ - tevent_req_defer_callback(req, ev); - - SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread, profile_p, + SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p, state->profile_bytes, n); - PROFILE_TIMESTAMP(&state->start); - ret = glfs_pread_async(glfd, data, n, offset, 0, aio_glusterfs_done, - state); - if (ret < 0) { - tevent_req_error(req, -ret); + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); + + subreq = pthreadpool_tevent_job_send( + state, ev, handle->conn->sconn->pool, + vfs_gluster_pwrite_do, state); + if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } + tevent_req_set_callback(subreq, vfs_gluster_pwrite_done, req); + + talloc_set_destructor(state, vfs_gluster_pwrite_state_destructor); return req; } -static struct tevent_req *vfs_gluster_pwrite_send(struct vfs_handle_struct - *handle, TALLOC_CTX *mem_ctx, - struct tevent_context *ev, - files_struct *fsp, - const void *data, size_t n, - off_t offset) +static void vfs_gluster_pwrite_do(void *private_data) { - struct glusterfs_aio_state *state = NULL; - struct tevent_req *req = NULL; - int ret = 0; - glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp); - - if (glfd == NULL) { - DBG_ERR("Failed to fetch gluster fd\n"); - return NULL; - } + struct vfs_gluster_pwrite_state *state = talloc_get_type_abort( + private_data, struct vfs_gluster_pwrite_state); + struct timespec start_time; + struct timespec end_time; - state = aio_state_create(mem_ctx); + SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes); - if (state == NULL) { - return NULL; - } + PROFILE_TIMESTAMP(&start_time); - req = state->req; + do { +#ifdef HAVE_GFAPI_VER_7_6 + state->ret = glfs_pwrite(state->fd, state->buf, state->count, + state->offset, 0, NULL, NULL); +#else + state->ret = glfs_pwrite(state->fd, state->buf, state->count, + state->offset, 0); +#endif + } while ((state->ret == -1) && (errno == EINTR)); - if (!init_gluster_aio(handle)) { - tevent_req_error(req, EIO); - return tevent_req_post(req, ev); + if (state->ret == -1) { + state->vfs_aio_state.error = errno; } - /* - * aio_glusterfs_done and aio_tevent_fd_done() - * use the raw tevent context. We need to use - * tevent_req_defer_callback() in order to - * use the event context we're started with. - */ - tevent_req_defer_callback(req, ev); + PROFILE_TIMESTAMP(&end_time); - SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p, - state->profile_bytes, n); - PROFILE_TIMESTAMP(&state->start); - ret = glfs_pwrite_async(glfd, data, n, offset, 0, aio_glusterfs_done, - state); - if (ret < 0) { - tevent_req_error(req, -ret); - return tevent_req_post(req, ev); - } + state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time); - return req; + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); } -static ssize_t vfs_gluster_recv(struct tevent_req *req, - struct vfs_aio_state *vfs_aio_state) +static int vfs_gluster_pwrite_state_destructor(struct vfs_gluster_pwrite_state *state) { - struct glusterfs_aio_wrapper *wrapper = NULL; - int ret = 0; + return -1; +} - wrapper = tevent_req_data(req, struct glusterfs_aio_wrapper); +static void vfs_gluster_pwrite_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct vfs_gluster_pwrite_state *state = tevent_req_data( + req, struct vfs_gluster_pwrite_state); + int ret; - if (wrapper == NULL) { - return -1; + ret = pthreadpool_tevent_job_recv(subreq); + TALLOC_FREE(subreq); + SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes); + talloc_set_destructor(state, NULL); + if (ret != 0) { + if (ret != EAGAIN) { + tevent_req_error(req, ret); + return; + } + /* + * If we get EAGAIN from pthreadpool_tevent_job_recv() this + * means the lower level pthreadpool failed to create a new + * thread. Fallback to sync processing in that case to allow + * some progress for the client. + */ + vfs_gluster_pwrite_do(state); } - if (wrapper->state == NULL) { - return -1; - } + tevent_req_done(req); +} + +static ssize_t vfs_gluster_pwrite_recv(struct tevent_req *req, + struct vfs_aio_state *vfs_aio_state) +{ + struct vfs_gluster_pwrite_state *state = tevent_req_data( + req, struct vfs_gluster_pwrite_state); if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) { return -1; } - *vfs_aio_state = wrapper->state->vfs_aio_state; - ret = wrapper->state->ret; + *vfs_aio_state = state->vfs_aio_state; - /* Clean up the state, it is in a NULL context. */ - - TALLOC_FREE(wrapper->state); - - return ret; + return state->ret; } static ssize_t vfs_gluster_pwrite(struct vfs_handle_struct *handle, @@ -1113,60 +1067,132 @@ static int vfs_gluster_rename(struct vfs_handle_struct *handle, return ret; } +struct vfs_gluster_fsync_state { + ssize_t ret; + glfs_fd_t *fd; + + struct vfs_aio_state vfs_aio_state; + SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes); +}; + +static void vfs_gluster_fsync_do(void *private_data); +static void vfs_gluster_fsync_done(struct tevent_req *subreq); +static int vfs_gluster_fsync_state_destructor(struct vfs_gluster_fsync_state *state); + static struct tevent_req *vfs_gluster_fsync_send(struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx, struct tevent_context *ev, files_struct *fsp) { - struct tevent_req *req = NULL; - struct glusterfs_aio_state *state = NULL; - int ret = 0; - glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp); + struct tevent_req *req, *subreq; + struct vfs_gluster_fsync_state *state; + glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp); if (glfd == NULL) { DBG_ERR("Failed to fetch gluster fd\n"); return NULL; } - state = aio_state_create(mem_ctx); - - if (state == NULL) { + req = tevent_req_create(mem_ctx, &state, struct vfs_gluster_fsync_state); + if (req == NULL) { return NULL; } - req = state->req; + state->ret = -1; + state->fd = glfd; - if (!init_gluster_aio(handle)) { - tevent_req_error(req, EIO); + SMBPROFILE_BYTES_ASYNC_START(syscall_asys_fsync, profile_p, + state->profile_bytes, 0); + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); + + subreq = pthreadpool_tevent_job_send( + state, ev, handle->conn->sconn->pool, vfs_gluster_fsync_do, state); + if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } + tevent_req_set_callback(subreq, vfs_gluster_fsync_done, req); - /* - * aio_glusterfs_done and aio_tevent_fd_done() - * use the raw tevent context. We need to use - * tevent_req_defer_callback() in order to - * use the event context we're started with. - */ - tevent_req_defer_callback(req, ev); + talloc_set_destructor(state, vfs_gluster_fsync_state_destructor); - SMBPROFILE_BYTES_ASYNC_START(syscall_asys_fsync, profile_p, - state->profile_bytes, 0); - PROFILE_TIMESTAMP(&state->start); - ret = glfs_fsync_async(glfd, aio_glusterfs_done, state); - if (ret < 0) { - tevent_req_error(req, -ret); - return tevent_req_post(req, ev); - } return req; } +static void vfs_gluster_fsync_do(void *private_data) +{ + struct vfs_gluster_fsync_state *state = talloc_get_type_abort( + private_data, struct vfs_gluster_fsync_state); + struct timespec start_time; + struct timespec end_time; + + SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes); + + PROFILE_TIMESTAMP(&start_time); + + do { +#ifdef HAVE_GFAPI_VER_7_6 + state->ret = glfs_fsync(state->fd, NULL, NULL); +#else + state->ret = glfs_fsync(state->fd); +#endif + } while ((state->ret == -1) && (errno == EINTR)); + + if (state->ret == -1) { + state->vfs_aio_state.error = errno; + } + + PROFILE_TIMESTAMP(&end_time); + + state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time); + + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); +} + +static int vfs_gluster_fsync_state_destructor(struct vfs_gluster_fsync_state *state) +{ + return -1; +} + +static void vfs_gluster_fsync_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct vfs_gluster_fsync_state *state = tevent_req_data( + req, struct vfs_gluster_fsync_state); + int ret; + + ret = pthreadpool_tevent_job_recv(subreq); + TALLOC_FREE(subreq); + SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes); + talloc_set_destructor(state, NULL); + if (ret != 0) { + if (ret != EAGAIN) { + tevent_req_error(req, ret); + return; + } + /* + * If we get EAGAIN from pthreadpool_tevent_job_recv() this + * means the lower level pthreadpool failed to create a new + * thread. Fallback to sync processing in that case to allow + * some progress for the client. + */ + vfs_gluster_fsync_do(state); + } + + tevent_req_done(req); +} + static int vfs_gluster_fsync_recv(struct tevent_req *req, struct vfs_aio_state *vfs_aio_state) { - /* - * Use implicit conversion ssize_t->int - */ - return vfs_gluster_recv(req, vfs_aio_state); + struct vfs_gluster_fsync_state *state = tevent_req_data( + req, struct vfs_gluster_fsync_state); + + if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) { + return -1; + } + + *vfs_aio_state = state->vfs_aio_state; + return state->ret; } static int vfs_gluster_stat(struct vfs_handle_struct *handle, @@ -1862,10 +1888,10 @@ static struct vfs_fn_pointers glusterfs_fns = { .close_fn = vfs_gluster_close, .pread_fn = vfs_gluster_pread, .pread_send_fn = vfs_gluster_pread_send, - .pread_recv_fn = vfs_gluster_recv, + .pread_recv_fn = vfs_gluster_pread_recv, .pwrite_fn = vfs_gluster_pwrite, .pwrite_send_fn = vfs_gluster_pwrite_send, - .pwrite_recv_fn = vfs_gluster_recv, + .pwrite_recv_fn = vfs_gluster_pwrite_recv, .lseek_fn = vfs_gluster_lseek, .sendfile_fn = vfs_gluster_sendfile, .recvfile_fn = vfs_gluster_recvfile, -- 2.21.0