From 294484cd044c27e3580653caf73915c828f2c8a8 Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Wed, 4 Jan 2017 16:20:17 +1100 Subject: [PATCH 1/5] ctdb-common: Simplify async computation for sock_socket_write_send/recv This is now just a wrapper around comm_write_send/recv. This avoids the extra tevent_req and fixes a bug in the termination of sock_socket_write computation. BUG: https://bugzilla.samba.org/show_bug.cgi?id=12500 Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke (cherry picked from commit d9370550a7f72ff02c296ef61bd7f86645167378) --- ctdb/common/sock_daemon.c | 56 +++++------------------------------------------ 1 file changed, 6 insertions(+), 50 deletions(-) diff --git a/ctdb/common/sock_daemon.c b/ctdb/common/sock_daemon.c index dc5dba0..ca4086d 100644 --- a/ctdb/common/sock_daemon.c +++ b/ctdb/common/sock_daemon.c @@ -420,77 +420,33 @@ static bool sock_socket_start_recv(struct tevent_req *req, int *perr) * Send message to a client */ -struct sock_socket_write_state { - int status; -}; - -static void sock_socket_write_done(struct tevent_req *subreq); - struct tevent_req *sock_socket_write_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct sock_client_context *client_ctx, uint8_t *buf, size_t buflen) { - struct tevent_req *req, *subreq; - struct sock_socket_write_state *state; - - req = tevent_req_create(mem_ctx, &state, - struct sock_socket_write_state); - if (req == NULL) { - return NULL; - } + struct tevent_req *req; - subreq = comm_write_send(state, ev, client_ctx->comm, buf, buflen); - if (tevent_req_nomem(subreq, req)) { - return tevent_req_post(req, ev); - } - tevent_req_set_callback(subreq, sock_socket_write_done, req); + req = comm_write_send(mem_ctx, ev, client_ctx->comm, buf, buflen); return req; } -static void sock_socket_write_done(struct tevent_req *subreq) +bool sock_socket_write_recv(struct tevent_req *req, int *perr) { - struct tevent_req *req = tevent_req_callback_data( - subreq, struct tevent_req); - struct sock_socket_write_state *state = tevent_req_data( - req, struct sock_socket_write_state); int ret; bool status; - status = comm_write_recv(subreq, &ret); - TALLOC_FREE(subreq); + status = comm_write_recv(req, &ret); if (! status) { - state->status = ret; - return; - } -} - -bool sock_socket_write_recv(struct tevent_req *req, int *perr) -{ - struct sock_socket_write_state *state = tevent_req_data( - req, struct sock_socket_write_state); - int ret; - - if (tevent_req_is_unix_error(req, &ret)) { if (perr != NULL) { *perr = ret; } - return false; } - if (state->status != 0) { - if (perr != NULL) { - *perr = state->status; - } - return false; - } - - if (perr != NULL) { - *perr = 0; - } - return true; + return status; } + /* * Socket daemon */ -- 2.9.3 From df6220f64596eeb62682d932c285d8263145db32 Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Thu, 5 Jan 2017 00:47:11 +1100 Subject: [PATCH 2/5] ctdb-tests: Add another test for sock_daemon BUG: https://bugzilla.samba.org/show_bug.cgi?id=12500 Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke (cherry picked from commit ebc60b2accbbb1586351d246d0bcc6a2dd528911) --- ctdb/tests/cunit/sock_daemon_test_001.sh | 23 ++- ctdb/tests/src/sock_daemon_test.c | 275 ++++++++++++++++++++++++++++++- 2 files changed, 290 insertions(+), 8 deletions(-) diff --git a/ctdb/tests/cunit/sock_daemon_test_001.sh b/ctdb/tests/cunit/sock_daemon_test_001.sh index 9555cdd..036b6ac 100755 --- a/ctdb/tests/cunit/sock_daemon_test_001.sh +++ b/ctdb/tests/cunit/sock_daemon_test_001.sh @@ -24,21 +24,42 @@ result_filter () ok <len == sizeof(struct test6_pkt)); + assert(pkt->data == 0xffeeddcc); + + state->done = true; +} + +static void test6_client(const char *sockpath) +{ + struct tevent_context *ev; + struct test6_client_state state; + struct sock_queue *queue; + struct test6_pkt pkt; + int conn, ret; + + ev = tevent_context_init(NULL); + assert(ev != NULL); + + conn = sock_connect(sockpath); + assert(conn != -1); + + state.done = false; + + queue = sock_queue_setup(ev, ev, conn, + test6_client_callback, &state); + assert(queue != NULL); + + pkt.len = 8; + pkt.data = 0xaabbccdd; + + ret = sock_queue_write(queue, (uint8_t *)&pkt, + sizeof(struct test6_pkt)); + assert(ret == 0); + + while (! state.done) { + tevent_loop_once(ev); + } + + talloc_free(ev); +} + +struct test6_server_state { + struct sock_daemon_context *sockd; + int done; +}; + +struct test6_read_state { + struct test6_server_state *server_state; + struct test6_pkt reply; +}; + +static void test6_read_done(struct tevent_req *subreq); + +static struct tevent_req *test6_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct sock_client_context *client, + uint8_t *buf, size_t buflen, + void *private_data) +{ + struct test6_server_state *server_state = + (struct test6_server_state *)private_data; + struct tevent_req *req, *subreq; + struct test6_read_state *state; + struct test6_pkt *pkt; + + req = tevent_req_create(mem_ctx, &state, struct test6_read_state); + assert(req != NULL); + + state->server_state = server_state; + + assert(buflen == sizeof(struct test6_pkt)); + + pkt = (struct test6_pkt *)buf; + assert(pkt->data == 0xaabbccdd); + + state->reply.len = sizeof(struct test6_pkt); + state->reply.data = 0xffeeddcc; + + subreq = sock_socket_write_send(state, ev, client, + (uint8_t *)&state->reply, + state->reply.len); + assert(subreq != NULL); + + tevent_req_set_callback(subreq, test6_read_done, req); + + return req; +} + +static void test6_read_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct test6_read_state *state = tevent_req_data( + req, struct test6_read_state); + int ret; + bool status; + + status = sock_socket_write_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + state->server_state->done = 1; + tevent_req_done(req); +} + +static bool test6_read_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + return true; +} + +static struct sock_socket_funcs test6_client_funcs = { + .read_send = test6_read_send, + .read_recv = test6_read_recv, +}; + +static void test6_startup(void *private_data) +{ + int fd = *(int *)private_data; + int ret = 1; + ssize_t nwritten; + + nwritten = write(fd, &ret, sizeof(ret)); + assert(nwritten == sizeof(ret)); + close(fd); +} + +static struct sock_daemon_funcs test6_funcs = { + .startup = test6_startup, +}; + +static void test6_handler(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval curtime, + void *private_data) +{ + struct test6_server_state *state = + (struct test6_server_state *)private_data; + + if (state->done == 0) { + kill(0, SIGTERM); + return; + } + + talloc_free(state->sockd); +} + +static void test6(TALLOC_CTX *mem_ctx, const char *pidfile, + const char *sockpath) +{ + pid_t pid_server, pid; + int fd[2], ret; + ssize_t n; + + pid = getpid(); + + ret = pipe(fd); + assert(ret == 0); + + pid_server = fork(); + assert(pid_server != -1); + + if (pid_server == 0) { + struct tevent_context *ev; + struct sock_daemon_context *sockd; + struct test6_server_state state; + struct tevent_timer *te; + + close(fd[0]); + + ev = tevent_context_init(mem_ctx); + assert(ev != NULL); + + ret = sock_daemon_setup(mem_ctx, "test6", "file:", "NOTICE", + pidfile, &test6_funcs, &fd[1], &sockd); + assert(ret == 0); + + state.sockd = sockd; + state.done = 0; + + ret = sock_daemon_add_unix(sockd, sockpath, + &test6_client_funcs, &state); + assert(ret == 0); + + te = tevent_add_timer(ev, ev, tevent_timeval_current_ofs(10,0), + test6_handler, &state); + assert(te != NULL); + + ret = sock_daemon_run(ev, sockd, pid); + assert(ret == 0); + + exit(0); + } + + close(fd[1]); + + n = read(fd[0], &ret, sizeof(ret)); + assert(n == sizeof(ret)); + assert(ret == 1); + + close(fd[0]); + + test6_client(sockpath); + + pid = wait(&ret); + assert(pid != -1); +} + int main(int argc, const char **argv) { TALLOC_CTX *mem_ctx; const char *pidfile, *sockpath; + int num; - if (argc != 3) { - fprintf(stderr, "%s \n", argv[0]); + if (argc != 4) { + fprintf(stderr, "%s \n", argv[0]); exit(1); } pidfile = argv[1]; sockpath = argv[2]; + num = atoi(argv[3]); mem_ctx = talloc_new(NULL); assert(mem_ctx != NULL); - test1(mem_ctx, pidfile, sockpath); - test2(mem_ctx, pidfile, sockpath); - test3(mem_ctx, pidfile, sockpath); - test4(mem_ctx, pidfile, sockpath); - test5(mem_ctx, pidfile, sockpath); + switch (num) { + case 1: + test1(mem_ctx, pidfile, sockpath); + break; + + case 2: + test2(mem_ctx, pidfile, sockpath); + break; + + case 3: + test3(mem_ctx, pidfile, sockpath); + break; + + case 4: + test4(mem_ctx, pidfile, sockpath); + break; + + case 5: + test5(mem_ctx, pidfile, sockpath); + break; + + case 6: + test6(mem_ctx, pidfile, sockpath); + break; + + default: + fprintf(stderr, "Unknown test number %d\n", num); + } return 0; } -- 2.9.3 From bb8dcd811afb2fd90bf9a6994a20172a2e92829b Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Wed, 4 Jan 2017 17:46:54 +1100 Subject: [PATCH 3/5] ctdb-common: Fix a bug in packet reading code for generic socket I/O queue->offset currently points to the end of available data. However, after processing one packet the beginning of the next packet is not marked explicitly and caused the same packet to be processed again. BUG: https://bugzilla.samba.org/show_bug.cgi?id=12500 Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke (cherry picked from commit e3440d2bbc0e8f2cb09c94a1d77a60524017cfa0) --- ctdb/common/sock_io.c | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/ctdb/common/sock_io.c b/ctdb/common/sock_io.c index b3581fc..7431eec 100644 --- a/ctdb/common/sock_io.c +++ b/ctdb/common/sock_io.c @@ -76,7 +76,7 @@ struct sock_queue { struct tevent_queue *queue; struct tevent_fd *fde; uint8_t *buf; - size_t buflen, offset; + size_t buflen, begin, end; }; static bool sock_queue_set_fd(struct sock_queue *queue, int fd); @@ -181,20 +181,20 @@ static void sock_queue_handler(struct tevent_context *ev, goto fail; } - if (num_ready > queue->buflen - queue->offset) { + if (num_ready > queue->buflen - queue->end) { queue->buf = talloc_realloc_size(queue, queue->buf, - queue->offset + num_ready); + queue->end + num_ready); if (queue->buf == NULL) { goto fail; } - queue->buflen = queue->offset + num_ready; + queue->buflen = queue->end + num_ready; } - nread = sys_read(queue->fd, queue->buf + queue->offset, num_ready); + nread = sys_read(queue->fd, queue->buf + queue->end, num_ready); if (nread < 0) { goto fail; } - queue->offset += nread; + queue->end += nread; sock_queue_process(queue); return; @@ -207,33 +207,35 @@ static void sock_queue_process(struct sock_queue *queue) { uint32_t pkt_size; - if (queue->offset < sizeof(uint32_t)) { + if ((queue->end - queue->begin) < sizeof(uint32_t)) { /* not enough data */ return; } - pkt_size = *(uint32_t *)queue->buf; + pkt_size = *(uint32_t *)(queue->buf + queue->begin); if (pkt_size == 0) { D_ERR("Invalid packet of length 0\n"); queue->callback(NULL, 0, queue->private_data); } - if (queue->offset < pkt_size) { + if ((queue->end - queue->begin) < pkt_size) { /* not enough data */ return; } - queue->callback(queue->buf, pkt_size, queue->private_data); - queue->offset += pkt_size; + queue->callback(queue->buf + queue->begin, pkt_size, + queue->private_data); + queue->begin += pkt_size; - if (queue->offset < queue->buflen) { + if (queue->begin < queue->end) { /* more data to be processed */ tevent_schedule_immediate(queue->im, queue->ev, sock_queue_process_event, queue); } else { TALLOC_FREE(queue->buf); queue->buflen = 0; - queue->offset = 0; + queue->begin = 0; + queue->end = 0; } } -- 2.9.3 From fa5a9e70eb94c031837e031611e9aa66d7e4ac4c Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Thu, 5 Jan 2017 00:48:32 +1100 Subject: [PATCH 4/5] ctdb-tests: Add tests for generic socket I/O BUG: https://bugzilla.samba.org/show_bug.cgi?id=12500 Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke (cherry picked from commit 1dc1689e7402f6f90af3ddd4c7d33d140892ff2a) --- ctdb/tests/cunit/sock_io_test_001.sh | 9 ++ ctdb/tests/src/sock_io_test.c | 283 +++++++++++++++++++++++++++++++++++ ctdb/wscript | 1 + 3 files changed, 293 insertions(+) create mode 100755 ctdb/tests/cunit/sock_io_test_001.sh create mode 100644 ctdb/tests/src/sock_io_test.c diff --git a/ctdb/tests/cunit/sock_io_test_001.sh b/ctdb/tests/cunit/sock_io_test_001.sh new file mode 100755 index 0000000..1ead2f3 --- /dev/null +++ b/ctdb/tests/cunit/sock_io_test_001.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +. "${TEST_SCRIPTS_DIR}/unit.sh" + +sockpath="${TEST_VAR_DIR}/sock_daemon_test.sock.$$" + +ok_null + +unit_test sock_io_test "$sockpath" diff --git a/ctdb/tests/src/sock_io_test.c b/ctdb/tests/src/sock_io_test.c new file mode 100644 index 0000000..d0048c1 --- /dev/null +++ b/ctdb/tests/src/sock_io_test.c @@ -0,0 +1,283 @@ +/* + sock I/O tests + + Copyright (C) Amitay Isaacs 2017 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . +*/ + +#include "replace.h" +#include "system/filesys.h" +#include "system/network.h" +#include "system/wait.h" + +#include + +#include "common/sock_io.c" + +static int socket_init(const char *sockpath) +{ + struct sockaddr_un addr; + int fd, ret; + size_t len; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + + len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path)); + assert(len < sizeof(addr.sun_path)); + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + assert(fd != -1); + + ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr)); + assert(ret != -1); + + ret = listen(fd, 10); + assert(ret != -1); + + return fd; +} + +static void test1_writer(int fd) +{ + uint8_t buf[1024]; + ssize_t nwritten; + uint32_t len; + + for (len = 10; len < 1000; len += 10) { + int value = len / 10; + uint32_t buflen = len + sizeof(uint32_t); + + memset(buf, value, buflen); + memcpy(buf, &buflen, sizeof(uint32_t)); + + nwritten = sys_write(fd, buf, buflen); + assert(nwritten == buflen); + } +} + +struct test1_reader_state { + size_t pkt_len; + bool done; +}; + +static void test1_reader(uint8_t *buf, size_t buflen, void *private_data) +{ + struct test1_reader_state *state = + (struct test1_reader_state *)private_data; + + if (buflen == 0) { + state->done = true; + return; + } + + assert(buflen == state->pkt_len); + + state->pkt_len += 10; +} + +static void test1(TALLOC_CTX *mem_ctx, const char *sockpath) +{ + struct test1_reader_state state; + struct tevent_context *ev; + struct sock_queue *queue; + pid_t pid; + int pfd[2], fd, ret; + ssize_t n; + + ret = pipe(pfd); + assert(ret == 0); + + pid = fork(); + assert(pid != -1); + + if (pid == 0) { + int newfd; + + close(pfd[0]); + + fd = socket_init(sockpath); + assert(fd != -1); + + ret = 1; + n = sys_write(pfd[1], &ret, sizeof(int)); + assert(n == sizeof(int)); + + newfd = accept(fd, NULL, NULL); + assert(newfd != -1); + + test1_writer(newfd); + close(newfd); + unlink(sockpath); + + exit(0); + } + + close(pfd[1]); + + n = sys_read(pfd[0], &ret, sizeof(int)); + assert(n == sizeof(int)); + assert(ret == 1); + + close(pfd[0]); + + fd = sock_connect(sockpath); + assert(fd != -1); + + ev = tevent_context_init(mem_ctx); + assert(ev != NULL); + + state.pkt_len = 10 + sizeof(uint32_t); + state.done = false; + + queue = sock_queue_setup(mem_ctx, ev, fd, test1_reader, &state); + assert(queue != NULL); + + while (! state.done) { + tevent_loop_once(ev); + } + + talloc_free(queue); + talloc_free(ev); + + pid = wait(&ret); + assert(pid != -1); +} + +static void test2_reader(int fd) +{ + uint8_t buf[1024]; + size_t pkt_len = 10 + sizeof(uint32_t); + ssize_t n; + + while (1) { + n = sys_read(fd, buf, 1024); + assert(n != -1); + + if (n == 0) { + return; + } + + assert(n == pkt_len); + pkt_len += 10; + } +} + +static void test2_dummy_reader(uint8_t *buf, size_t buflen, + void *private_data) +{ + assert(buflen == -1); +} + +static void test2_writer(struct sock_queue *queue) +{ + uint8_t buf[1024]; + uint32_t len; + int ret; + + for (len = 10; len < 1000; len += 10) { + int value = len / 10; + uint32_t buflen = len + sizeof(uint32_t); + + memset(buf, value, buflen); + memcpy(buf, &buflen, sizeof(uint32_t)); + + ret = sock_queue_write(queue, buf, buflen); + assert(ret == 0); + } +} + +static void test2(TALLOC_CTX *mem_ctx, const char *sockpath) +{ + struct tevent_context *ev; + struct sock_queue *queue; + pid_t pid; + int pfd[2], fd, ret; + ssize_t n; + + ret = pipe(pfd); + assert(ret == 0); + + pid = fork(); + assert(pid != -1); + + if (pid == 0) { + int newfd; + + close(pfd[0]); + + fd = socket_init(sockpath); + assert(fd != -1); + + ret = 1; + n = sys_write(pfd[1], &ret, sizeof(int)); + assert(n == sizeof(int)); + + newfd = accept(fd, NULL, NULL); + assert(newfd != -1); + + test2_reader(newfd); + close(newfd); + unlink(sockpath); + + exit(0); + } + + close(pfd[1]); + + n = sys_read(pfd[0], &ret, sizeof(int)); + assert(n == sizeof(int)); + assert(ret == 1); + + close(pfd[0]); + + fd = sock_connect(sockpath); + assert(fd != -1); + + ev = tevent_context_init(mem_ctx); + assert(ev != NULL); + + queue = sock_queue_setup(mem_ctx, ev, fd, test2_dummy_reader, NULL); + assert(queue != NULL); + + test2_writer(queue); + + talloc_free(queue); + talloc_free(ev); + + pid = wait(&ret); + assert(pid != -1); +} + +int main(int argc, const char **argv) +{ + TALLOC_CTX *mem_ctx; + const char *sockpath; + + if (argc != 2) { + fprintf(stderr, "%s \n", argv[0]); + exit(1); + } + + sockpath = argv[1]; + + mem_ctx = talloc_new(NULL); + assert(mem_ctx != NULL); + + test1(mem_ctx, sockpath); + test2(mem_ctx, sockpath); + + return 0; +} diff --git a/ctdb/wscript b/ctdb/wscript index b951dd6..4bd7d66 100644 --- a/ctdb/wscript +++ b/ctdb/wscript @@ -736,6 +736,7 @@ def build(bld): 'pidfile_test', 'run_proc_test', 'sock_daemon_test', + 'sock_io_test', ] for target in ctdb_unit_tests: -- 2.9.3 From 54f5ca9f8e810c3e7573e93d9eb571b51eeac326 Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Thu, 5 Jan 2017 15:05:56 +1100 Subject: [PATCH 5/5] ctdb-tests: Do not attempt to unregister the join handler multiple times MSG_ID_SYNC is broadcast to each node when a MSG_ID_JOIN has been received from all nodes. After MSG_ID_SYNC is successfully broadcast, the join handler is unregistered. However, if another MSG_ID_JOIN is received before the join handler is unregistered then MSG_ID_SYNC is re-broadcast. This results in multiple attempts to unregister the join handler. Once all MSG_ID_JOIN messages are received, unregister the join handler to ignore any extra MSG_ID_JOIN messages. Also, make sure that while join handler is being unregistered, MSG_ID_JOIN messages are ignored. BUG: https://bugzilla.samba.org/show_bug.cgi?id=12500 Identified-by: Martin Schwenke Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke Autobuild-User(master): Martin Schwenke Autobuild-Date(master): Fri Jan 6 12:27:23 CET 2017 on sn-devel-144 (cherry picked from commit 4635c22411a7864dd70703f854ec9844816e0294) --- ctdb/tests/src/cluster_wait.c | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/ctdb/tests/src/cluster_wait.c b/ctdb/tests/src/cluster_wait.c index ddc3e02..1405738 100644 --- a/ctdb/tests/src/cluster_wait.c +++ b/ctdb/tests/src/cluster_wait.c @@ -36,6 +36,7 @@ struct cluster_wait_state { struct ctdb_client_context *client; int num_nodes; bool *ready; + bool join_done; }; static void cluster_wait_join_registered(struct tevent_req *subreq); @@ -44,8 +45,8 @@ static void cluster_wait_join(struct tevent_req *subreq); static void cluster_wait_join_sent(struct tevent_req *subreq); static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data, void *private_data); -static void cluster_wait_sync_sent(struct tevent_req *subreq); static void cluster_wait_join_unregistered(struct tevent_req *subreq); +static void cluster_wait_sync_sent(struct tevent_req *subreq); static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data, void *private_data); static void cluster_wait_sync_unregistered(struct tevent_req *subreq); @@ -67,6 +68,8 @@ struct tevent_req *cluster_wait_send(TALLOC_CTX *mem_ctx, state->client = client; state->num_nodes = num_nodes; + state->join_done = false; + if (ctdb_client_pnn(client) == 0) { state->ready = talloc_zero_array(state, bool, num_nodes); if (tevent_req_nomem(state->ready, req)) { @@ -201,7 +204,6 @@ static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data, private_data, struct tevent_req); struct cluster_wait_state *state = tevent_req_data( req, struct cluster_wait_state); - struct ctdb_req_message msg; struct tevent_req *subreq; uint32_t pnn; int i; @@ -228,50 +230,56 @@ static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data, } } - msg.srvid = MSG_ID_SYNC; - msg.data.data = tdb_null; + if (state->join_done) { + return; + } - subreq = ctdb_client_message_send(state, state->ev, state->client, - CTDB_BROADCAST_ALL, &msg); + state->join_done = true; + subreq = ctdb_client_remove_message_handler_send( + state, state->ev, state->client, + MSG_ID_JOIN, req); if (tevent_req_nomem(subreq, req)) { return; } - tevent_req_set_callback(subreq, cluster_wait_sync_sent, req); + tevent_req_set_callback(subreq, cluster_wait_join_unregistered, req); } -static void cluster_wait_sync_sent(struct tevent_req *subreq) +static void cluster_wait_join_unregistered(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct cluster_wait_state *state = tevent_req_data( req, struct cluster_wait_state); + struct ctdb_req_message msg; bool status; int ret; - status = ctdb_client_message_recv(subreq, &ret); - TALLOC_FREE(subreq); + status = ctdb_client_remove_message_handler_recv(subreq, &ret); if (! status) { tevent_req_error(req, ret); return; } - subreq = ctdb_client_remove_message_handler_send( - state, state->ev, state->client, - MSG_ID_JOIN, req); + msg.srvid = MSG_ID_SYNC; + msg.data.data = tdb_null; + + subreq = ctdb_client_message_send(state, state->ev, state->client, + CTDB_BROADCAST_ALL, &msg); if (tevent_req_nomem(subreq, req)) { return; } - tevent_req_set_callback(subreq, cluster_wait_join_unregistered, req); + tevent_req_set_callback(subreq, cluster_wait_sync_sent, req); } -static void cluster_wait_join_unregistered(struct tevent_req *subreq) +static void cluster_wait_sync_sent(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); bool status; int ret; - status = ctdb_client_remove_message_handler_recv(subreq, &ret); + status = ctdb_client_message_recv(subreq, &ret); + TALLOC_FREE(subreq); if (! status) { tevent_req_error(req, ret); return; -- 2.9.3