From 977d4fa2dad8d321cab39c18eba940451234d019 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra@samba.org>
Date: Wed, 21 Dec 2011 20:38:32 -0800
Subject: [PATCH 1/5] Change the signature of pthreadpool_finished_job() to
 return 0 on success, errno on fail, and return the jobid in a separate
 variable.

I need this fix for my vfs_aio_pthread.c module.

Autobuild-User: Jeremy Allison <jra@samba.org>
Autobuild-Date: Thu Dec 22 12:12:33 CET 2011 on sn-devel-104
(cherry picked from commit 711c18c2301d1bea35cac1144080a94e6b89be27)
---
 source3/lib/fncall.c                  |    3 +--
 source3/lib/pthreadpool/pthreadpool.c |    9 +++++----
 source3/lib/pthreadpool/pthreadpool.h |    5 +++--
 source3/lib/pthreadpool/tests.c       |   18 ++++++++++--------
 4 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/source3/lib/fncall.c b/source3/lib/fncall.c
index 4a013e9..13bf093 100644
--- a/source3/lib/fncall.c
+++ b/source3/lib/fncall.c
@@ -280,8 +280,7 @@ static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde,
 	int i, num_pending;
 	int job_id;
 
-	job_id = pthreadpool_finished_job(ctx->pool);
-	if (job_id <= 0) {
+	if (pthreadpool_finished_job(ctx->pool, &job_id) != 0) {
 		return;
 	}
 
diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index 7538fb7..c2dd92a 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -284,16 +284,16 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
  * Fetch a finished job number from the signal pipe
  */
 
-int pthreadpool_finished_job(struct pthreadpool *pool)
+int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
 {
-	int result;
+	int ret_jobid;
 	ssize_t nread;
 
 	nread = -1;
 	errno = EINTR;
 
 	while ((nread == -1) && (errno == EINTR)) {
-		nread = read(pool->sig_pipe[0], &result, sizeof(int));
+		nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
 	}
 	if (nread == -1) {
 		return errno;
@@ -301,7 +301,8 @@ int pthreadpool_finished_job(struct pthreadpool *pool)
 	if (nread != sizeof(int)) {
 		return EINVAL;
 	}
-	return result;
+	*jobid = ret_jobid;
+	return 0;
 }
 
 /*
diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
index 79704ea..0fde3c8 100644
--- a/source3/lib/pthreadpool/pthreadpool.h
+++ b/source3/lib/pthreadpool/pthreadpool.h
@@ -90,8 +90,9 @@ int pthreadpool_signal_fd(struct pthreadpool *pool);
  * pthreadpool_signal_fd() is readable.
  *
  * @param[in]	pool	The pool to query for finished jobs
- * @return		The job_id of the finished job
+ * @param[out]	pjobid	The job_id of the finished job
+ * @return		success: 0, failure: errno
  */
-int pthreadpool_finished_job(struct pthreadpool *pool);
+int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid);
 
 #endif
diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c
index 667ee01..95d37b6 100644
--- a/source3/lib/pthreadpool/tests.c
+++ b/source3/lib/pthreadpool/tests.c
@@ -68,12 +68,13 @@ static int test_jobs(int num_threads, int num_jobs)
 	}
 
 	for (i=0; i<num_jobs; i++) {
-		ret = pthreadpool_finished_job(p);
-		if ((ret < 0) || (ret >= num_jobs)) {
-			fprintf(stderr, "invalid job number %d\n", ret);
+		int jobid = -1;
+		ret = pthreadpool_finished_job(p, &jobid);
+		if ((ret != 0) || (jobid >= num_jobs)) {
+			fprintf(stderr, "invalid job number %d\n", jobid);
 			return -1;
 		}
-		finished[ret] += 1;
+		finished[jobid] += 1;
 	}
 
 	for (i=0; i<num_jobs; i++) {
@@ ... @@
 		for (j=0; j<num_pools; j++) {
-			ret = pthreadpool_finished_job(pools[j]);
-			if ((ret < 0) || (ret >= num_jobs * num_threads)) {
+			int jobid = -1;
+			ret = pthreadpool_finished_job(pools[j], &jobid);
+			if ((ret != 0) || (jobid >= num_jobs * num_threads)) {
 				fprintf(stderr, "invalid job number %d\n",
-					ret);
+					jobid);
 				return -1;
 			}
-			finished[ret] += 1;
+			finished[jobid] += 1;
 			received += 1;
 		}
 	}
-- 
1.7.7.3

From 6d6541d8e9cf78cc4a6b210355890e4144b3a926 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra@samba.org>
Date: Wed, 25 Jan 2012 14:11:12 -0800
Subject: [PATCH 2/5] Add man page for vfs_aio_pthread module.

(cherry picked from commit d8c699190d2cc0ce64395c7b2b10bb25c98a2943)
---
 docs-xml/manpages-3/vfs_aio_pthread.8.xml |  118 +++++++++++++++++++++++++++++
 1 files changed, 118 insertions(+), 0 deletions(-)
 create mode 100644 docs-xml/manpages-3/vfs_aio_pthread.8.xml

diff --git a/docs-xml/manpages-3/vfs_aio_pthread.8.xml b/docs-xml/manpages-3/vfs_aio_pthread.8.xml
new file mode 100644
index 0000000..625d6f3
--- /dev/null
+++ b/docs-xml/manpages-3/vfs_aio_pthread.8.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="iso-8859-1"?>
+<!DOCTYPE refentry PUBLIC "-//Samba-Team//DTD DocBook V4.2-Based Variant V1.0//EN" "http://www.samba.org/samba/DTD/samba-doc">
+<refentry id="vfs_aio_pthread.8">
+
+<refmeta>
+	<refentrytitle>vfs_aio_pthread</refentrytitle>
+	<manvolnum>8</manvolnum>
+	<refmiscinfo class="source">Samba</refmiscinfo>
+	<refmiscinfo class="manual">System Administration tools</refmiscinfo>
+	<refmiscinfo class="version">3.6</refmiscinfo>
+</refmeta>
+
+<refnamediv>
+	<refname>vfs_aio_pthread</refname>
+	<refpurpose>implement async I/O in Samba vfs using a pthread pool</refpurpose>
+</refnamediv>
+
+<refsynopsisdiv>
+	<cmdsynopsis>
+		<command>vfs objects = aio_pthread</command>
+	</cmdsynopsis>
+</refsynopsisdiv>
+
+<refsect1>
+	<title>DESCRIPTION</title>
+
+	<para>This VFS module is part of the
+	<citerefentry><refentrytitle>samba</refentrytitle>
+	<manvolnum>7</manvolnum></citerefentry> suite.</para>
+
+	<para>The <command>aio_pthread</command> VFS module enables asynchronous
+	I/O for Samba on platforms which have the pthreads API available,
+	without using the Posix AIO interface. Posix AIO can suffer from severe
+	limitations. For example, on some Linux versions the
+	real-time signals that it uses are broken under heavy load.
+	Other systems only allow AIO when special kernel modules are
+	loaded, or only allow a certain system-wide number of async
+	requests to be scheduled. Systems based on glibc (most Linux
+	systems) only allow a single outstanding request per file
+	descriptor, which essentially makes Posix AIO useless on systems
+	using the glibc implementation.</para>
+
+	<para>To work around all these limitations, the aio_pthread module
+	was written. It uses a pthread pool instead of the
+	internal Posix AIO interface to allow read and write calls
+	to be processed asynchronously. A pthread pool is created
+	which expands dynamically by creating new threads as work is
+	given to it, up to a maximum of 100 threads per smbd process.
+	To change this limit see the "aio num threads" parameter
+	below. New threads are not created if idle threads are
+	available when a new read or write request is received;
+	the new work is given to an existing idle thread. Threads
+	terminate themselves if idle for one second.</para>
+
+	<para>Note that the smb.conf parameters <command>aio read size</command>
+	and <command>aio write size</command> must also be set appropriately
+	for this module to be active.</para>
+
+	<para>This module MUST be listed last in any module stack as
+	the Samba VFS pread/pwrite interface is not thread-safe. This
+	module makes direct pread and pwrite system calls and does
+	NOT call the Samba VFS pread and pwrite interfaces.</para>
+
+</refsect1>
+
+<refsect1>
+	<title>EXAMPLES</title>
+
+	<para>Straightforward use:</para>
+
+<programlisting>
+        <smbconfsection name="[cooldata]"/>
+	<smbconfoption name="path">/data/ice</smbconfoption>
+	<smbconfoption name="vfs objects">aio_fork</smbconfoption>
+</programlisting>
+</refsect1>
+
+<refsect1>
+	<title>OPTIONS</title>
+
+	<variablelist>
+
+		<varlistentry>
+		<term>aio_pthread:aio num threads = INTEGER</term>
+		<listitem>
+		<para>Limit the maximum number of threads per smbd that
+		will be created in the thread pool to service IO requests.
+		</para>
+		<para>By default this is set to 100.</para>
+		</listitem>
+		</varlistentry>
+
+	</variablelist>
+</refsect1>
+
+<refsect1>
+	<title>VERSION</title>
+
+	<para>This man page is correct for version 3.6.3 of the Samba suite.
+	</para>
+</refsect1>
+
+<refsect1>
+	<title>AUTHOR</title>
+
+	<para>The original Samba software and related utilities
+	were created by Andrew Tridgell. Samba is now developed
+	by the Samba Team as an Open Source project similar
+	to the way the Linux kernel is developed.</para>
+
+</refsect1>
+
+</refentry>
-- 
1.7.7.3

From f43ce80d9331de90dd523ba155832b0de3975e9a Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra@samba.org>
Date: Wed, 25 Jan 2012 16:27:54 -0800
Subject: [PATCH 3/5] Ensure we always free aio_ex on all error paths by
 moving the TALLOC_FREE call out of smbd_aio_complete_aio_ex() and into
 the caller.
---
 source3/modules/vfs_aio_fork.c |    1 +
 source3/smbd/aio.c             |    3 +--
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/source3/modules/vfs_aio_fork.c b/source3/modules/vfs_aio_fork.c
index 41b5a89..7f6a021 100644
--- a/source3/modules/vfs_aio_fork.c
+++ b/source3/modules/vfs_aio_fork.c
@@ -434,6 +434,7 @@ static void handle_aio_completion(struct event_context *event_ctx,
 	aio_ex = (struct aio_extra *)child->aiocb->aio_sigevent.sigev_value.sival_ptr;
 	smbd_aio_complete_aio_ex(aio_ex);
+	TALLOC_FREE(aio_ex);
 }
 
 static int aio_child_destructor(struct aio_child *child)
diff --git a/source3/smbd/aio.c b/source3/smbd/aio.c
index e9d49ce..b0b90c0 100644
--- a/source3/smbd/aio.c
+++ b/source3/smbd/aio.c
@@ -66,6 +66,7 @@ static void smbd_aio_signal_handler(struct tevent_context *ev_ctx,
 		info->si_value.sival_ptr;
 
 	smbd_aio_complete_aio_ex(aio_ex);
+	TALLOC_FREE(aio_ex);
 }
 
 
@@ -894,8 +895,6 @@ void smbd_aio_complete_aio_ex(struct aio_extra *aio_ex)
 	if (!handle_aio_completed(aio_ex, &ret)) {
 		return;
 	}
-
-	TALLOC_FREE(aio_ex);
 }
 
 /****************************************************************************
-- 
1.7.7.3

From c4d07b81d0fe53b95d68d34cf8438531a184d24a Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra@samba.org>
Date: Wed, 25 Jan 2012 16:54:39 -0800
Subject: [PATCH 4/5] Add vfs_aio_pthread code.

---
 source3/Makefile.in               |    5 +
 source3/configure.in              |    4 +
 source3/modules/vfs_aio_pthread.c |  625 +++++++++++++++++++++++++++++++++++++
 3 files changed, 634 insertions(+), 0 deletions(-)
 create mode 100644 source3/modules/vfs_aio_pthread.c

diff --git a/source3/Makefile.in b/source3/Makefile.in
index 7e98db7..dc81c85 100644
--- a/source3/Makefile.in
+++ b/source3/Makefile.in
@@ -829,6 +829,7 @@ VFS_READAHEAD_OBJ = modules/vfs_readahead.o
 VFS_TSMSM_OBJ = modules/vfs_tsmsm.o
 VFS_FILEID_OBJ = modules/vfs_fileid.o
 VFS_AIO_FORK_OBJ = modules/vfs_aio_fork.o
+VFS_AIO_PTHREAD_OBJ = modules/vfs_aio_pthread.o
 VFS_PREOPEN_OBJ = modules/vfs_preopen.o
 VFS_SYNCOPS_OBJ = modules/vfs_syncops.o
 VFS_ACL_XATTR_OBJ = modules/vfs_acl_xattr.o
@@ -3029,6 +3030,10 @@ bin/aio_fork.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_FORK_OBJ)
 	@echo "Building plugin $@"
 	@$(SHLD_MODULE) $(VFS_AIO_FORK_OBJ)
 
+bin/aio_pthread.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_PTHREAD_OBJ)
+	@echo "Building plugin $@"
+	@$(SHLD_MODULE) $(VFS_AIO_PTHREAD_OBJ)
+
 bin/preopen.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_PREOPEN_OBJ)
 	@echo "Building plugin $@"
 	@$(SHLD_MODULE) $(VFS_PREOPEN_OBJ)
diff --git a/source3/configure.in b/source3/configure.in
index 398a4f8..d8d3a1f 100644
--- a/source3/configure.in
+++ b/source3/configure.in
@@ -6674,6 +6674,9 @@ if test x"$enable_pthreadpool" = x"yes" -a x"$samba_cv_HAVE_PTHREAD" = x"yes"; then
 	AC_SUBST(PTHREADPOOL_OBJ, "lib/pthreadpool/pthreadpool.o")
 	PTHREADPOOLTEST="bin/pthreadpooltest\$(EXEEXT)"
 	AC_SUBST(PTHREADPOOLTEST)
+	if test x"$samba_cv_HAVE_AIO" = x"yes"; then
+		default_shared_modules="$default_shared_modules vfs_aio_pthread"
+	fi
 fi
 
 #################################################
@@ -6901,6 +6904,7 @@ SMB_MODULE(vfs_readahead, \$(VFS_READAHEAD_OBJ), "bin/readahead.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_tsmsm, \$(VFS_TSMSM_OBJ), "bin/tsmsm.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_fileid, \$(VFS_FILEID_OBJ), "bin/fileid.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_aio_fork, \$(VFS_AIO_FORK_OBJ), "bin/aio_fork.$SHLIBEXT", VFS)
+SMB_MODULE(vfs_aio_pthread, \$(VFS_AIO_PTHREAD_OBJ), "bin/aio_pthread.$SHLIBEXT", VFS)
SMB_MODULE(vfs_preopen, \$(VFS_PREOPEN_OBJ), "bin/preopen.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_syncops, \$(VFS_SYNCOPS_OBJ), "bin/syncops.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_zfsacl, \$(VFS_ZFSACL_OBJ), "bin/zfsacl.$SHLIBEXT", VFS)
diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c
new file mode 100644
index 0000000..ceef822
--- /dev/null
+++ b/source3/modules/vfs_aio_pthread.c
@@ -0,0 +1,625 @@
+/*
+ * Simulate Posix AIO using pthreads.
+ *
+ * Based on the aio_fork work from Volker and Volker's pthreadpool library.
+ *
+ * Copyright (C) Volker Lendecke 2008
+ * Copyright (C) Jeremy Allison 2012
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include "includes.h"
+#include "system/filesys.h"
+#include "system/shmem.h"
+#include "smbd/smbd.h"
+#include "lib/pthreadpool/pthreadpool.h"
+
+struct aio_extra;
+static struct pthreadpool *pool;
+static int aio_pthread_jobid;
+
+struct aio_private_data {
+	struct aio_private_data *prev, *next;
+	int jobid;
+	SMB_STRUCT_AIOCB *aiocb;
+	ssize_t ret_size;
+	int ret_errno;
+	bool cancelled;
+	bool write_command;
+};
+
+/* List of outstanding requests we have. */
+static struct aio_private_data *pd_list;
+
+static void aio_pthread_handle_completion(struct event_context *event_ctx,
+				struct fd_event *event,
+				uint16 flags,
+				void *p);
+
+/************************************************************************
+ How many threads to initialize ?
+ 100 per process seems insane as a default until you realize that
+ (a) Threads terminate after 1 second when idle.
+ (b) Throttling is done in SMB2 via the crediting algorithm.
+ (c) SMB1 clients are limited to max_mux (50) outstanding requests and
+     Windows clients don't use this anyway.
+ Essentially we want this to be unlimited unless smb.conf says different.
+***********************************************************************/
+
+static int aio_get_num_threads(struct vfs_handle_struct *handle)
+{
+	return lp_parm_int(SNUM(handle->conn),
+			   "aio_pthread", "aio num threads", 100);
+}
+
+/************************************************************************
+ Ensure thread pool is initialized.
+***********************************************************************/
+
+static bool init_aio_threadpool(struct vfs_handle_struct *handle)
+{
+	struct fd_event *sock_event = NULL;
+	int ret = 0;
+	int num_threads;
+
+	if (pool) {
+		return true;
+	}
+
+	num_threads = aio_get_num_threads(handle);
+	ret = pthreadpool_init(num_threads, &pool);
+	if (ret) {
+		errno = ret;
+		return false;
+	}
+	sock_event = tevent_add_fd(server_event_context(),
+				NULL,
+				pthreadpool_signal_fd(pool),
+				TEVENT_FD_READ,
+				aio_pthread_handle_completion,
+				NULL);
+	if (sock_event == NULL) {
+		pthreadpool_destroy(pool);
+		pool = NULL;
+		return false;
+	}
+
+	DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
+		  num_threads));
+
+	return true;
+}
+
+
+/************************************************************************
+ Worker function - core of the pthread aio engine.
+ This is the function that actually does the IO.
+***********************************************************************/
+
+static void aio_worker(void *private_data)
+{
+	struct aio_private_data *pd =
+			(struct aio_private_data *)private_data;
+
+	if (pd->write_command) {
+		pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
+				(const void *)pd->aiocb->aio_buf,
+				pd->aiocb->aio_nbytes,
+				pd->aiocb->aio_offset);
+		if (pd->ret_size == -1 && errno == ESPIPE) {
+			/* Maintain the fiction that pipes can
+			   be seeked (sought?) on. */
+			pd->ret_size = sys_write(pd->aiocb->aio_fildes,
+					(const void *)pd->aiocb->aio_buf,
+					pd->aiocb->aio_nbytes);
+		}
+	} else {
+		pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
+				(void *)pd->aiocb->aio_buf,
+				pd->aiocb->aio_nbytes,
+				pd->aiocb->aio_offset);
+		if (pd->ret_size == -1 && errno == ESPIPE) {
+			/* Maintain the fiction that pipes can
+			   be seeked (sought?) on.
+			*/
+			pd->ret_size = sys_read(pd->aiocb->aio_fildes,
+					(void *)pd->aiocb->aio_buf,
+					pd->aiocb->aio_nbytes);
+		}
+	}
+	if (pd->ret_size == -1) {
+		pd->ret_errno = errno;
+	} else {
+		pd->ret_errno = 0;
+	}
+}
+
+/************************************************************************
+ Private data destructor.
+***********************************************************************/
+
+static int pd_destructor(struct aio_private_data *pd)
+{
+	DLIST_REMOVE(pd_list, pd);
+	return 0;
+}
+
+/************************************************************************
+ Create and initialize a private data struct.
+***********************************************************************/
+
+static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
+					SMB_STRUCT_AIOCB *aiocb)
+{
+	struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
+	if (!pd) {
+		return NULL;
+	}
+	pd->jobid = aio_pthread_jobid++;
+	pd->aiocb = aiocb;
+	pd->ret_size = -1;
+	pd->ret_errno = EINPROGRESS;
+	talloc_set_destructor(pd, pd_destructor);
+	DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
+	return pd;
+}
+
+/************************************************************************
+ Spin off a threadpool (if needed) and initiate a pread call.
+***********************************************************************/
+
+static int aio_pthread_read(struct vfs_handle_struct *handle,
+				struct files_struct *fsp,
+				SMB_STRUCT_AIOCB *aiocb)
+{
+	struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
+	struct aio_private_data *pd = NULL;
+	int ret;
+
+	if (!init_aio_threadpool(handle)) {
+		return -1;
+	}
+
+	pd = create_private_data(aio_ex, aiocb);
+	if (pd == NULL) {
+		DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
+		return -1;
+	}
+
+	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
+	if (ret) {
+		errno = ret;
+		return -1;
+	}
+
+	DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
+		"of %llu bytes at offset %llu\n",
+		pd->jobid,
+		(unsigned long long)pd->aiocb->aio_nbytes,
+		(unsigned long long)pd->aiocb->aio_offset));
+
+	return 0;
+}
+
+/************************************************************************
+ Spin off a threadpool (if needed) and initiate a pwrite call.
+***********************************************************************/
+
+static int aio_pthread_write(struct vfs_handle_struct *handle,
+				struct files_struct *fsp,
+				SMB_STRUCT_AIOCB *aiocb)
+{
+	struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
+	struct aio_private_data *pd = NULL;
+	int ret;
+
+	if (!init_aio_threadpool(handle)) {
+		return -1;
+	}
+
+	pd = create_private_data(aio_ex, aiocb);
+	if (pd == NULL) {
+		DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
+		return -1;
+	}
+
+	pd->write_command = true;
+
+	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
+	if (ret) {
+		errno = ret;
+		return -1;
+	}
+
+	DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
+		"of %llu bytes at offset %llu\n",
+		pd->jobid,
+		(unsigned long long)pd->aiocb->aio_nbytes,
+		(unsigned long long)pd->aiocb->aio_offset));
+
+	return 0;
+}
+
+/************************************************************************
+ Find the private data by jobid.
+***********************************************************************/
+
+static struct aio_private_data *find_private_data_by_jobid(int jobid)
+{
+	struct aio_private_data *pd;
+
+	for (pd = pd_list; pd != NULL; pd = pd->next) {
+		if (pd->jobid == jobid) {
+			return pd;
+		}
+	}
+
+	return NULL;
+}
+
+/************************************************************************
+ Callback when an IO completes.
+***********************************************************************/
+
+static void aio_pthread_handle_completion(struct event_context *event_ctx,
+				struct fd_event *event,
+				uint16 flags,
+				void *p)
+{
+	struct aio_extra *aio_ex = NULL;
+	struct aio_private_data *pd = NULL;
+	int jobid = 0;
+	int ret;
+
+	DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
+			(int)flags));
+
+	if ((flags & EVENT_FD_READ) == 0) {
+		return;
+	}
+
+	ret = pthreadpool_finished_job(pool, &jobid);
+	if (ret) {
+		smb_panic("aio_pthread_handle_completion");
+		return;
+	}
+
+	pd = find_private_data_by_jobid(jobid);
+	if (pd == NULL) {
+		DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
+			  jobid));
+		return;
+	}
+
+	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
+	smbd_aio_complete_aio_ex(aio_ex);
+
+	DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
+		jobid ));
+	TALLOC_FREE(aio_ex);
+}
+
+/************************************************************************
+ Find the private data by aiocb.
+***********************************************************************/
+
+static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
+{
+	struct aio_private_data *pd;
+
+	for (pd = pd_list; pd != NULL; pd = pd->next) {
+		if (pd->aiocb == aiocb) {
+			return pd;
+		}
+	}
+
+	return NULL;
+}
+
+/************************************************************************
+ Called to return the result of a completed AIO.
+ Should only be called if aio_error returns something other than EINPROGRESS.
+ Returns:
+	-1 on failure (errno set from the IO), otherwise
+	the number of bytes transferred by the IO operation.
+***********************************************************************/
+
+static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
+				struct files_struct *fsp,
+				SMB_STRUCT_AIOCB *aiocb)
+{
+	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
+
+	if (pd == NULL) {
+		errno = EINVAL;
+		DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
+		return -1;
+	}
+
+	pd->aiocb = NULL;
+
+	if (pd->ret_size == -1) {
+		errno = pd->ret_errno;
+	}
+
+	return pd->ret_size;
+}
+
+/************************************************************************
+ Called to check the result of an AIO.
+ Returns:
+	EINPROGRESS - still in progress.
+	EINVAL - invalid aiocb.
+	ECANCELED - request was cancelled.
+	0 - request completed successfully.
+	Any other value - errno from IO operation.
+***********************************************************************/
+
+static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
+				struct files_struct *fsp,
+				SMB_STRUCT_AIOCB *aiocb)
+{
+	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
+
+	if (pd == NULL) {
+		return EINVAL;
+	}
+	if (pd->cancelled) {
+		return ECANCELED;
+	}
+	return pd->ret_errno;
+}
+
+/************************************************************************
+ Called to request the cancel of an AIO, or all of them on a specific
+ fsp if aiocb == NULL.
+***********************************************************************/
+
+static int aio_pthread_cancel(struct vfs_handle_struct *handle,
+			struct files_struct *fsp,
+			SMB_STRUCT_AIOCB *aiocb)
+{
+	struct aio_private_data *pd = NULL;
+
+	for (pd = pd_list; pd != NULL; pd = pd->next) {
+		if (pd->aiocb == NULL) {
+			continue;
+		}
+		if (pd->aiocb->aio_fildes != fsp->fh->fd) {
+			continue;
+		}
+		if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
+			continue;
+		}
+
+		/*
+		 * We let the child do its job, but we discard the result when
+		 * it's finished.
+		 */
+
+		pd->cancelled = true;
+	}
+
+	return AIO_CANCELED;
+}
+
+/************************************************************************
+ Callback for a previously detected job completion.
+***********************************************************************/
+
+static void aio_pthread_handle_immediate(struct tevent_context *ctx,
+				struct tevent_immediate *im,
+				void *private_data)
+{
+	struct aio_extra *aio_ex = NULL;
+	int *pjobid = (int *)private_data;
+	struct aio_private_data *pd = find_private_data_by_jobid(*pjobid);
+
+	if (pd == NULL) {
+		DEBUG(1, ("aio_pthread_handle_immediate cannot find jobid %d\n",
+			  *pjobid));
+		TALLOC_FREE(pjobid);
+		return;
+	}
+
+	TALLOC_FREE(pjobid);
+	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
+	smbd_aio_complete_aio_ex(aio_ex);
+	TALLOC_FREE(aio_ex);
+}
+
+/************************************************************************
+ Private data struct used in suspend completion code.
+***********************************************************************/
+
+struct suspend_private {
+	int num_entries;
+	int num_finished;
+	const SMB_STRUCT_AIOCB * const *aiocb_array;
+};
+
+/************************************************************************
+ Callback when an IO completes from a suspend call.
+***********************************************************************/
+
+static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
+				struct fd_event *event,
+				uint16 flags,
+				void *p)
+{
+	struct suspend_private *sp = (struct suspend_private *)p;
+	struct aio_private_data *pd = NULL;
+	struct tevent_immediate *im = NULL;
+	int *pjobid = NULL;
+	int i;
+
+	DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
+			(int)flags));
+
+	if ((flags & EVENT_FD_READ) == 0) {
+		return;
+	}
+
+	pjobid = talloc_array(NULL, int, 1);
+	if (pjobid == NULL) {
+		smb_panic("aio_pthread_handle_suspend_completion: no memory.");
+	}
+
+	if (pthreadpool_finished_job(pool, pjobid)) {
+		smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
+		return;
+	}
+
+	pd = find_private_data_by_jobid(*pjobid);
+	if (pd == NULL) {
+		DEBUG(1, ("aio_pthread_handle_suspend_completion cannot find "
+			  "jobid %d\n", *pjobid));
+		TALLOC_FREE(pjobid);
+		return;
+	}
+
+	/* Is this a jobid with an aiocb we're interested in ? */
+	for (i = 0; i < sp->num_entries; i++) {
+		if (sp->aiocb_array[i] == pd->aiocb) {
+			sp->num_finished++;
+			TALLOC_FREE(pjobid);
+			return;
+		}
+	}
+
+	/* Jobid completed we weren't waiting for.
+	   We must reschedule this as an immediate event
+	   on the main event context.
+	*/
+	im = tevent_create_immediate(NULL);
+	if (!im) {
+		exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
+	}
+
+	DEBUG(10,("aio_pthread_handle_suspend_completion: "
+			"re-scheduling job id %d\n",
+			*pjobid));
+
+	tevent_schedule_immediate(im,
+			server_event_context(),
+			aio_pthread_handle_immediate,
+			(void *)pjobid);
+}
+
+
+static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
+					struct tevent_timer *te,
+					struct timeval now,
+					void *private_data)
+{
+	bool *timed_out = (bool *)private_data;
+	/* Remove this timed event handler. */
+	TALLOC_FREE(te);
+	*timed_out = true;
+}
+
+/************************************************************************
+ Called to request everything to stop until all IO is completed.
+***********************************************************************/
+
+static int aio_pthread_suspend(struct vfs_handle_struct *handle,
+			struct files_struct *fsp,
+			const SMB_STRUCT_AIOCB * const aiocb_array[],
+			int n,
+			const struct timespec *timeout)
+{
+	struct event_context *ev = NULL;
+	struct fd_event *sock_event = NULL;
+	int ret = -1;
+	struct suspend_private sp;
+	bool timed_out = false;
+	TALLOC_CTX *frame = talloc_stackframe();
+
+	/* This is a blocking call, and has to use a sub-event loop. */
+	ev = event_context_init(frame);
+	if (ev == NULL) {
+		errno = ENOMEM;
+		goto out;
+	}
+
+	if (timeout) {
+		struct timeval tv = convert_timespec_to_timeval(*timeout);
+		struct tevent_timer *te = tevent_add_timer(ev,
+					frame,
+					timeval_current_ofs(tv.tv_sec,
+							    tv.tv_usec),
+					aio_pthread_suspend_timed_out,
+					&timed_out);
+		if (!te) {
+			errno = ENOMEM;
+			goto out;
+		}
+	}
+
+	ZERO_STRUCT(sp);
+	sp.num_entries = n;
+	sp.aiocb_array = aiocb_array;
+	sp.num_finished = 0;
+
+	sock_event = tevent_add_fd(ev,
+				frame,
+				pthreadpool_signal_fd(pool),
+				TEVENT_FD_READ,
+				aio_pthread_handle_suspend_completion,
+				(void *)&sp);
+	if (sock_event == NULL) {
+		pthreadpool_destroy(pool);
+		pool = NULL;
+		goto out;
+	}
+	/*
+	 * We're going to cheat here. We know that smbd/aio.c
+	 * only calls this when it's waiting for every single
+	 * outstanding call to finish on a close, so just wait
+	 * individually for each IO to complete. We don't care
+	 * what order they finish - only that they all do. JRA.
+	 */
+	while (sp.num_entries != sp.num_finished) {
+		if (tevent_loop_once(ev) == -1) {
+			goto out;
+		}
+
+		if (timed_out) {
+			errno = EAGAIN;
+			goto out;
+		}
+	}
+
+	ret = 0;
+
+  out:
+
+	TALLOC_FREE(frame);
+	return ret;
+}
+
+static struct vfs_fn_pointers vfs_aio_pthread_fns = {
+	.aio_read = aio_pthread_read,
+	.aio_write = aio_pthread_write,
+	.aio_return_fn = aio_pthread_return_fn,
+	.aio_cancel = aio_pthread_cancel,
+	.aio_error_fn = aio_pthread_error_fn,
+	.aio_suspend = aio_pthread_suspend,
+};
+
+NTSTATUS vfs_aio_pthread_init(void);
+NTSTATUS vfs_aio_pthread_init(void)
+{
+	return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
+				"aio_pthread", &vfs_aio_pthread_fns);
+}
-- 
1.7.7.3

From 83e3e1a2fef35415adc206d6900363d8a89e242f Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra@samba.org>
Date: Wed, 25 Jan 2012 17:17:48 -0800
Subject: [PATCH 5/5] Update man page to fix typo vfs_aio_fork ->
 vfs_aio_pthread, add aio read size, aio write size examples.

(cherry picked from commit 12b614a9298974ba5daee7aa8d1aa47006de01e2)
---
 docs-xml/manpages-3/vfs_aio_pthread.8.xml |    4 +++-
 1 files changed, 3 insertions(+), 1 deletions(-)

diff --git a/docs-xml/manpages-3/vfs_aio_pthread.8.xml b/docs-xml/manpages-3/vfs_aio_pthread.8.xml
index 625d6f3..3e41ee9 100644
--- a/docs-xml/manpages-3/vfs_aio_pthread.8.xml
+++ b/docs-xml/manpages-3/vfs_aio_pthread.8.xml
@@ -76,7 +76,9 @@
 <programlisting>
         <smbconfsection name="[cooldata]"/>
 	<smbconfoption name="path">/data/ice</smbconfoption>
-	<smbconfoption name="vfs objects">aio_fork</smbconfoption>
+	<smbconfoption name="aio read size">1024</smbconfoption>
+	<smbconfoption name="aio write size">1024</smbconfoption>
+	<smbconfoption name="vfs objects">aio_pthread</smbconfoption>
 </programlisting>
 </refsect1>
 
-- 
1.7.7.3
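
Usage notes
-----------

Once the module is built (patch 4), it is enabled per share. The fragment
below restates the man page example from patches 2 and 5 as a plain
smb.conf sketch; the share name and path follow that example, and the
1024-byte values are illustrative thresholds rather than tuned
recommendations. As the man page says, "aio read size" and "aio write
size" must be set for the module to be active at all, and aio_pthread
must be listed last in any VFS module stack:

	[cooldata]
		path = /data/ice
		aio read size = 1024
		aio write size = 1024
		vfs objects = aio_pthread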
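Patch 1's revised pthreadpool contract (0/errno result, job id returned
through an out-parameter) can also be exercised outside smbd. The sketch
below is a hypothetical standalone consumer, not code from the series:
it assumes the pthreadpool header and object can be built on their own,
and the job id 7 and count_job() helper are invented for illustration.
Only pthreadpool_init(), pthreadpool_add_job(), pthreadpool_finished_job()
and pthreadpool_destroy() from the library's API are used:

	#include <stdio.h>
	#include <string.h>
	#include "pthreadpool.h"

	static void count_job(void *private_data)
	{
		/* Runs on a pool thread. */
		int *done = (int *)private_data;
		*done = 1;
	}

	int main(void)
	{
		struct pthreadpool *pool = NULL;
		int done = 0;
		int jobid = -1;
		int ret;

		/* 0 on success, errno on failure - the same convention
		   the reworked pthreadpool_finished_job() now follows. */
		ret = pthreadpool_init(4, &pool);
		if (ret != 0) {
			fprintf(stderr, "init: %s\n", strerror(ret));
			return 1;
		}

		ret = pthreadpool_add_job(pool, 7, count_job, &done);
		if (ret != 0) {
			fprintf(stderr, "add_job: %s\n", strerror(ret));
			return 1;
		}

		/* Blocks until some job has finished; event-driven callers
		   poll pthreadpool_signal_fd() first, as fncall.c does. */
		ret = pthreadpool_finished_job(pool, &jobid);
		if ((ret != 0) || (jobid != 7)) {
			fprintf(stderr, "finished_job: %d\n", ret);
			return 1;
		}

		printf("job %d finished, done=%d\n", jobid, done);
		return pthreadpool_destroy(pool);
	}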