diff options
author | Junio C Hamano <gitster@pobox.com> | 2021-04-02 14:43:14 -0700 |
---|---|---|
committer | Junio C Hamano <gitster@pobox.com> | 2021-04-02 14:43:14 -0700 |
commit | 861794b60d3a61822daf59bbde39a2a2e362738d (patch) | |
tree | 70d6c593034cedb5413b5216eb01ba810314d828 /compat | |
parent | Merge branch 'mt/parallel-checkout-part-1' (diff) | |
parent | t0052: add simple-ipc tests and t/helper/test-simple-ipc tool (diff) | |
download | tgif-861794b60d3a61822daf59bbde39a2a2e362738d.tar.xz |
Merge branch 'jh/simple-ipc'
A simple IPC interface gets introduced to build services like
fsmonitor on top.
* jh/simple-ipc:
t0052: add simple-ipc tests and t/helper/test-simple-ipc tool
simple-ipc: add Unix domain socket implementation
unix-stream-server: create unix domain socket under lock
unix-socket: disallow chdir() when creating unix domain sockets
unix-socket: add backlog size option to unix_stream_listen()
unix-socket: eliminate static unix_stream_socket() helper function
simple-ipc: add win32 implementation
simple-ipc: design documentation for new IPC mechanism
pkt-line: add options argument to read_packetized_to_strbuf()
pkt-line: add PACKET_READ_GENTLE_ON_READ_ERROR option
pkt-line: do not issue flush packets in write_packetized_*()
pkt-line: eliminate the need for static buffer in packet_write_gently()
Diffstat (limited to 'compat')
-rw-r--r-- | compat/simple-ipc/ipc-shared.c | 28 | ||||
-rw-r--r-- | compat/simple-ipc/ipc-unix-socket.c | 999 | ||||
-rw-r--r-- | compat/simple-ipc/ipc-win32.c | 751 |
3 files changed, 1778 insertions, 0 deletions
diff --git a/compat/simple-ipc/ipc-shared.c b/compat/simple-ipc/ipc-shared.c new file mode 100644 index 0000000000..1edec81595 --- /dev/null +++ b/compat/simple-ipc/ipc-shared.c @@ -0,0 +1,28 @@ +#include "cache.h" +#include "simple-ipc.h" +#include "strbuf.h" +#include "pkt-line.h" +#include "thread-utils.h" + +#ifdef SUPPORTS_SIMPLE_IPC + +int ipc_server_run(const char *path, const struct ipc_server_opts *opts, + ipc_server_application_cb *application_cb, + void *application_data) +{ + struct ipc_server_data *server_data = NULL; + int ret; + + ret = ipc_server_run_async(&server_data, path, opts, + application_cb, application_data); + if (ret) + return ret; + + ret = ipc_server_await(server_data); + + ipc_server_free(server_data); + + return ret; +} + +#endif /* SUPPORTS_SIMPLE_IPC */ diff --git a/compat/simple-ipc/ipc-unix-socket.c b/compat/simple-ipc/ipc-unix-socket.c new file mode 100644 index 0000000000..38689b278d --- /dev/null +++ b/compat/simple-ipc/ipc-unix-socket.c @@ -0,0 +1,999 @@ +#include "cache.h" +#include "simple-ipc.h" +#include "strbuf.h" +#include "pkt-line.h" +#include "thread-utils.h" +#include "unix-socket.h" +#include "unix-stream-server.h" + +#ifdef NO_UNIX_SOCKETS +#error compat/simple-ipc/ipc-unix-socket.c requires Unix sockets +#endif + +enum ipc_active_state ipc_get_active_state(const char *path) +{ + enum ipc_active_state state = IPC_STATE__OTHER_ERROR; + struct ipc_client_connect_options options + = IPC_CLIENT_CONNECT_OPTIONS_INIT; + struct stat st; + struct ipc_client_connection *connection_test = NULL; + + options.wait_if_busy = 0; + options.wait_if_not_found = 0; + + if (lstat(path, &st) == -1) { + switch (errno) { + case ENOENT: + case ENOTDIR: + return IPC_STATE__NOT_LISTENING; + default: + return IPC_STATE__INVALID_PATH; + } + } + + /* also complain if a plain file is in the way */ + if ((st.st_mode & S_IFMT) != S_IFSOCK) + return IPC_STATE__INVALID_PATH; + + /* + * Just because the filesystem has a S_IFSOCK type inode + * at `path`, doesn't mean it that there is a server listening. + * Ping it to be sure. + */ + state = ipc_client_try_connect(path, &options, &connection_test); + ipc_client_close_connection(connection_test); + + return state; +} + +/* + * Retry frequency when trying to connect to a server. + * + * This value should be short enough that we don't seriously delay our + * caller, but not fast enough that our spinning puts pressure on the + * system. + */ +#define WAIT_STEP_MS (50) + +/* + * Try to connect to the server. If the server is just starting up or + * is very busy, we may not get a connection the first time. + */ +static enum ipc_active_state connect_to_server( + const char *path, + int timeout_ms, + const struct ipc_client_connect_options *options, + int *pfd) +{ + int k; + + *pfd = -1; + + for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) { + int fd = unix_stream_connect(path, options->uds_disallow_chdir); + + if (fd != -1) { + *pfd = fd; + return IPC_STATE__LISTENING; + } + + if (errno == ENOENT) { + if (!options->wait_if_not_found) + return IPC_STATE__PATH_NOT_FOUND; + + goto sleep_and_try_again; + } + + if (errno == ETIMEDOUT) { + if (!options->wait_if_busy) + return IPC_STATE__NOT_LISTENING; + + goto sleep_and_try_again; + } + + if (errno == ECONNREFUSED) { + if (!options->wait_if_busy) + return IPC_STATE__NOT_LISTENING; + + goto sleep_and_try_again; + } + + return IPC_STATE__OTHER_ERROR; + + sleep_and_try_again: + sleep_millisec(WAIT_STEP_MS); + } + + return IPC_STATE__NOT_LISTENING; +} + +/* + * The total amount of time that we are willing to wait when trying to + * connect to a server. + * + * When the server is first started, it might take a little while for + * it to become ready to service requests. Likewise, the server may + * be very (temporarily) busy and not respond to our connections. + * + * We should gracefully and silently handle those conditions and try + * again for a reasonable time period. + * + * The value chosen here should be long enough for the server + * to reliably heal from the above conditions. + */ +#define MY_CONNECTION_TIMEOUT_MS (1000) + +enum ipc_active_state ipc_client_try_connect( + const char *path, + const struct ipc_client_connect_options *options, + struct ipc_client_connection **p_connection) +{ + enum ipc_active_state state = IPC_STATE__OTHER_ERROR; + int fd = -1; + + *p_connection = NULL; + + trace2_region_enter("ipc-client", "try-connect", NULL); + trace2_data_string("ipc-client", NULL, "try-connect/path", path); + + state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS, + options, &fd); + + trace2_data_intmax("ipc-client", NULL, "try-connect/state", + (intmax_t)state); + trace2_region_leave("ipc-client", "try-connect", NULL); + + if (state == IPC_STATE__LISTENING) { + (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection)); + (*p_connection)->fd = fd; + } + + return state; +} + +void ipc_client_close_connection(struct ipc_client_connection *connection) +{ + if (!connection) + return; + + if (connection->fd != -1) + close(connection->fd); + + free(connection); +} + +int ipc_client_send_command_to_connection( + struct ipc_client_connection *connection, + const char *message, struct strbuf *answer) +{ + int ret = 0; + + strbuf_setlen(answer, 0); + + trace2_region_enter("ipc-client", "send-command", NULL); + + if (write_packetized_from_buf_no_flush(message, strlen(message), + connection->fd) < 0 || + packet_flush_gently(connection->fd) < 0) { + ret = error(_("could not send IPC command")); + goto done; + } + + if (read_packetized_to_strbuf( + connection->fd, answer, + PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) { + ret = error(_("could not read IPC response")); + goto done; + } + +done: + trace2_region_leave("ipc-client", "send-command", NULL); + return ret; +} + +int ipc_client_send_command(const char *path, + const struct ipc_client_connect_options *options, + const char *message, struct strbuf *answer) +{ + int ret = -1; + enum ipc_active_state state; + struct ipc_client_connection *connection = NULL; + + state = ipc_client_try_connect(path, options, &connection); + + if (state != IPC_STATE__LISTENING) + return ret; + + ret = ipc_client_send_command_to_connection(connection, message, answer); + + ipc_client_close_connection(connection); + + return ret; +} + +static int set_socket_blocking_flag(int fd, int make_nonblocking) +{ + int flags; + + flags = fcntl(fd, F_GETFL, NULL); + + if (flags < 0) + return -1; + + if (make_nonblocking) + flags |= O_NONBLOCK; + else + flags &= ~O_NONBLOCK; + + return fcntl(fd, F_SETFL, flags); +} + +/* + * Magic numbers used to annotate callback instance data. + * These are used to help guard against accidentally passing the + * wrong instance data across multiple levels of callbacks (which + * is easy to do if there are `void*` arguments). + */ +enum magic { + MAGIC_SERVER_REPLY_DATA, + MAGIC_WORKER_THREAD_DATA, + MAGIC_ACCEPT_THREAD_DATA, + MAGIC_SERVER_DATA, +}; + +struct ipc_server_reply_data { + enum magic magic; + int fd; + struct ipc_worker_thread_data *worker_thread_data; +}; + +struct ipc_worker_thread_data { + enum magic magic; + struct ipc_worker_thread_data *next_thread; + struct ipc_server_data *server_data; + pthread_t pthread_id; +}; + +struct ipc_accept_thread_data { + enum magic magic; + struct ipc_server_data *server_data; + + struct unix_ss_socket *server_socket; + + int fd_send_shutdown; + int fd_wait_shutdown; + pthread_t pthread_id; +}; + +/* + * With unix-sockets, the conceptual "ipc-server" is implemented as a single + * controller "accept-thread" thread and a pool of "worker-thread" threads. + * The former does the usual `accept()` loop and dispatches connections + * to an idle worker thread. The worker threads wait in an idle loop for + * a new connection, communicate with the client and relay data to/from + * the `application_cb` and then wait for another connection from the + * server thread. This avoids the overhead of constantly creating and + * destroying threads. + */ +struct ipc_server_data { + enum magic magic; + ipc_server_application_cb *application_cb; + void *application_data; + struct strbuf buf_path; + + struct ipc_accept_thread_data *accept_thread; + struct ipc_worker_thread_data *worker_thread_list; + + pthread_mutex_t work_available_mutex; + pthread_cond_t work_available_cond; + + /* + * Accepted but not yet processed client connections are kept + * in a circular buffer FIFO. The queue is empty when the + * positions are equal. + */ + int *fifo_fds; + int queue_size; + int back_pos; + int front_pos; + + int shutdown_requested; + int is_stopped; +}; + +/* + * Remove and return the oldest queued connection. + * + * Returns -1 if empty. + */ +static int fifo_dequeue(struct ipc_server_data *server_data) +{ + /* ASSERT holding mutex */ + + int fd; + + if (server_data->back_pos == server_data->front_pos) + return -1; + + fd = server_data->fifo_fds[server_data->front_pos]; + server_data->fifo_fds[server_data->front_pos] = -1; + + server_data->front_pos++; + if (server_data->front_pos == server_data->queue_size) + server_data->front_pos = 0; + + return fd; +} + +/* + * Push a new fd onto the back of the queue. + * + * Drop it and return -1 if queue is already full. + */ +static int fifo_enqueue(struct ipc_server_data *server_data, int fd) +{ + /* ASSERT holding mutex */ + + int next_back_pos; + + next_back_pos = server_data->back_pos + 1; + if (next_back_pos == server_data->queue_size) + next_back_pos = 0; + + if (next_back_pos == server_data->front_pos) { + /* Queue is full. Just drop it. */ + close(fd); + return -1; + } + + server_data->fifo_fds[server_data->back_pos] = fd; + server_data->back_pos = next_back_pos; + + return fd; +} + +/* + * Wait for a connection to be queued to the FIFO and return it. + * + * Returns -1 if someone has already requested a shutdown. + */ +static int worker_thread__wait_for_connection( + struct ipc_worker_thread_data *worker_thread_data) +{ + /* ASSERT NOT holding mutex */ + + struct ipc_server_data *server_data = worker_thread_data->server_data; + int fd = -1; + + pthread_mutex_lock(&server_data->work_available_mutex); + for (;;) { + if (server_data->shutdown_requested) + break; + + fd = fifo_dequeue(server_data); + if (fd >= 0) + break; + + pthread_cond_wait(&server_data->work_available_cond, + &server_data->work_available_mutex); + } + pthread_mutex_unlock(&server_data->work_available_mutex); + + return fd; +} + +/* + * Forward declare our reply callback function so that any compiler + * errors are reported when we actually define the function (in addition + * to any errors reported when we try to pass this callback function as + * a parameter in a function call). The former are easier to understand. + */ +static ipc_server_reply_cb do_io_reply_callback; + +/* + * Relay application's response message to the client process. + * (We do not flush at this point because we allow the caller + * to chunk data to the client thru us.) + */ +static int do_io_reply_callback(struct ipc_server_reply_data *reply_data, + const char *response, size_t response_len) +{ + if (reply_data->magic != MAGIC_SERVER_REPLY_DATA) + BUG("reply_cb called with wrong instance data"); + + return write_packetized_from_buf_no_flush(response, response_len, + reply_data->fd); +} + +/* A randomly chosen value. */ +#define MY_WAIT_POLL_TIMEOUT_MS (10) + +/* + * If the client hangs up without sending any data on the wire, just + * quietly close the socket and ignore this client. + * + * This worker thread is committed to reading the IPC request data + * from the client at the other end of this fd. Wait here for the + * client to actually put something on the wire -- because if the + * client just does a ping (connect and hangup without sending any + * data), our use of the pkt-line read routines will spew an error + * message. + * + * Return -1 if the client hung up. + * Return 0 if data (possibly incomplete) is ready. + */ +static int worker_thread__wait_for_io_start( + struct ipc_worker_thread_data *worker_thread_data, + int fd) +{ + struct ipc_server_data *server_data = worker_thread_data->server_data; + struct pollfd pollfd[1]; + int result; + + for (;;) { + pollfd[0].fd = fd; + pollfd[0].events = POLLIN; + + result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS); + if (result < 0) { + if (errno == EINTR) + continue; + goto cleanup; + } + + if (result == 0) { + /* a timeout */ + + int in_shutdown; + + pthread_mutex_lock(&server_data->work_available_mutex); + in_shutdown = server_data->shutdown_requested; + pthread_mutex_unlock(&server_data->work_available_mutex); + + /* + * If a shutdown is already in progress and this + * client has not started talking yet, just drop it. + */ + if (in_shutdown) + goto cleanup; + continue; + } + + if (pollfd[0].revents & POLLHUP) + goto cleanup; + + if (pollfd[0].revents & POLLIN) + return 0; + + goto cleanup; + } + +cleanup: + close(fd); + return -1; +} + +/* + * Receive the request/command from the client and pass it to the + * registered request-callback. The request-callback will compose + * a response and call our reply-callback to send it to the client. + */ +static int worker_thread__do_io( + struct ipc_worker_thread_data *worker_thread_data, + int fd) +{ + /* ASSERT NOT holding lock */ + + struct strbuf buf = STRBUF_INIT; + struct ipc_server_reply_data reply_data; + int ret = 0; + + reply_data.magic = MAGIC_SERVER_REPLY_DATA; + reply_data.worker_thread_data = worker_thread_data; + + reply_data.fd = fd; + + ret = read_packetized_to_strbuf( + reply_data.fd, &buf, + PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR); + if (ret >= 0) { + ret = worker_thread_data->server_data->application_cb( + worker_thread_data->server_data->application_data, + buf.buf, do_io_reply_callback, &reply_data); + + packet_flush_gently(reply_data.fd); + } + else { + /* + * The client probably disconnected/shutdown before it + * could send a well-formed message. Ignore it. + */ + } + + strbuf_release(&buf); + close(reply_data.fd); + + return ret; +} + +/* + * Block SIGPIPE on the current thread (so that we get EPIPE from + * write() rather than an actual signal). + * + * Note that using sigchain_push() and _pop() to control SIGPIPE + * around our IO calls is not thread safe: + * [] It uses a global stack of handler frames. + * [] It uses ALLOC_GROW() to resize it. + * [] Finally, according to the `signal(2)` man-page: + * "The effects of `signal()` in a multithreaded process are unspecified." + */ +static void thread_block_sigpipe(sigset_t *old_set) +{ + sigset_t new_set; + + sigemptyset(&new_set); + sigaddset(&new_set, SIGPIPE); + + sigemptyset(old_set); + pthread_sigmask(SIG_BLOCK, &new_set, old_set); +} + +/* + * Thread proc for an IPC worker thread. It handles a series of + * connections from clients. It pulls the next fd from the queue + * processes it, and then waits for the next client. + * + * Block SIGPIPE in this worker thread for the life of the thread. + * This avoids stray (and sometimes delayed) SIGPIPE signals caused + * by client errors and/or when we are under extremely heavy IO load. + * + * This means that the application callback will have SIGPIPE blocked. + * The callback should not change it. + */ +static void *worker_thread_proc(void *_worker_thread_data) +{ + struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data; + struct ipc_server_data *server_data = worker_thread_data->server_data; + sigset_t old_set; + int fd, io; + int ret; + + trace2_thread_start("ipc-worker"); + + thread_block_sigpipe(&old_set); + + for (;;) { + fd = worker_thread__wait_for_connection(worker_thread_data); + if (fd == -1) + break; /* in shutdown */ + + io = worker_thread__wait_for_io_start(worker_thread_data, fd); + if (io == -1) + continue; /* client hung up without sending anything */ + + ret = worker_thread__do_io(worker_thread_data, fd); + + if (ret == SIMPLE_IPC_QUIT) { + trace2_data_string("ipc-worker", NULL, "queue_stop_async", + "application_quit"); + /* + * The application layer is telling the ipc-server + * layer to shutdown. + * + * We DO NOT have a response to send to the client. + * + * Queue an async stop (to stop the other threads) and + * allow this worker thread to exit now (no sense waiting + * for the thread-pool shutdown signal). + * + * Other non-idle worker threads are allowed to finish + * responding to their current clients. + */ + ipc_server_stop_async(server_data); + break; + } + } + + trace2_thread_exit(); + return NULL; +} + +/* A randomly chosen value. */ +#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000) + +/* + * Accept a new client connection on our socket. This uses non-blocking + * IO so that we can also wait for shutdown requests on our socket-pair + * without actually spinning on a fast timeout. + */ +static int accept_thread__wait_for_connection( + struct ipc_accept_thread_data *accept_thread_data) +{ + struct pollfd pollfd[2]; + int result; + + for (;;) { + pollfd[0].fd = accept_thread_data->fd_wait_shutdown; + pollfd[0].events = POLLIN; + + pollfd[1].fd = accept_thread_data->server_socket->fd_socket; + pollfd[1].events = POLLIN; + + result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS); + if (result < 0) { + if (errno == EINTR) + continue; + return result; + } + + if (result == 0) { + /* a timeout */ + + /* + * If someone deletes or force-creates a new unix + * domain socket at our path, all future clients + * will be routed elsewhere and we silently starve. + * If that happens, just queue a shutdown. + */ + if (unix_ss_was_stolen( + accept_thread_data->server_socket)) { + trace2_data_string("ipc-accept", NULL, + "queue_stop_async", + "socket_stolen"); + ipc_server_stop_async( + accept_thread_data->server_data); + } + continue; + } + + if (pollfd[0].revents & POLLIN) { + /* shutdown message queued to socketpair */ + return -1; + } + + if (pollfd[1].revents & POLLIN) { + /* a connection is available on server_socket */ + + int client_fd = + accept(accept_thread_data->server_socket->fd_socket, + NULL, NULL); + if (client_fd >= 0) + return client_fd; + + /* + * An error here is unlikely -- it probably + * indicates that the connecting process has + * already dropped the connection. + */ + continue; + } + + BUG("unandled poll result errno=%d r[0]=%d r[1]=%d", + errno, pollfd[0].revents, pollfd[1].revents); + } +} + +/* + * Thread proc for the IPC server "accept thread". This waits for + * an incoming socket connection, appends it to the queue of available + * connections, and notifies a worker thread to process it. + * + * Block SIGPIPE in this thread for the life of the thread. This + * avoids any stray SIGPIPE signals when closing pipe fds under + * extremely heavy loads (such as when the fifo queue is full and we + * drop incomming connections). + */ +static void *accept_thread_proc(void *_accept_thread_data) +{ + struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data; + struct ipc_server_data *server_data = accept_thread_data->server_data; + sigset_t old_set; + + trace2_thread_start("ipc-accept"); + + thread_block_sigpipe(&old_set); + + for (;;) { + int client_fd = accept_thread__wait_for_connection( + accept_thread_data); + + pthread_mutex_lock(&server_data->work_available_mutex); + if (server_data->shutdown_requested) { + pthread_mutex_unlock(&server_data->work_available_mutex); + if (client_fd >= 0) + close(client_fd); + break; + } + + if (client_fd < 0) { + /* ignore transient accept() errors */ + } + else { + fifo_enqueue(server_data, client_fd); + pthread_cond_broadcast(&server_data->work_available_cond); + } + pthread_mutex_unlock(&server_data->work_available_mutex); + } + + trace2_thread_exit(); + return NULL; +} + +/* + * We can't predict the connection arrival rate relative to the worker + * processing rate, therefore we allow the "accept-thread" to queue up + * a generous number of connections, since we'd rather have the client + * not unnecessarily timeout if we can avoid it. (The assumption is + * that this will be used for FSMonitor and a few second wait on a + * connection is better than having the client timeout and do the full + * computation itself.) + * + * The FIFO queue size is set to a multiple of the worker pool size. + * This value chosen at random. + */ +#define FIFO_SCALE (100) + +/* + * The backlog value for `listen(2)`. This doesn't need to huge, + * rather just large enough for our "accept-thread" to wake up and + * queue incoming connections onto the FIFO without the kernel + * dropping any. + * + * This value chosen at random. + */ +#define LISTEN_BACKLOG (50) + +static int create_listener_socket( + const char *path, + const struct ipc_server_opts *ipc_opts, + struct unix_ss_socket **new_server_socket) +{ + struct unix_ss_socket *server_socket = NULL; + struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT; + int ret; + + uslg_opts.listen_backlog_size = LISTEN_BACKLOG; + uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir; + + ret = unix_ss_create(path, &uslg_opts, -1, &server_socket); + if (ret) + return ret; + + if (set_socket_blocking_flag(server_socket->fd_socket, 1)) { + int saved_errno = errno; + unix_ss_free(server_socket); + errno = saved_errno; + return -1; + } + + *new_server_socket = server_socket; + + trace2_data_string("ipc-server", NULL, "listen-with-lock", path); + return 0; +} + +static int setup_listener_socket( + const char *path, + const struct ipc_server_opts *ipc_opts, + struct unix_ss_socket **new_server_socket) +{ + int ret, saved_errno; + + trace2_region_enter("ipc-server", "create-listener_socket", NULL); + + ret = create_listener_socket(path, ipc_opts, new_server_socket); + + saved_errno = errno; + trace2_region_leave("ipc-server", "create-listener_socket", NULL); + errno = saved_errno; + + return ret; +} + +/* + * Start IPC server in a pool of background threads. + */ +int ipc_server_run_async(struct ipc_server_data **returned_server_data, + const char *path, const struct ipc_server_opts *opts, + ipc_server_application_cb *application_cb, + void *application_data) +{ + struct unix_ss_socket *server_socket = NULL; + struct ipc_server_data *server_data; + int sv[2]; + int k; + int ret; + int nr_threads = opts->nr_threads; + + *returned_server_data = NULL; + + /* + * Create a socketpair and set sv[1] to non-blocking. This + * will used to send a shutdown message to the accept-thread + * and allows the accept-thread to wait on EITHER a client + * connection or a shutdown request without spinning. + */ + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0) + return -1; + + if (set_socket_blocking_flag(sv[1], 1)) { + int saved_errno = errno; + close(sv[0]); + close(sv[1]); + errno = saved_errno; + return -1; + } + + ret = setup_listener_socket(path, opts, &server_socket); + if (ret) { + int saved_errno = errno; + close(sv[0]); + close(sv[1]); + errno = saved_errno; + return ret; + } + + server_data = xcalloc(1, sizeof(*server_data)); + server_data->magic = MAGIC_SERVER_DATA; + server_data->application_cb = application_cb; + server_data->application_data = application_data; + strbuf_init(&server_data->buf_path, 0); + strbuf_addstr(&server_data->buf_path, path); + + if (nr_threads < 1) + nr_threads = 1; + + pthread_mutex_init(&server_data->work_available_mutex, NULL); + pthread_cond_init(&server_data->work_available_cond, NULL); + + server_data->queue_size = nr_threads * FIFO_SCALE; + CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size); + + server_data->accept_thread = + xcalloc(1, sizeof(*server_data->accept_thread)); + server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA; + server_data->accept_thread->server_data = server_data; + server_data->accept_thread->server_socket = server_socket; + server_data->accept_thread->fd_send_shutdown = sv[0]; + server_data->accept_thread->fd_wait_shutdown = sv[1]; + + if (pthread_create(&server_data->accept_thread->pthread_id, NULL, + accept_thread_proc, server_data->accept_thread)) + die_errno(_("could not start accept_thread '%s'"), path); + + for (k = 0; k < nr_threads; k++) { + struct ipc_worker_thread_data *wtd; + + wtd = xcalloc(1, sizeof(*wtd)); + wtd->magic = MAGIC_WORKER_THREAD_DATA; + wtd->server_data = server_data; + + if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc, + wtd)) { + if (k == 0) + die(_("could not start worker[0] for '%s'"), + path); + /* + * Limp along with the thread pool that we have. + */ + break; + } + + wtd->next_thread = server_data->worker_thread_list; + server_data->worker_thread_list = wtd; + } + + *returned_server_data = server_data; + return 0; +} + +/* + * Gently tell the IPC server treads to shutdown. + * Can be run on any thread. + */ +int ipc_server_stop_async(struct ipc_server_data *server_data) +{ + /* ASSERT NOT holding mutex */ + + int fd; + + if (!server_data) + return 0; + + trace2_region_enter("ipc-server", "server-stop-async", NULL); + + pthread_mutex_lock(&server_data->work_available_mutex); + + server_data->shutdown_requested = 1; + + /* + * Write a byte to the shutdown socket pair to wake up the + * accept-thread. + */ + if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0) + error_errno("could not write to fd_send_shutdown"); + + /* + * Drain the queue of existing connections. + */ + while ((fd = fifo_dequeue(server_data)) != -1) + close(fd); + + /* + * Gently tell worker threads to stop processing new connections + * and exit. (This does not abort in-process conversations.) + */ + pthread_cond_broadcast(&server_data->work_available_cond); + + pthread_mutex_unlock(&server_data->work_available_mutex); + + trace2_region_leave("ipc-server", "server-stop-async", NULL); + + return 0; +} + +/* + * Wait for all IPC server threads to stop. + */ +int ipc_server_await(struct ipc_server_data *server_data) +{ + pthread_join(server_data->accept_thread->pthread_id, NULL); + + if (!server_data->shutdown_requested) + BUG("ipc-server: accept-thread stopped for '%s'", + server_data->buf_path.buf); + + while (server_data->worker_thread_list) { + struct ipc_worker_thread_data *wtd = + server_data->worker_thread_list; + + pthread_join(wtd->pthread_id, NULL); + + server_data->worker_thread_list = wtd->next_thread; + free(wtd); + } + + server_data->is_stopped = 1; + + return 0; +} + +void ipc_server_free(struct ipc_server_data *server_data) +{ + struct ipc_accept_thread_data * accept_thread_data; + + if (!server_data) + return; + + if (!server_data->is_stopped) + BUG("cannot free ipc-server while running for '%s'", + server_data->buf_path.buf); + + accept_thread_data = server_data->accept_thread; + if (accept_thread_data) { + unix_ss_free(accept_thread_data->server_socket); + + if (accept_thread_data->fd_send_shutdown != -1) + close(accept_thread_data->fd_send_shutdown); + if (accept_thread_data->fd_wait_shutdown != -1) + close(accept_thread_data->fd_wait_shutdown); + + free(server_data->accept_thread); + } + + while (server_data->worker_thread_list) { + struct ipc_worker_thread_data *wtd = + server_data->worker_thread_list; + + server_data->worker_thread_list = wtd->next_thread; + free(wtd); + } + + pthread_cond_destroy(&server_data->work_available_cond); + pthread_mutex_destroy(&server_data->work_available_mutex); + + strbuf_release(&server_data->buf_path); + + free(server_data->fifo_fds); + free(server_data); +} diff --git a/compat/simple-ipc/ipc-win32.c b/compat/simple-ipc/ipc-win32.c new file mode 100644 index 0000000000..8f89c02037 --- /dev/null +++ b/compat/simple-ipc/ipc-win32.c @@ -0,0 +1,751 @@ +#include "cache.h" +#include "simple-ipc.h" +#include "strbuf.h" +#include "pkt-line.h" +#include "thread-utils.h" + +#ifndef GIT_WINDOWS_NATIVE +#error This file can only be compiled on Windows +#endif + +static int initialize_pipe_name(const char *path, wchar_t *wpath, size_t alloc) +{ + int off = 0; + struct strbuf realpath = STRBUF_INIT; + + if (!strbuf_realpath(&realpath, path, 0)) + return -1; + + off = swprintf(wpath, alloc, L"\\\\.\\pipe\\"); + if (xutftowcs(wpath + off, realpath.buf, alloc - off) < 0) + return -1; + + /* Handle drive prefix */ + if (wpath[off] && wpath[off + 1] == L':') { + wpath[off + 1] = L'_'; + off += 2; + } + + for (; wpath[off]; off++) + if (wpath[off] == L'/') + wpath[off] = L'\\'; + + strbuf_release(&realpath); + return 0; +} + +static enum ipc_active_state get_active_state(wchar_t *pipe_path) +{ + if (WaitNamedPipeW(pipe_path, NMPWAIT_USE_DEFAULT_WAIT)) + return IPC_STATE__LISTENING; + + if (GetLastError() == ERROR_SEM_TIMEOUT) + return IPC_STATE__NOT_LISTENING; + + if (GetLastError() == ERROR_FILE_NOT_FOUND) + return IPC_STATE__PATH_NOT_FOUND; + + return IPC_STATE__OTHER_ERROR; +} + +enum ipc_active_state ipc_get_active_state(const char *path) +{ + wchar_t pipe_path[MAX_PATH]; + + if (initialize_pipe_name(path, pipe_path, ARRAY_SIZE(pipe_path)) < 0) + return IPC_STATE__INVALID_PATH; + + return get_active_state(pipe_path); +} + +#define WAIT_STEP_MS (50) + +static enum ipc_active_state connect_to_server( + const wchar_t *wpath, + DWORD timeout_ms, + const struct ipc_client_connect_options *options, + int *pfd) +{ + DWORD t_start_ms, t_waited_ms; + DWORD step_ms; + HANDLE hPipe = INVALID_HANDLE_VALUE; + DWORD mode = PIPE_READMODE_BYTE; + DWORD gle; + + *pfd = -1; + + for (;;) { + hPipe = CreateFileW(wpath, GENERIC_READ | GENERIC_WRITE, + 0, NULL, OPEN_EXISTING, 0, NULL); + if (hPipe != INVALID_HANDLE_VALUE) + break; + + gle = GetLastError(); + + switch (gle) { + case ERROR_FILE_NOT_FOUND: + if (!options->wait_if_not_found) + return IPC_STATE__PATH_NOT_FOUND; + if (!timeout_ms) + return IPC_STATE__PATH_NOT_FOUND; + + step_ms = (timeout_ms < WAIT_STEP_MS) ? + timeout_ms : WAIT_STEP_MS; + sleep_millisec(step_ms); + + timeout_ms -= step_ms; + break; /* try again */ + + case ERROR_PIPE_BUSY: + if (!options->wait_if_busy) + return IPC_STATE__NOT_LISTENING; + if (!timeout_ms) + return IPC_STATE__NOT_LISTENING; + + t_start_ms = (DWORD)(getnanotime() / 1000000); + + if (!WaitNamedPipeW(wpath, timeout_ms)) { + if (GetLastError() == ERROR_SEM_TIMEOUT) + return IPC_STATE__NOT_LISTENING; + + return IPC_STATE__OTHER_ERROR; + } + + /* + * A pipe server instance became available. + * Race other client processes to connect to + * it. + * + * But first decrement our overall timeout so + * that we don't starve if we keep losing the + * race. But also guard against special + * NPMWAIT_ values (0 and -1). + */ + t_waited_ms = (DWORD)(getnanotime() / 1000000) - t_start_ms; + if (t_waited_ms < timeout_ms) + timeout_ms -= t_waited_ms; + else + timeout_ms = 1; + break; /* try again */ + + default: + return IPC_STATE__OTHER_ERROR; + } + } + + if (!SetNamedPipeHandleState(hPipe, &mode, NULL, NULL)) { + CloseHandle(hPipe); + return IPC_STATE__OTHER_ERROR; + } + + *pfd = _open_osfhandle((intptr_t)hPipe, O_RDWR|O_BINARY); + if (*pfd < 0) { + CloseHandle(hPipe); + return IPC_STATE__OTHER_ERROR; + } + + /* fd now owns hPipe */ + + return IPC_STATE__LISTENING; +} + +/* + * The default connection timeout for Windows clients. + * + * This is not currently part of the ipc_ API (nor the config settings) + * because of differences between Windows and other platforms. + * + * This value was chosen at random. + */ +#define WINDOWS_CONNECTION_TIMEOUT_MS (30000) + +enum ipc_active_state ipc_client_try_connect( + const char *path, + const struct ipc_client_connect_options *options, + struct ipc_client_connection **p_connection) +{ + wchar_t wpath[MAX_PATH]; + enum ipc_active_state state = IPC_STATE__OTHER_ERROR; + int fd = -1; + + *p_connection = NULL; + + trace2_region_enter("ipc-client", "try-connect", NULL); + trace2_data_string("ipc-client", NULL, "try-connect/path", path); + + if (initialize_pipe_name(path, wpath, ARRAY_SIZE(wpath)) < 0) + state = IPC_STATE__INVALID_PATH; + else + state = connect_to_server(wpath, WINDOWS_CONNECTION_TIMEOUT_MS, + options, &fd); + + trace2_data_intmax("ipc-client", NULL, "try-connect/state", + (intmax_t)state); + trace2_region_leave("ipc-client", "try-connect", NULL); + + if (state == IPC_STATE__LISTENING) { + (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection)); + (*p_connection)->fd = fd; + } + + return state; +} + +void ipc_client_close_connection(struct ipc_client_connection *connection) +{ + if (!connection) + return; + + if (connection->fd != -1) + close(connection->fd); + + free(connection); +} + +int ipc_client_send_command_to_connection( + struct ipc_client_connection *connection, + const char *message, struct strbuf *answer) +{ + int ret = 0; + + strbuf_setlen(answer, 0); + + trace2_region_enter("ipc-client", "send-command", NULL); + + if (write_packetized_from_buf_no_flush(message, strlen(message), + connection->fd) < 0 || + packet_flush_gently(connection->fd) < 0) { + ret = error(_("could not send IPC command")); + goto done; + } + + FlushFileBuffers((HANDLE)_get_osfhandle(connection->fd)); + + if (read_packetized_to_strbuf( + connection->fd, answer, + PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) { + ret = error(_("could not read IPC response")); + goto done; + } + +done: + trace2_region_leave("ipc-client", "send-command", NULL); + return ret; +} + +int ipc_client_send_command(const char *path, + const struct ipc_client_connect_options *options, + const char *message, struct strbuf *response) +{ + int ret = -1; + enum ipc_active_state state; + struct ipc_client_connection *connection = NULL; + + state = ipc_client_try_connect(path, options, &connection); + + if (state != IPC_STATE__LISTENING) + return ret; + + ret = ipc_client_send_command_to_connection(connection, message, response); + + ipc_client_close_connection(connection); + + return ret; +} + +/* + * Duplicate the given pipe handle and wrap it in a file descriptor so + * that we can use pkt-line on it. + */ +static int dup_fd_from_pipe(const HANDLE pipe) +{ + HANDLE process = GetCurrentProcess(); + HANDLE handle; + int fd; + + if (!DuplicateHandle(process, pipe, process, &handle, 0, FALSE, + DUPLICATE_SAME_ACCESS)) { + errno = err_win_to_posix(GetLastError()); + return -1; + } + + fd = _open_osfhandle((intptr_t)handle, O_RDWR|O_BINARY); + if (fd < 0) { + errno = err_win_to_posix(GetLastError()); + CloseHandle(handle); + return -1; + } + + /* + * `handle` is now owned by `fd` and will be automatically closed + * when the descriptor is closed. + */ + + return fd; +} + +/* + * Magic numbers used to annotate callback instance data. + * These are used to help guard against accidentally passing the + * wrong instance data across multiple levels of callbacks (which + * is easy to do if there are `void*` arguments). + */ +enum magic { + MAGIC_SERVER_REPLY_DATA, + MAGIC_SERVER_THREAD_DATA, + MAGIC_SERVER_DATA, +}; + +struct ipc_server_reply_data { + enum magic magic; + int fd; + struct ipc_server_thread_data *server_thread_data; +}; + +struct ipc_server_thread_data { + enum magic magic; + struct ipc_server_thread_data *next_thread; + struct ipc_server_data *server_data; + pthread_t pthread_id; + HANDLE hPipe; +}; + +/* + * On Windows, the conceptual "ipc-server" is implemented as a pool of + * n idential/peer "server-thread" threads. That is, there is no + * hierarchy of threads; and therefore no controller thread managing + * the pool. Each thread has an independent handle to the named pipe, + * receives incoming connections, processes the client, and re-uses + * the pipe for the next client connection. + * + * Therefore, the "ipc-server" only needs to maintain a list of the + * spawned threads for eventual "join" purposes. + * + * A single "stop-event" is visible to all of the server threads to + * tell them to shutdown (when idle). + */ +struct ipc_server_data { + enum magic magic; + ipc_server_application_cb *application_cb; + void *application_data; + struct strbuf buf_path; + wchar_t wpath[MAX_PATH]; + + HANDLE hEventStopRequested; + struct ipc_server_thread_data *thread_list; + int is_stopped; +}; + +enum connect_result { + CR_CONNECTED = 0, + CR_CONNECT_PENDING, + CR_CONNECT_ERROR, + CR_WAIT_ERROR, + CR_SHUTDOWN, +}; + +static enum connect_result queue_overlapped_connect( + struct ipc_server_thread_data *server_thread_data, + OVERLAPPED *lpo) +{ + if (ConnectNamedPipe(server_thread_data->hPipe, lpo)) + goto failed; + + switch (GetLastError()) { + case ERROR_IO_PENDING: + return CR_CONNECT_PENDING; + + case ERROR_PIPE_CONNECTED: + SetEvent(lpo->hEvent); + return CR_CONNECTED; + + default: + break; + } + +failed: + error(_("ConnectNamedPipe failed for '%s' (%lu)"), + server_thread_data->server_data->buf_path.buf, + GetLastError()); + return CR_CONNECT_ERROR; +} + +/* + * Use Windows Overlapped IO to wait for a connection or for our event + * to be signalled. + */ +static enum connect_result wait_for_connection( + struct ipc_server_thread_data *server_thread_data, + OVERLAPPED *lpo) +{ + enum connect_result r; + HANDLE waitHandles[2]; + DWORD dwWaitResult; + + r = queue_overlapped_connect(server_thread_data, lpo); + if (r != CR_CONNECT_PENDING) + return r; + + waitHandles[0] = server_thread_data->server_data->hEventStopRequested; + waitHandles[1] = lpo->hEvent; + + dwWaitResult = WaitForMultipleObjects(2, waitHandles, FALSE, INFINITE); + switch (dwWaitResult) { + case WAIT_OBJECT_0 + 0: + return CR_SHUTDOWN; + + case WAIT_OBJECT_0 + 1: + ResetEvent(lpo->hEvent); + return CR_CONNECTED; + + default: + return CR_WAIT_ERROR; + } +} + +/* + * Forward declare our reply callback function so that any compiler + * errors are reported when we actually define the function (in addition + * to any errors reported when we try to pass this callback function as + * a parameter in a function call). The former are easier to understand. + */ +static ipc_server_reply_cb do_io_reply_callback; + +/* + * Relay application's response message to the client process. + * (We do not flush at this point because we allow the caller + * to chunk data to the client thru us.) + */ +static int do_io_reply_callback(struct ipc_server_reply_data *reply_data, + const char *response, size_t response_len) +{ + if (reply_data->magic != MAGIC_SERVER_REPLY_DATA) + BUG("reply_cb called with wrong instance data"); + + return write_packetized_from_buf_no_flush(response, response_len, + reply_data->fd); +} + +/* + * Receive the request/command from the client and pass it to the + * registered request-callback. The request-callback will compose + * a response and call our reply-callback to send it to the client. + * + * Simple-IPC only contains one round trip, so we flush and close + * here after the response. + */ +static int do_io(struct ipc_server_thread_data *server_thread_data) +{ + struct strbuf buf = STRBUF_INIT; + struct ipc_server_reply_data reply_data; + int ret = 0; + + reply_data.magic = MAGIC_SERVER_REPLY_DATA; + reply_data.server_thread_data = server_thread_data; + + reply_data.fd = dup_fd_from_pipe(server_thread_data->hPipe); + if (reply_data.fd < 0) + return error(_("could not create fd from pipe for '%s'"), + server_thread_data->server_data->buf_path.buf); + + ret = read_packetized_to_strbuf( + reply_data.fd, &buf, + PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR); + if (ret >= 0) { + ret = server_thread_data->server_data->application_cb( + server_thread_data->server_data->application_data, + buf.buf, do_io_reply_callback, &reply_data); + + packet_flush_gently(reply_data.fd); + + FlushFileBuffers((HANDLE)_get_osfhandle((reply_data.fd))); + } + else { + /* + * The client probably disconnected/shutdown before it + * could send a well-formed message. Ignore it. + */ + } + + strbuf_release(&buf); + close(reply_data.fd); + + return ret; +} + +/* + * Handle IPC request and response with this connected client. And reset + * the pipe to prepare for the next client. + */ +static int use_connection(struct ipc_server_thread_data *server_thread_data) +{ + int ret; + + ret = do_io(server_thread_data); + + FlushFileBuffers(server_thread_data->hPipe); + DisconnectNamedPipe(server_thread_data->hPipe); + + return ret; +} + +/* + * Thread proc for an IPC server worker thread. It handles a series of + * connections from clients. It cleans and reuses the hPipe between each + * client. + */ +static void *server_thread_proc(void *_server_thread_data) +{ + struct ipc_server_thread_data *server_thread_data = _server_thread_data; + HANDLE hEventConnected = INVALID_HANDLE_VALUE; + OVERLAPPED oConnect; + enum connect_result cr; + int ret; + + assert(server_thread_data->hPipe != INVALID_HANDLE_VALUE); + + trace2_thread_start("ipc-server"); + trace2_data_string("ipc-server", NULL, "pipe", + server_thread_data->server_data->buf_path.buf); + + hEventConnected = CreateEventW(NULL, TRUE, FALSE, NULL); + + memset(&oConnect, 0, sizeof(oConnect)); + oConnect.hEvent = hEventConnected; + + for (;;) { + cr = wait_for_connection(server_thread_data, &oConnect); + + switch (cr) { + case CR_SHUTDOWN: + goto finished; + + case CR_CONNECTED: + ret = use_connection(server_thread_data); + if (ret == SIMPLE_IPC_QUIT) { + ipc_server_stop_async( + server_thread_data->server_data); + goto finished; + } + if (ret > 0) { + /* + * Ignore (transient) IO errors with this + * client and reset for the next client. + */ + } + break; + + case CR_CONNECT_PENDING: + /* By construction, this should not happen. */ + BUG("ipc-server[%s]: unexpeced CR_CONNECT_PENDING", + server_thread_data->server_data->buf_path.buf); + + case CR_CONNECT_ERROR: + case CR_WAIT_ERROR: + /* + * Ignore these theoretical errors. + */ + DisconnectNamedPipe(server_thread_data->hPipe); + break; + + default: + BUG("unandled case after wait_for_connection"); + } + } + +finished: + CloseHandle(server_thread_data->hPipe); + CloseHandle(hEventConnected); + + trace2_thread_exit(); + return NULL; +} + +static HANDLE create_new_pipe(wchar_t *wpath, int is_first) +{ + HANDLE hPipe; + DWORD dwOpenMode, dwPipeMode; + LPSECURITY_ATTRIBUTES lpsa = NULL; + + dwOpenMode = PIPE_ACCESS_INBOUND | PIPE_ACCESS_OUTBOUND | + FILE_FLAG_OVERLAPPED; + + dwPipeMode = PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS; + + if (is_first) { + dwOpenMode |= FILE_FLAG_FIRST_PIPE_INSTANCE; + + /* + * On Windows, the first server pipe instance gets to + * set the ACL / Security Attributes on the named + * pipe; subsequent instances inherit and cannot + * change them. + * + * TODO Should we allow the application layer to + * specify security attributes, such as `LocalService` + * or `LocalSystem`, when we create the named pipe? + * This question is probably not important when the + * daemon is started by a foreground user process and + * only needs to talk to the current user, but may be + * if the daemon is run via the Control Panel as a + * System Service. + */ + } + + hPipe = CreateNamedPipeW(wpath, dwOpenMode, dwPipeMode, + PIPE_UNLIMITED_INSTANCES, 1024, 1024, 0, lpsa); + + return hPipe; +} + +int ipc_server_run_async(struct ipc_server_data **returned_server_data, + const char *path, const struct ipc_server_opts *opts, + ipc_server_application_cb *application_cb, + void *application_data) +{ + struct ipc_server_data *server_data; + wchar_t wpath[MAX_PATH]; + HANDLE hPipeFirst = INVALID_HANDLE_VALUE; + int k; + int ret = 0; + int nr_threads = opts->nr_threads; + + *returned_server_data = NULL; + + ret = initialize_pipe_name(path, wpath, ARRAY_SIZE(wpath)); + if (ret < 0) { + errno = EINVAL; + return -1; + } + + hPipeFirst = create_new_pipe(wpath, 1); + if (hPipeFirst == INVALID_HANDLE_VALUE) { + errno = EADDRINUSE; + return -2; + } + + server_data = xcalloc(1, sizeof(*server_data)); + server_data->magic = MAGIC_SERVER_DATA; + server_data->application_cb = application_cb; + server_data->application_data = application_data; + server_data->hEventStopRequested = CreateEvent(NULL, TRUE, FALSE, NULL); + strbuf_init(&server_data->buf_path, 0); + strbuf_addstr(&server_data->buf_path, path); + wcscpy(server_data->wpath, wpath); + + if (nr_threads < 1) + nr_threads = 1; + + for (k = 0; k < nr_threads; k++) { + struct ipc_server_thread_data *std; + + std = xcalloc(1, sizeof(*std)); + std->magic = MAGIC_SERVER_THREAD_DATA; + std->server_data = server_data; + std->hPipe = INVALID_HANDLE_VALUE; + + std->hPipe = (k == 0) + ? hPipeFirst + : create_new_pipe(server_data->wpath, 0); + + if (std->hPipe == INVALID_HANDLE_VALUE) { + /* + * If we've reached a pipe instance limit for + * this path, just use fewer threads. + */ + free(std); + break; + } + + if (pthread_create(&std->pthread_id, NULL, + server_thread_proc, std)) { + /* + * Likewise, if we're out of threads, just use + * fewer threads than requested. + * + * However, we just give up if we can't even get + * one thread. This should not happen. + */ + if (k == 0) + die(_("could not start thread[0] for '%s'"), + path); + + CloseHandle(std->hPipe); + free(std); + break; + } + + std->next_thread = server_data->thread_list; + server_data->thread_list = std; + } + + *returned_server_data = server_data; + return 0; +} + +int ipc_server_stop_async(struct ipc_server_data *server_data) +{ + if (!server_data) + return 0; + + /* + * Gently tell all of the ipc_server threads to shutdown. + * This will be seen the next time they are idle (and waiting + * for a connection). + * + * We DO NOT attempt to force them to drop an active connection. + */ + SetEvent(server_data->hEventStopRequested); + return 0; +} + +int ipc_server_await(struct ipc_server_data *server_data) +{ + DWORD dwWaitResult; + + if (!server_data) + return 0; + + dwWaitResult = WaitForSingleObject(server_data->hEventStopRequested, INFINITE); + if (dwWaitResult != WAIT_OBJECT_0) + return error(_("wait for hEvent failed for '%s'"), + server_data->buf_path.buf); + + while (server_data->thread_list) { + struct ipc_server_thread_data *std = server_data->thread_list; + + pthread_join(std->pthread_id, NULL); + + server_data->thread_list = std->next_thread; + free(std); + } + + server_data->is_stopped = 1; + + return 0; +} + +void ipc_server_free(struct ipc_server_data *server_data) +{ + if (!server_data) + return; + + if (!server_data->is_stopped) + BUG("cannot free ipc-server while running for '%s'", + server_data->buf_path.buf); + + strbuf_release(&server_data->buf_path); + + if (server_data->hEventStopRequested != INVALID_HANDLE_VALUE) + CloseHandle(server_data->hEventStopRequested); + + while (server_data->thread_list) { + struct ipc_server_thread_data *std = server_data->thread_list; + + server_data->thread_list = std->next_thread; + free(std); + } + + free(server_data); +} |