summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Documentation/technical/api-simple-ipc.txt105
-rw-r--r--Makefile9
-rw-r--r--builtin/credential-cache--daemon.c3
-rw-r--r--builtin/credential-cache.c2
-rw-r--r--compat/simple-ipc/ipc-shared.c28
-rw-r--r--compat/simple-ipc/ipc-unix-socket.c999
-rw-r--r--compat/simple-ipc/ipc-win32.c751
-rw-r--r--config.mak.uname2
-rw-r--r--contrib/buildsystems/CMakeLists.txt8
-rw-r--r--convert.c11
-rw-r--r--pkt-line.c59
-rw-r--r--pkt-line.h17
-rw-r--r--simple-ipc.h239
-rw-r--r--t/helper/test-simple-ipc.c787
-rw-r--r--t/helper/test-tool.c1
-rw-r--r--t/helper/test-tool.h1
-rwxr-xr-xt/t0052-simple-ipc.sh122
-rw-r--r--unix-socket.c53
-rw-r--r--unix-socket.h12
-rw-r--r--unix-stream-server.c125
-rw-r--r--unix-stream-server.h33
21 files changed, 3315 insertions, 52 deletions
diff --git a/Documentation/technical/api-simple-ipc.txt b/Documentation/technical/api-simple-ipc.txt
new file mode 100644
index 0000000000..d79ad323e6
--- /dev/null
+++ b/Documentation/technical/api-simple-ipc.txt
@@ -0,0 +1,105 @@
+Simple-IPC API
+==============
+
+The Simple-IPC API is a collection of `ipc_` prefixed library routines
+and a basic communication protocol that allow an IPC-client process to
+send an application-specific IPC-request message to an IPC-server
+process and receive an application-specific IPC-response message.
+
+Communication occurs over a named pipe on Windows and a Unix domain
+socket on other platforms. IPC-clients and IPC-servers rendezvous at
+a previously agreed-to application-specific pathname (which is outside
+the scope of this design) that is local to the computer system.
+
+The IPC-server routines within the server application process create a
+thread pool to listen for connections and receive request messages
+from multiple concurrent IPC-clients. When received, these messages
+are dispatched up to the server application callbacks for handling.
+IPC-server routines then incrementally relay responses back to the
+IPC-client.
+
+The IPC-client routines within a client application process connect
+to the IPC-server and send a request message and wait for a response.
+When received, the response is returned back the caller.
+
+For example, the `fsmonitor--daemon` feature will be built as a server
+application on top of the IPC-server library routines. It will have
+threads watching for file system events and a thread pool waiting for
+client connections. Clients, such as `git status` will request a list
+of file system events since a point in time and the server will
+respond with a list of changed files and directories. The formats of
+the request and response are application-specific; the IPC-client and
+IPC-server routines treat them as opaque byte streams.
+
+
+Comparison with sub-process model
+---------------------------------
+
+The Simple-IPC mechanism differs from the existing `sub-process.c`
+model (Documentation/technical/long-running-process-protocol.txt) and
+used by applications like Git-LFS. In the LFS-style sub-process model
+the helper is started by the foreground process, communication happens
+via a pair of file descriptors bound to the stdin/stdout of the
+sub-process, the sub-process only serves the current foreground
+process, and the sub-process exits when the foreground process
+terminates.
+
+In the Simple-IPC model the server is a very long-running service. It
+can service many clients at the same time and has a private socket or
+named pipe connection to each active client. It might be started
+(on-demand) by the current client process or it might have been
+started by a previous client or by the OS at boot time. The server
+process is not associated with a terminal and it persists after
+clients terminate. Clients do not have access to the stdin/stdout of
+the server process and therefore must communicate over sockets or
+named pipes.
+
+
+Server startup and shutdown
+---------------------------
+
+How an application server based upon IPC-server is started is also
+outside the scope of the Simple-IPC design and is a property of the
+application using it. For example, the server might be started or
+restarted during routine maintenance operations, or it might be
+started as a system service during the system boot-up sequence, or it
+might be started on-demand by a foreground Git command when needed.
+
+Similarly, server shutdown is a property of the application using
+the simple-ipc routines. For example, the server might decide to
+shutdown when idle or only upon explicit request.
+
+
+Simple-IPC protocol
+-------------------
+
+The Simple-IPC protocol consists of a single request message from the
+client and an optional response message from the server. Both the
+client and server messages are unlimited in length and are terminated
+with a flush packet.
+
+The pkt-line routines (Documentation/technical/protocol-common.txt)
+are used to simplify buffer management during message generation,
+transmission, and reception. A flush packet is used to mark the end
+of the message. This allows the sender to incrementally generate and
+transmit the message. It allows the receiver to incrementally receive
+the message in chunks and to know when they have received the entire
+message.
+
+The actual byte format of the client request and server response
+messages are application specific. The IPC layer transmits and
+receives them as opaque byte buffers without any concern for the
+content within. It is the job of the calling application layer to
+understand the contents of the request and response messages.
+
+
+Summary
+-------
+
+Conceptually, the Simple-IPC protocol is similar to an HTTP REST
+request. Clients connect, make an application-specific and
+stateless request, receive an application-specific
+response, and disconnect. It is a one round trip facility for
+querying the server. The Simple-IPC routines hide the socket,
+named pipe, and thread pool details and allow the application
+layer to focus on the application at hand.
diff --git a/Makefile b/Makefile
index 55c8035fa8..a6a73c5741 100644
--- a/Makefile
+++ b/Makefile
@@ -744,6 +744,7 @@ TEST_BUILTINS_OBJS += test-serve-v2.o
TEST_BUILTINS_OBJS += test-sha1.o
TEST_BUILTINS_OBJS += test-sha256.o
TEST_BUILTINS_OBJS += test-sigchain.o
+TEST_BUILTINS_OBJS += test-simple-ipc.o
TEST_BUILTINS_OBJS += test-strcmp-offset.o
TEST_BUILTINS_OBJS += test-string-list.o
TEST_BUILTINS_OBJS += test-submodule-config.o
@@ -1679,6 +1680,14 @@ ifdef NO_UNIX_SOCKETS
BASIC_CFLAGS += -DNO_UNIX_SOCKETS
else
LIB_OBJS += unix-socket.o
+ LIB_OBJS += unix-stream-server.o
+ LIB_OBJS += compat/simple-ipc/ipc-shared.o
+ LIB_OBJS += compat/simple-ipc/ipc-unix-socket.o
+endif
+
+ifdef USE_WIN32_IPC
+ LIB_OBJS += compat/simple-ipc/ipc-shared.o
+ LIB_OBJS += compat/simple-ipc/ipc-win32.o
endif
ifdef NO_ICONV
diff --git a/builtin/credential-cache--daemon.c b/builtin/credential-cache--daemon.c
index c61f123a3b..4c6c89ab0d 100644
--- a/builtin/credential-cache--daemon.c
+++ b/builtin/credential-cache--daemon.c
@@ -203,9 +203,10 @@ static int serve_cache_loop(int fd)
static void serve_cache(const char *socket_path, int debug)
{
+ struct unix_stream_listen_opts opts = UNIX_STREAM_LISTEN_OPTS_INIT;
int fd;
- fd = unix_stream_listen(socket_path);
+ fd = unix_stream_listen(socket_path, &opts);
if (fd < 0)
die_errno("unable to bind to '%s'", socket_path);
diff --git a/builtin/credential-cache.c b/builtin/credential-cache.c
index 9b3f709905..76a6ba3722 100644
--- a/builtin/credential-cache.c
+++ b/builtin/credential-cache.c
@@ -14,7 +14,7 @@
static int send_request(const char *socket, const struct strbuf *out)
{
int got_data = 0;
- int fd = unix_stream_connect(socket);
+ int fd = unix_stream_connect(socket, 0);
if (fd < 0)
return -1;
diff --git a/compat/simple-ipc/ipc-shared.c b/compat/simple-ipc/ipc-shared.c
new file mode 100644
index 0000000000..1edec81595
--- /dev/null
+++ b/compat/simple-ipc/ipc-shared.c
@@ -0,0 +1,28 @@
+#include "cache.h"
+#include "simple-ipc.h"
+#include "strbuf.h"
+#include "pkt-line.h"
+#include "thread-utils.h"
+
+#ifdef SUPPORTS_SIMPLE_IPC
+
+int ipc_server_run(const char *path, const struct ipc_server_opts *opts,
+ ipc_server_application_cb *application_cb,
+ void *application_data)
+{
+ struct ipc_server_data *server_data = NULL;
+ int ret;
+
+ ret = ipc_server_run_async(&server_data, path, opts,
+ application_cb, application_data);
+ if (ret)
+ return ret;
+
+ ret = ipc_server_await(server_data);
+
+ ipc_server_free(server_data);
+
+ return ret;
+}
+
+#endif /* SUPPORTS_SIMPLE_IPC */
diff --git a/compat/simple-ipc/ipc-unix-socket.c b/compat/simple-ipc/ipc-unix-socket.c
new file mode 100644
index 0000000000..38689b278d
--- /dev/null
+++ b/compat/simple-ipc/ipc-unix-socket.c
@@ -0,0 +1,999 @@
+#include "cache.h"
+#include "simple-ipc.h"
+#include "strbuf.h"
+#include "pkt-line.h"
+#include "thread-utils.h"
+#include "unix-socket.h"
+#include "unix-stream-server.h"
+
+#ifdef NO_UNIX_SOCKETS
+#error compat/simple-ipc/ipc-unix-socket.c requires Unix sockets
+#endif
+
+enum ipc_active_state ipc_get_active_state(const char *path)
+{
+ enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
+ struct ipc_client_connect_options options
+ = IPC_CLIENT_CONNECT_OPTIONS_INIT;
+ struct stat st;
+ struct ipc_client_connection *connection_test = NULL;
+
+ options.wait_if_busy = 0;
+ options.wait_if_not_found = 0;
+
+ if (lstat(path, &st) == -1) {
+ switch (errno) {
+ case ENOENT:
+ case ENOTDIR:
+ return IPC_STATE__NOT_LISTENING;
+ default:
+ return IPC_STATE__INVALID_PATH;
+ }
+ }
+
+ /* also complain if a plain file is in the way */
+ if ((st.st_mode & S_IFMT) != S_IFSOCK)
+ return IPC_STATE__INVALID_PATH;
+
+ /*
+ * Just because the filesystem has a S_IFSOCK type inode
+ * at `path`, doesn't mean it that there is a server listening.
+ * Ping it to be sure.
+ */
+ state = ipc_client_try_connect(path, &options, &connection_test);
+ ipc_client_close_connection(connection_test);
+
+ return state;
+}
+
+/*
+ * Retry frequency when trying to connect to a server.
+ *
+ * This value should be short enough that we don't seriously delay our
+ * caller, but not fast enough that our spinning puts pressure on the
+ * system.
+ */
+#define WAIT_STEP_MS (50)
+
+/*
+ * Try to connect to the server. If the server is just starting up or
+ * is very busy, we may not get a connection the first time.
+ */
+static enum ipc_active_state connect_to_server(
+ const char *path,
+ int timeout_ms,
+ const struct ipc_client_connect_options *options,
+ int *pfd)
+{
+ int k;
+
+ *pfd = -1;
+
+ for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
+ int fd = unix_stream_connect(path, options->uds_disallow_chdir);
+
+ if (fd != -1) {
+ *pfd = fd;
+ return IPC_STATE__LISTENING;
+ }
+
+ if (errno == ENOENT) {
+ if (!options->wait_if_not_found)
+ return IPC_STATE__PATH_NOT_FOUND;
+
+ goto sleep_and_try_again;
+ }
+
+ if (errno == ETIMEDOUT) {
+ if (!options->wait_if_busy)
+ return IPC_STATE__NOT_LISTENING;
+
+ goto sleep_and_try_again;
+ }
+
+ if (errno == ECONNREFUSED) {
+ if (!options->wait_if_busy)
+ return IPC_STATE__NOT_LISTENING;
+
+ goto sleep_and_try_again;
+ }
+
+ return IPC_STATE__OTHER_ERROR;
+
+ sleep_and_try_again:
+ sleep_millisec(WAIT_STEP_MS);
+ }
+
+ return IPC_STATE__NOT_LISTENING;
+}
+
+/*
+ * The total amount of time that we are willing to wait when trying to
+ * connect to a server.
+ *
+ * When the server is first started, it might take a little while for
+ * it to become ready to service requests. Likewise, the server may
+ * be very (temporarily) busy and not respond to our connections.
+ *
+ * We should gracefully and silently handle those conditions and try
+ * again for a reasonable time period.
+ *
+ * The value chosen here should be long enough for the server
+ * to reliably heal from the above conditions.
+ */
+#define MY_CONNECTION_TIMEOUT_MS (1000)
+
+enum ipc_active_state ipc_client_try_connect(
+ const char *path,
+ const struct ipc_client_connect_options *options,
+ struct ipc_client_connection **p_connection)
+{
+ enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
+ int fd = -1;
+
+ *p_connection = NULL;
+
+ trace2_region_enter("ipc-client", "try-connect", NULL);
+ trace2_data_string("ipc-client", NULL, "try-connect/path", path);
+
+ state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
+ options, &fd);
+
+ trace2_data_intmax("ipc-client", NULL, "try-connect/state",
+ (intmax_t)state);
+ trace2_region_leave("ipc-client", "try-connect", NULL);
+
+ if (state == IPC_STATE__LISTENING) {
+ (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
+ (*p_connection)->fd = fd;
+ }
+
+ return state;
+}
+
+void ipc_client_close_connection(struct ipc_client_connection *connection)
+{
+ if (!connection)
+ return;
+
+ if (connection->fd != -1)
+ close(connection->fd);
+
+ free(connection);
+}
+
+int ipc_client_send_command_to_connection(
+ struct ipc_client_connection *connection,
+ const char *message, struct strbuf *answer)
+{
+ int ret = 0;
+
+ strbuf_setlen(answer, 0);
+
+ trace2_region_enter("ipc-client", "send-command", NULL);
+
+ if (write_packetized_from_buf_no_flush(message, strlen(message),
+ connection->fd) < 0 ||
+ packet_flush_gently(connection->fd) < 0) {
+ ret = error(_("could not send IPC command"));
+ goto done;
+ }
+
+ if (read_packetized_to_strbuf(
+ connection->fd, answer,
+ PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
+ ret = error(_("could not read IPC response"));
+ goto done;
+ }
+
+done:
+ trace2_region_leave("ipc-client", "send-command", NULL);
+ return ret;
+}
+
+int ipc_client_send_command(const char *path,
+ const struct ipc_client_connect_options *options,
+ const char *message, struct strbuf *answer)
+{
+ int ret = -1;
+ enum ipc_active_state state;
+ struct ipc_client_connection *connection = NULL;
+
+ state = ipc_client_try_connect(path, options, &connection);
+
+ if (state != IPC_STATE__LISTENING)
+ return ret;
+
+ ret = ipc_client_send_command_to_connection(connection, message, answer);
+
+ ipc_client_close_connection(connection);
+
+ return ret;
+}
+
+static int set_socket_blocking_flag(int fd, int make_nonblocking)
+{
+ int flags;
+
+ flags = fcntl(fd, F_GETFL, NULL);
+
+ if (flags < 0)
+ return -1;
+
+ if (make_nonblocking)
+ flags |= O_NONBLOCK;
+ else
+ flags &= ~O_NONBLOCK;
+
+ return fcntl(fd, F_SETFL, flags);
+}
+
+/*
+ * Magic numbers used to annotate callback instance data.
+ * These are used to help guard against accidentally passing the
+ * wrong instance data across multiple levels of callbacks (which
+ * is easy to do if there are `void*` arguments).
+ */
+enum magic {
+ MAGIC_SERVER_REPLY_DATA,
+ MAGIC_WORKER_THREAD_DATA,
+ MAGIC_ACCEPT_THREAD_DATA,
+ MAGIC_SERVER_DATA,
+};
+
+struct ipc_server_reply_data {
+ enum magic magic;
+ int fd;
+ struct ipc_worker_thread_data *worker_thread_data;
+};
+
+struct ipc_worker_thread_data {
+ enum magic magic;
+ struct ipc_worker_thread_data *next_thread;
+ struct ipc_server_data *server_data;
+ pthread_t pthread_id;
+};
+
+struct ipc_accept_thread_data {
+ enum magic magic;
+ struct ipc_server_data *server_data;
+
+ struct unix_ss_socket *server_socket;
+
+ int fd_send_shutdown;
+ int fd_wait_shutdown;
+ pthread_t pthread_id;
+};
+
+/*
+ * With unix-sockets, the conceptual "ipc-server" is implemented as a single
+ * controller "accept-thread" thread and a pool of "worker-thread" threads.
+ * The former does the usual `accept()` loop and dispatches connections
+ * to an idle worker thread. The worker threads wait in an idle loop for
+ * a new connection, communicate with the client and relay data to/from
+ * the `application_cb` and then wait for another connection from the
+ * server thread. This avoids the overhead of constantly creating and
+ * destroying threads.
+ */
+struct ipc_server_data {
+ enum magic magic;
+ ipc_server_application_cb *application_cb;
+ void *application_data;
+ struct strbuf buf_path;
+
+ struct ipc_accept_thread_data *accept_thread;
+ struct ipc_worker_thread_data *worker_thread_list;
+
+ pthread_mutex_t work_available_mutex;
+ pthread_cond_t work_available_cond;
+
+ /*
+ * Accepted but not yet processed client connections are kept
+ * in a circular buffer FIFO. The queue is empty when the
+ * positions are equal.
+ */
+ int *fifo_fds;
+ int queue_size;
+ int back_pos;
+ int front_pos;
+
+ int shutdown_requested;
+ int is_stopped;
+};
+
+/*
+ * Remove and return the oldest queued connection.
+ *
+ * Returns -1 if empty.
+ */
+static int fifo_dequeue(struct ipc_server_data *server_data)
+{
+ /* ASSERT holding mutex */
+
+ int fd;
+
+ if (server_data->back_pos == server_data->front_pos)
+ return -1;
+
+ fd = server_data->fifo_fds[server_data->front_pos];
+ server_data->fifo_fds[server_data->front_pos] = -1;
+
+ server_data->front_pos++;
+ if (server_data->front_pos == server_data->queue_size)
+ server_data->front_pos = 0;
+
+ return fd;
+}
+
+/*
+ * Push a new fd onto the back of the queue.
+ *
+ * Drop it and return -1 if queue is already full.
+ */
+static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
+{
+ /* ASSERT holding mutex */
+
+ int next_back_pos;
+
+ next_back_pos = server_data->back_pos + 1;
+ if (next_back_pos == server_data->queue_size)
+ next_back_pos = 0;
+
+ if (next_back_pos == server_data->front_pos) {
+ /* Queue is full. Just drop it. */
+ close(fd);
+ return -1;
+ }
+
+ server_data->fifo_fds[server_data->back_pos] = fd;
+ server_data->back_pos = next_back_pos;
+
+ return fd;
+}
+
+/*
+ * Wait for a connection to be queued to the FIFO and return it.
+ *
+ * Returns -1 if someone has already requested a shutdown.
+ */
+static int worker_thread__wait_for_connection(
+ struct ipc_worker_thread_data *worker_thread_data)
+{
+ /* ASSERT NOT holding mutex */
+
+ struct ipc_server_data *server_data = worker_thread_data->server_data;
+ int fd = -1;
+
+ pthread_mutex_lock(&server_data->work_available_mutex);
+ for (;;) {
+ if (server_data->shutdown_requested)
+ break;
+
+ fd = fifo_dequeue(server_data);
+ if (fd >= 0)
+ break;
+
+ pthread_cond_wait(&server_data->work_available_cond,
+ &server_data->work_available_mutex);
+ }
+ pthread_mutex_unlock(&server_data->work_available_mutex);
+
+ return fd;
+}
+
+/*
+ * Forward declare our reply callback function so that any compiler
+ * errors are reported when we actually define the function (in addition
+ * to any errors reported when we try to pass this callback function as
+ * a parameter in a function call). The former are easier to understand.
+ */
+static ipc_server_reply_cb do_io_reply_callback;
+
+/*
+ * Relay application's response message to the client process.
+ * (We do not flush at this point because we allow the caller
+ * to chunk data to the client thru us.)
+ */
+static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
+ const char *response, size_t response_len)
+{
+ if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
+ BUG("reply_cb called with wrong instance data");
+
+ return write_packetized_from_buf_no_flush(response, response_len,
+ reply_data->fd);
+}
+
+/* A randomly chosen value. */
+#define MY_WAIT_POLL_TIMEOUT_MS (10)
+
+/*
+ * If the client hangs up without sending any data on the wire, just
+ * quietly close the socket and ignore this client.
+ *
+ * This worker thread is committed to reading the IPC request data
+ * from the client at the other end of this fd. Wait here for the
+ * client to actually put something on the wire -- because if the
+ * client just does a ping (connect and hangup without sending any
+ * data), our use of the pkt-line read routines will spew an error
+ * message.
+ *
+ * Return -1 if the client hung up.
+ * Return 0 if data (possibly incomplete) is ready.
+ */
+static int worker_thread__wait_for_io_start(
+ struct ipc_worker_thread_data *worker_thread_data,
+ int fd)
+{
+ struct ipc_server_data *server_data = worker_thread_data->server_data;
+ struct pollfd pollfd[1];
+ int result;
+
+ for (;;) {
+ pollfd[0].fd = fd;
+ pollfd[0].events = POLLIN;
+
+ result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
+ if (result < 0) {
+ if (errno == EINTR)
+ continue;
+ goto cleanup;
+ }
+
+ if (result == 0) {
+ /* a timeout */
+
+ int in_shutdown;
+
+ pthread_mutex_lock(&server_data->work_available_mutex);
+ in_shutdown = server_data->shutdown_requested;
+ pthread_mutex_unlock(&server_data->work_available_mutex);
+
+ /*
+ * If a shutdown is already in progress and this
+ * client has not started talking yet, just drop it.
+ */
+ if (in_shutdown)
+ goto cleanup;
+ continue;
+ }
+
+ if (pollfd[0].revents & POLLHUP)
+ goto cleanup;
+
+ if (pollfd[0].revents & POLLIN)
+ return 0;
+
+ goto cleanup;
+ }
+
+cleanup:
+ close(fd);
+ return -1;
+}
+
+/*
+ * Receive the request/command from the client and pass it to the
+ * registered request-callback. The request-callback will compose
+ * a response and call our reply-callback to send it to the client.
+ */
+static int worker_thread__do_io(
+ struct ipc_worker_thread_data *worker_thread_data,
+ int fd)
+{
+ /* ASSERT NOT holding lock */
+
+ struct strbuf buf = STRBUF_INIT;
+ struct ipc_server_reply_data reply_data;
+ int ret = 0;
+
+ reply_data.magic = MAGIC_SERVER_REPLY_DATA;
+ reply_data.worker_thread_data = worker_thread_data;
+
+ reply_data.fd = fd;
+
+ ret = read_packetized_to_strbuf(
+ reply_data.fd, &buf,
+ PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
+ if (ret >= 0) {
+ ret = worker_thread_data->server_data->application_cb(
+ worker_thread_data->server_data->application_data,
+ buf.buf, do_io_reply_callback, &reply_data);
+
+ packet_flush_gently(reply_data.fd);
+ }
+ else {
+ /*
+ * The client probably disconnected/shutdown before it
+ * could send a well-formed message. Ignore it.
+ */
+ }
+
+ strbuf_release(&buf);
+ close(reply_data.fd);
+
+ return ret;
+}
+
+/*
+ * Block SIGPIPE on the current thread (so that we get EPIPE from
+ * write() rather than an actual signal).
+ *
+ * Note that using sigchain_push() and _pop() to control SIGPIPE
+ * around our IO calls is not thread safe:
+ * [] It uses a global stack of handler frames.
+ * [] It uses ALLOC_GROW() to resize it.
+ * [] Finally, according to the `signal(2)` man-page:
+ * "The effects of `signal()` in a multithreaded process are unspecified."
+ */
+static void thread_block_sigpipe(sigset_t *old_set)
+{
+ sigset_t new_set;
+
+ sigemptyset(&new_set);
+ sigaddset(&new_set, SIGPIPE);
+
+ sigemptyset(old_set);
+ pthread_sigmask(SIG_BLOCK, &new_set, old_set);
+}
+
+/*
+ * Thread proc for an IPC worker thread. It handles a series of
+ * connections from clients. It pulls the next fd from the queue
+ * processes it, and then waits for the next client.
+ *
+ * Block SIGPIPE in this worker thread for the life of the thread.
+ * This avoids stray (and sometimes delayed) SIGPIPE signals caused
+ * by client errors and/or when we are under extremely heavy IO load.
+ *
+ * This means that the application callback will have SIGPIPE blocked.
+ * The callback should not change it.
+ */
+static void *worker_thread_proc(void *_worker_thread_data)
+{
+ struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
+ struct ipc_server_data *server_data = worker_thread_data->server_data;
+ sigset_t old_set;
+ int fd, io;
+ int ret;
+
+ trace2_thread_start("ipc-worker");
+
+ thread_block_sigpipe(&old_set);
+
+ for (;;) {
+ fd = worker_thread__wait_for_connection(worker_thread_data);
+ if (fd == -1)
+ break; /* in shutdown */
+
+ io = worker_thread__wait_for_io_start(worker_thread_data, fd);
+ if (io == -1)
+ continue; /* client hung up without sending anything */
+
+ ret = worker_thread__do_io(worker_thread_data, fd);
+
+ if (ret == SIMPLE_IPC_QUIT) {
+ trace2_data_string("ipc-worker", NULL, "queue_stop_async",
+ "application_quit");
+ /*
+ * The application layer is telling the ipc-server
+ * layer to shutdown.
+ *
+ * We DO NOT have a response to send to the client.
+ *
+ * Queue an async stop (to stop the other threads) and
+ * allow this worker thread to exit now (no sense waiting
+ * for the thread-pool shutdown signal).
+ *
+ * Other non-idle worker threads are allowed to finish
+ * responding to their current clients.
+ */
+ ipc_server_stop_async(server_data);
+ break;
+ }
+ }
+
+ trace2_thread_exit();
+ return NULL;
+}
+
+/* A randomly chosen value. */
+#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
+
+/*
+ * Accept a new client connection on our socket. This uses non-blocking
+ * IO so that we can also wait for shutdown requests on our socket-pair
+ * without actually spinning on a fast timeout.
+ */
+static int accept_thread__wait_for_connection(
+ struct ipc_accept_thread_data *accept_thread_data)
+{
+ struct pollfd pollfd[2];
+ int result;
+
+ for (;;) {
+ pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
+ pollfd[0].events = POLLIN;
+
+ pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
+ pollfd[1].events = POLLIN;
+
+ result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
+ if (result < 0) {
+ if (errno == EINTR)
+ continue;
+ return result;
+ }
+
+ if (result == 0) {
+ /* a timeout */
+
+ /*
+ * If someone deletes or force-creates a new unix
+ * domain socket at our path, all future clients
+ * will be routed elsewhere and we silently starve.
+ * If that happens, just queue a shutdown.
+ */
+ if (unix_ss_was_stolen(
+ accept_thread_data->server_socket)) {
+ trace2_data_string("ipc-accept", NULL,
+ "queue_stop_async",
+ "socket_stolen");
+ ipc_server_stop_async(
+ accept_thread_data->server_data);
+ }
+ continue;
+ }
+
+ if (pollfd[0].revents & POLLIN) {
+ /* shutdown message queued to socketpair */
+ return -1;
+ }
+
+ if (pollfd[1].revents & POLLIN) {
+ /* a connection is available on server_socket */
+
+ int client_fd =
+ accept(accept_thread_data->server_socket->fd_socket,
+ NULL, NULL);
+ if (client_fd >= 0)
+ return client_fd;
+
+ /*
+ * An error here is unlikely -- it probably
+ * indicates that the connecting process has
+ * already dropped the connection.
+ */
+ continue;
+ }
+
+ BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
+ errno, pollfd[0].revents, pollfd[1].revents);
+ }
+}
+
+/*
+ * Thread proc for the IPC server "accept thread". This waits for
+ * an incoming socket connection, appends it to the queue of available
+ * connections, and notifies a worker thread to process it.
+ *
+ * Block SIGPIPE in this thread for the life of the thread. This
+ * avoids any stray SIGPIPE signals when closing pipe fds under
+ * extremely heavy loads (such as when the fifo queue is full and we
+ * drop incomming connections).
+ */
+static void *accept_thread_proc(void *_accept_thread_data)
+{
+ struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
+ struct ipc_server_data *server_data = accept_thread_data->server_data;
+ sigset_t old_set;
+
+ trace2_thread_start("ipc-accept");
+
+ thread_block_sigpipe(&old_set);
+
+ for (;;) {
+ int client_fd = accept_thread__wait_for_connection(
+ accept_thread_data);
+
+ pthread_mutex_lock(&server_data->work_available_mutex);
+ if (server_data->shutdown_requested) {
+ pthread_mutex_unlock(&server_data->work_available_mutex);
+ if (client_fd >= 0)
+ close(client_fd);
+ break;
+ }
+
+ if (client_fd < 0) {
+ /* ignore transient accept() errors */
+ }
+ else {
+ fifo_enqueue(server_data, client_fd);
+ pthread_cond_broadcast(&server_data->work_available_cond);
+ }
+ pthread_mutex_unlock(&server_data->work_available_mutex);
+ }
+
+ trace2_thread_exit();
+ return NULL;
+}
+
+/*
+ * We can't predict the connection arrival rate relative to the worker
+ * processing rate, therefore we allow the "accept-thread" to queue up
+ * a generous number of connections, since we'd rather have the client
+ * not unnecessarily timeout if we can avoid it. (The assumption is
+ * that this will be used for FSMonitor and a few second wait on a
+ * connection is better than having the client timeout and do the full
+ * computation itself.)
+ *
+ * The FIFO queue size is set to a multiple of the worker pool size.
+ * This value chosen at random.
+ */
+#define FIFO_SCALE (100)
+
+/*
+ * The backlog value for `listen(2)`. This doesn't need to huge,
+ * rather just large enough for our "accept-thread" to wake up and
+ * queue incoming connections onto the FIFO without the kernel
+ * dropping any.
+ *
+ * This value chosen at random.
+ */
+#define LISTEN_BACKLOG (50)
+
+static int create_listener_socket(
+ const char *path,
+ const struct ipc_server_opts *ipc_opts,
+ struct unix_ss_socket **new_server_socket)
+{
+ struct unix_ss_socket *server_socket = NULL;
+ struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
+ int ret;
+
+ uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
+ uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
+
+ ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
+ if (ret)
+ return ret;
+
+ if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
+ int saved_errno = errno;
+ unix_ss_free(server_socket);
+ errno = saved_errno;
+ return -1;
+ }
+
+ *new_server_socket = server_socket;
+
+ trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
+ return 0;
+}
+
+static int setup_listener_socket(
+ const char *path,
+ const struct ipc_server_opts *ipc_opts,
+ struct unix_ss_socket **new_server_socket)
+{
+ int ret, saved_errno;
+
+ trace2_region_enter("ipc-server", "create-listener_socket", NULL);
+
+ ret = create_listener_socket(path, ipc_opts, new_server_socket);
+
+ saved_errno = errno;
+ trace2_region_leave("ipc-server", "create-listener_socket", NULL);
+ errno = saved_errno;
+
+ return ret;
+}
+
+/*
+ * Start IPC server in a pool of background threads.
+ */
+int ipc_server_run_async(struct ipc_server_data **returned_server_data,
+ const char *path, const struct ipc_server_opts *opts,
+ ipc_server_application_cb *application_cb,
+ void *application_data)
+{
+ struct unix_ss_socket *server_socket = NULL;
+ struct ipc_server_data *server_data;
+ int sv[2];
+ int k;
+ int ret;
+ int nr_threads = opts->nr_threads;
+
+ *returned_server_data = NULL;
+
+ /*
+ * Create a socketpair and set sv[1] to non-blocking. This
+ * will used to send a shutdown message to the accept-thread
+ * and allows the accept-thread to wait on EITHER a client
+ * connection or a shutdown request without spinning.
+ */
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
+ return -1;
+
+ if (set_socket_blocking_flag(sv[1], 1)) {
+ int saved_errno = errno;
+ close(sv[0]);
+ close(sv[1]);
+ errno = saved_errno;
+ return -1;
+ }
+
+ ret = setup_listener_socket(path, opts, &server_socket);
+ if (ret) {
+ int saved_errno = errno;
+ close(sv[0]);
+ close(sv[1]);
+ errno = saved_errno;
+ return ret;
+ }
+
+ server_data = xcalloc(1, sizeof(*server_data));
+ server_data->magic = MAGIC_SERVER_DATA;
+ server_data->application_cb = application_cb;
+ server_data->application_data = application_data;
+ strbuf_init(&server_data->buf_path, 0);
+ strbuf_addstr(&server_data->buf_path, path);
+
+ if (nr_threads < 1)
+ nr_threads = 1;
+
+ pthread_mutex_init(&server_data->work_available_mutex, NULL);
+ pthread_cond_init(&server_data->work_available_cond, NULL);
+
+ server_data->queue_size = nr_threads * FIFO_SCALE;
+ CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
+
+ server_data->accept_thread =
+ xcalloc(1, sizeof(*server_data->accept_thread));
+ server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
+ server_data->accept_thread->server_data = server_data;
+ server_data->accept_thread->server_socket = server_socket;
+ server_data->accept_thread->fd_send_shutdown = sv[0];
+ server_data->accept_thread->fd_wait_shutdown = sv[1];
+
+ if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
+ accept_thread_proc, server_data->accept_thread))
+ die_errno(_("could not start accept_thread '%s'"), path);
+
+ for (k = 0; k < nr_threads; k++) {
+ struct ipc_worker_thread_data *wtd;
+
+ wtd = xcalloc(1, sizeof(*wtd));
+ wtd->magic = MAGIC_WORKER_THREAD_DATA;
+ wtd->server_data = server_data;
+
+ if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
+ wtd)) {
+ if (k == 0)
+ die(_("could not start worker[0] for '%s'"),
+ path);
+ /*
+ * Limp along with the thread pool that we have.
+ */
+ break;
+ }
+
+ wtd->next_thread = server_data->worker_thread_list;
+ server_data->worker_thread_list = wtd;
+ }
+
+ *returned_server_data = server_data;
+ return 0;
+}
+
+/*
+ * Gently tell the IPC server treads to shutdown.
+ * Can be run on any thread.
+ */
+int ipc_server_stop_async(struct ipc_server_data *server_data)
+{
+ /* ASSERT NOT holding mutex */
+
+ int fd;
+
+ if (!server_data)
+ return 0;
+
+ trace2_region_enter("ipc-server", "server-stop-async", NULL);
+
+ pthread_mutex_lock(&server_data->work_available_mutex);
+
+ server_data->shutdown_requested = 1;
+
+ /*
+ * Write a byte to the shutdown socket pair to wake up the
+ * accept-thread.
+ */
+ if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
+ error_errno("could not write to fd_send_shutdown");
+
+ /*
+ * Drain the queue of existing connections.
+ */
+ while ((fd = fifo_dequeue(server_data)) != -1)
+ close(fd);
+
+ /*
+ * Gently tell worker threads to stop processing new connections
+ * and exit. (This does not abort in-process conversations.)
+ */
+ pthread_cond_broadcast(&server_data->work_available_cond);
+
+ pthread_mutex_unlock(&server_data->work_available_mutex);
+
+ trace2_region_leave("ipc-server", "server-stop-async", NULL);
+
+ return 0;
+}
+
+/*
+ * Wait for all IPC server threads to stop.
+ */
+int ipc_server_await(struct ipc_server_data *server_data)
+{
+ pthread_join(server_data->accept_thread->pthread_id, NULL);
+
+ if (!server_data->shutdown_requested)
+ BUG("ipc-server: accept-thread stopped for '%s'",
+ server_data->buf_path.buf);
+
+ while (server_data->worker_thread_list) {
+ struct ipc_worker_thread_data *wtd =
+ server_data->worker_thread_list;
+
+ pthread_join(wtd->pthread_id, NULL);
+
+ server_data->worker_thread_list = wtd->next_thread;
+ free(wtd);
+ }
+
+ server_data->is_stopped = 1;
+
+ return 0;
+}
+
+void ipc_server_free(struct ipc_server_data *server_data)
+{
+ struct ipc_accept_thread_data * accept_thread_data;
+
+ if (!server_data)
+ return;
+
+ if (!server_data->is_stopped)
+ BUG("cannot free ipc-server while running for '%s'",
+ server_data->buf_path.buf);
+
+ accept_thread_data = server_data->accept_thread;
+ if (accept_thread_data) {
+ unix_ss_free(accept_thread_data->server_socket);
+
+ if (accept_thread_data->fd_send_shutdown != -1)
+ close(accept_thread_data->fd_send_shutdown);
+ if (accept_thread_data->fd_wait_shutdown != -1)
+ close(accept_thread_data->fd_wait_shutdown);
+
+ free(server_data->accept_thread);
+ }
+
+ while (server_data->worker_thread_list) {
+ struct ipc_worker_thread_data *wtd =
+ server_data->worker_thread_list;
+
+ server_data->worker_thread_list = wtd->next_thread;
+ free(wtd);
+ }
+
+ pthread_cond_destroy(&server_data->work_available_cond);
+ pthread_mutex_destroy(&server_data->work_available_mutex);
+
+ strbuf_release(&server_data->buf_path);
+
+ free(server_data->fifo_fds);
+ free(server_data);
+}
diff --git a/compat/simple-ipc/ipc-win32.c b/compat/simple-ipc/ipc-win32.c
new file mode 100644
index 0000000000..8f89c02037
--- /dev/null
+++ b/compat/simple-ipc/ipc-win32.c
@@ -0,0 +1,751 @@
+#include "cache.h"
+#include "simple-ipc.h"
+#include "strbuf.h"
+#include "pkt-line.h"
+#include "thread-utils.h"
+
+#ifndef GIT_WINDOWS_NATIVE
+#error This file can only be compiled on Windows
+#endif
+
+static int initialize_pipe_name(const char *path, wchar_t *wpath, size_t alloc)
+{
+ int off = 0;
+ struct strbuf realpath = STRBUF_INIT;
+
+ if (!strbuf_realpath(&realpath, path, 0))
+ return -1;
+
+ off = swprintf(wpath, alloc, L"\\\\.\\pipe\\");
+ if (xutftowcs(wpath + off, realpath.buf, alloc - off) < 0)
+ return -1;
+
+ /* Handle drive prefix */
+ if (wpath[off] && wpath[off + 1] == L':') {
+ wpath[off + 1] = L'_';
+ off += 2;
+ }
+
+ for (; wpath[off]; off++)
+ if (wpath[off] == L'/')
+ wpath[off] = L'\\';
+
+ strbuf_release(&realpath);
+ return 0;
+}
+
+static enum ipc_active_state get_active_state(wchar_t *pipe_path)
+{
+ if (WaitNamedPipeW(pipe_path, NMPWAIT_USE_DEFAULT_WAIT))
+ return IPC_STATE__LISTENING;
+
+ if (GetLastError() == ERROR_SEM_TIMEOUT)
+ return IPC_STATE__NOT_LISTENING;
+
+ if (GetLastError() == ERROR_FILE_NOT_FOUND)
+ return IPC_STATE__PATH_NOT_FOUND;
+
+ return IPC_STATE__OTHER_ERROR;
+}
+
+enum ipc_active_state ipc_get_active_state(const char *path)
+{
+ wchar_t pipe_path[MAX_PATH];
+
+ if (initialize_pipe_name(path, pipe_path, ARRAY_SIZE(pipe_path)) < 0)
+ return IPC_STATE__INVALID_PATH;
+
+ return get_active_state(pipe_path);
+}
+
+#define WAIT_STEP_MS (50)
+
+static enum ipc_active_state connect_to_server(
+ const wchar_t *wpath,
+ DWORD timeout_ms,
+ const struct ipc_client_connect_options *options,
+ int *pfd)
+{
+ DWORD t_start_ms, t_waited_ms;
+ DWORD step_ms;
+ HANDLE hPipe = INVALID_HANDLE_VALUE;
+ DWORD mode = PIPE_READMODE_BYTE;
+ DWORD gle;
+
+ *pfd = -1;
+
+ for (;;) {
+ hPipe = CreateFileW(wpath, GENERIC_READ | GENERIC_WRITE,
+ 0, NULL, OPEN_EXISTING, 0, NULL);
+ if (hPipe != INVALID_HANDLE_VALUE)
+ break;
+
+ gle = GetLastError();
+
+ switch (gle) {
+ case ERROR_FILE_NOT_FOUND:
+ if (!options->wait_if_not_found)
+ return IPC_STATE__PATH_NOT_FOUND;
+ if (!timeout_ms)
+ return IPC_STATE__PATH_NOT_FOUND;
+
+ step_ms = (timeout_ms < WAIT_STEP_MS) ?
+ timeout_ms : WAIT_STEP_MS;
+ sleep_millisec(step_ms);
+
+ timeout_ms -= step_ms;
+ break; /* try again */
+
+ case ERROR_PIPE_BUSY:
+ if (!options->wait_if_busy)
+ return IPC_STATE__NOT_LISTENING;
+ if (!timeout_ms)
+ return IPC_STATE__NOT_LISTENING;
+
+ t_start_ms = (DWORD)(getnanotime() / 1000000);
+
+ if (!WaitNamedPipeW(wpath, timeout_ms)) {
+ if (GetLastError() == ERROR_SEM_TIMEOUT)
+ return IPC_STATE__NOT_LISTENING;
+
+ return IPC_STATE__OTHER_ERROR;
+ }
+
+ /*
+ * A pipe server instance became available.
+ * Race other client processes to connect to
+ * it.
+ *
+ * But first decrement our overall timeout so
+ * that we don't starve if we keep losing the
+ * race. But also guard against special
+ * NPMWAIT_ values (0 and -1).
+ */
+ t_waited_ms = (DWORD)(getnanotime() / 1000000) - t_start_ms;
+ if (t_waited_ms < timeout_ms)
+ timeout_ms -= t_waited_ms;
+ else
+ timeout_ms = 1;
+ break; /* try again */
+
+ default:
+ return IPC_STATE__OTHER_ERROR;
+ }
+ }
+
+ if (!SetNamedPipeHandleState(hPipe, &mode, NULL, NULL)) {
+ CloseHandle(hPipe);
+ return IPC_STATE__OTHER_ERROR;
+ }
+
+ *pfd = _open_osfhandle((intptr_t)hPipe, O_RDWR|O_BINARY);
+ if (*pfd < 0) {
+ CloseHandle(hPipe);
+ return IPC_STATE__OTHER_ERROR;
+ }
+
+ /* fd now owns hPipe */
+
+ return IPC_STATE__LISTENING;
+}
+
+/*
+ * The default connection timeout for Windows clients.
+ *
+ * This is not currently part of the ipc_ API (nor the config settings)
+ * because of differences between Windows and other platforms.
+ *
+ * This value was chosen at random.
+ */
+#define WINDOWS_CONNECTION_TIMEOUT_MS (30000)
+
+enum ipc_active_state ipc_client_try_connect(
+ const char *path,
+ const struct ipc_client_connect_options *options,
+ struct ipc_client_connection **p_connection)
+{
+ wchar_t wpath[MAX_PATH];
+ enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
+ int fd = -1;
+
+ *p_connection = NULL;
+
+ trace2_region_enter("ipc-client", "try-connect", NULL);
+ trace2_data_string("ipc-client", NULL, "try-connect/path", path);
+
+ if (initialize_pipe_name(path, wpath, ARRAY_SIZE(wpath)) < 0)
+ state = IPC_STATE__INVALID_PATH;
+ else
+ state = connect_to_server(wpath, WINDOWS_CONNECTION_TIMEOUT_MS,
+ options, &fd);
+
+ trace2_data_intmax("ipc-client", NULL, "try-connect/state",
+ (intmax_t)state);
+ trace2_region_leave("ipc-client", "try-connect", NULL);
+
+ if (state == IPC_STATE__LISTENING) {
+ (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
+ (*p_connection)->fd = fd;
+ }
+
+ return state;
+}
+
+void ipc_client_close_connection(struct ipc_client_connection *connection)
+{
+ if (!connection)
+ return;
+
+ if (connection->fd != -1)
+ close(connection->fd);
+
+ free(connection);
+}
+
+int ipc_client_send_command_to_connection(
+ struct ipc_client_connection *connection,
+ const char *message, struct strbuf *answer)
+{
+ int ret = 0;
+
+ strbuf_setlen(answer, 0);
+
+ trace2_region_enter("ipc-client", "send-command", NULL);
+
+ if (write_packetized_from_buf_no_flush(message, strlen(message),
+ connection->fd) < 0 ||
+ packet_flush_gently(connection->fd) < 0) {
+ ret = error(_("could not send IPC command"));
+ goto done;
+ }
+
+ FlushFileBuffers((HANDLE)_get_osfhandle(connection->fd));
+
+ if (read_packetized_to_strbuf(
+ connection->fd, answer,
+ PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
+ ret = error(_("could not read IPC response"));
+ goto done;
+ }
+
+done:
+ trace2_region_leave("ipc-client", "send-command", NULL);
+ return ret;
+}
+
+int ipc_client_send_command(const char *path,
+ const struct ipc_client_connect_options *options,
+ const char *message, struct strbuf *response)
+{
+ int ret = -1;
+ enum ipc_active_state state;
+ struct ipc_client_connection *connection = NULL;
+
+ state = ipc_client_try_connect(path, options, &connection);
+
+ if (state != IPC_STATE__LISTENING)
+ return ret;
+
+ ret = ipc_client_send_command_to_connection(connection, message, response);
+
+ ipc_client_close_connection(connection);
+
+ return ret;
+}
+
+/*
+ * Duplicate the given pipe handle and wrap it in a file descriptor so
+ * that we can use pkt-line on it.
+ */
+static int dup_fd_from_pipe(const HANDLE pipe)
+{
+ HANDLE process = GetCurrentProcess();
+ HANDLE handle;
+ int fd;
+
+ if (!DuplicateHandle(process, pipe, process, &handle, 0, FALSE,
+ DUPLICATE_SAME_ACCESS)) {
+ errno = err_win_to_posix(GetLastError());
+ return -1;
+ }
+
+ fd = _open_osfhandle((intptr_t)handle, O_RDWR|O_BINARY);
+ if (fd < 0) {
+ errno = err_win_to_posix(GetLastError());
+ CloseHandle(handle);
+ return -1;
+ }
+
+ /*
+ * `handle` is now owned by `fd` and will be automatically closed
+ * when the descriptor is closed.
+ */
+
+ return fd;
+}
+
+/*
+ * Magic numbers used to annotate callback instance data.
+ * These are used to help guard against accidentally passing the
+ * wrong instance data across multiple levels of callbacks (which
+ * is easy to do if there are `void*` arguments).
+ */
+enum magic {
+ MAGIC_SERVER_REPLY_DATA,
+ MAGIC_SERVER_THREAD_DATA,
+ MAGIC_SERVER_DATA,
+};
+
+struct ipc_server_reply_data {
+ enum magic magic;
+ int fd;
+ struct ipc_server_thread_data *server_thread_data;
+};
+
+struct ipc_server_thread_data {
+ enum magic magic;
+ struct ipc_server_thread_data *next_thread;
+ struct ipc_server_data *server_data;
+ pthread_t pthread_id;
+ HANDLE hPipe;
+};
+
+/*
+ * On Windows, the conceptual "ipc-server" is implemented as a pool of
+ * n idential/peer "server-thread" threads. That is, there is no
+ * hierarchy of threads; and therefore no controller thread managing
+ * the pool. Each thread has an independent handle to the named pipe,
+ * receives incoming connections, processes the client, and re-uses
+ * the pipe for the next client connection.
+ *
+ * Therefore, the "ipc-server" only needs to maintain a list of the
+ * spawned threads for eventual "join" purposes.
+ *
+ * A single "stop-event" is visible to all of the server threads to
+ * tell them to shutdown (when idle).
+ */
+struct ipc_server_data {
+ enum magic magic;
+ ipc_server_application_cb *application_cb;
+ void *application_data;
+ struct strbuf buf_path;
+ wchar_t wpath[MAX_PATH];
+
+ HANDLE hEventStopRequested;
+ struct ipc_server_thread_data *thread_list;
+ int is_stopped;
+};
+
+enum connect_result {
+ CR_CONNECTED = 0,
+ CR_CONNECT_PENDING,
+ CR_CONNECT_ERROR,
+ CR_WAIT_ERROR,
+ CR_SHUTDOWN,
+};
+
+static enum connect_result queue_overlapped_connect(
+ struct ipc_server_thread_data *server_thread_data,
+ OVERLAPPED *lpo)
+{
+ if (ConnectNamedPipe(server_thread_data->hPipe, lpo))
+ goto failed;
+
+ switch (GetLastError()) {
+ case ERROR_IO_PENDING:
+ return CR_CONNECT_PENDING;
+
+ case ERROR_PIPE_CONNECTED:
+ SetEvent(lpo->hEvent);
+ return CR_CONNECTED;
+
+ default:
+ break;
+ }
+
+failed:
+ error(_("ConnectNamedPipe failed for '%s' (%lu)"),
+ server_thread_data->server_data->buf_path.buf,
+ GetLastError());
+ return CR_CONNECT_ERROR;
+}
+
+/*
+ * Use Windows Overlapped IO to wait for a connection or for our event
+ * to be signalled.
+ */
+static enum connect_result wait_for_connection(
+ struct ipc_server_thread_data *server_thread_data,
+ OVERLAPPED *lpo)
+{
+ enum connect_result r;
+ HANDLE waitHandles[2];
+ DWORD dwWaitResult;
+
+ r = queue_overlapped_connect(server_thread_data, lpo);
+ if (r != CR_CONNECT_PENDING)
+ return r;
+
+ waitHandles[0] = server_thread_data->server_data->hEventStopRequested;
+ waitHandles[1] = lpo->hEvent;
+
+ dwWaitResult = WaitForMultipleObjects(2, waitHandles, FALSE, INFINITE);
+ switch (dwWaitResult) {
+ case WAIT_OBJECT_0 + 0:
+ return CR_SHUTDOWN;
+
+ case WAIT_OBJECT_0 + 1:
+ ResetEvent(lpo->hEvent);
+ return CR_CONNECTED;
+
+ default:
+ return CR_WAIT_ERROR;
+ }
+}
+
+/*
+ * Forward declare our reply callback function so that any compiler
+ * errors are reported when we actually define the function (in addition
+ * to any errors reported when we try to pass this callback function as
+ * a parameter in a function call). The former are easier to understand.
+ */
+static ipc_server_reply_cb do_io_reply_callback;
+
+/*
+ * Relay application's response message to the client process.
+ * (We do not flush at this point because we allow the caller
+ * to chunk data to the client thru us.)
+ */
+static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
+ const char *response, size_t response_len)
+{
+ if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
+ BUG("reply_cb called with wrong instance data");
+
+ return write_packetized_from_buf_no_flush(response, response_len,
+ reply_data->fd);
+}
+
+/*
+ * Receive the request/command from the client and pass it to the
+ * registered request-callback. The request-callback will compose
+ * a response and call our reply-callback to send it to the client.
+ *
+ * Simple-IPC only contains one round trip, so we flush and close
+ * here after the response.
+ */
+static int do_io(struct ipc_server_thread_data *server_thread_data)
+{
+ struct strbuf buf = STRBUF_INIT;
+ struct ipc_server_reply_data reply_data;
+ int ret = 0;
+
+ reply_data.magic = MAGIC_SERVER_REPLY_DATA;
+ reply_data.server_thread_data = server_thread_data;
+
+ reply_data.fd = dup_fd_from_pipe(server_thread_data->hPipe);
+ if (reply_data.fd < 0)
+ return error(_("could not create fd from pipe for '%s'"),
+ server_thread_data->server_data->buf_path.buf);
+
+ ret = read_packetized_to_strbuf(
+ reply_data.fd, &buf,
+ PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
+ if (ret >= 0) {
+ ret = server_thread_data->server_data->application_cb(
+ server_thread_data->server_data->application_data,
+ buf.buf, do_io_reply_callback, &reply_data);
+
+ packet_flush_gently(reply_data.fd);
+
+ FlushFileBuffers((HANDLE)_get_osfhandle((reply_data.fd)));
+ }
+ else {
+ /*
+ * The client probably disconnected/shutdown before it
+ * could send a well-formed message. Ignore it.
+ */
+ }
+
+ strbuf_release(&buf);
+ close(reply_data.fd);
+
+ return ret;
+}
+
+/*
+ * Handle IPC request and response with this connected client. And reset
+ * the pipe to prepare for the next client.
+ */
+static int use_connection(struct ipc_server_thread_data *server_thread_data)
+{
+ int ret;
+
+ ret = do_io(server_thread_data);
+
+ FlushFileBuffers(server_thread_data->hPipe);
+ DisconnectNamedPipe(server_thread_data->hPipe);
+
+ return ret;
+}
+
+/*
+ * Thread proc for an IPC server worker thread. It handles a series of
+ * connections from clients. It cleans and reuses the hPipe between each
+ * client.
+ */
+static void *server_thread_proc(void *_server_thread_data)
+{
+ struct ipc_server_thread_data *server_thread_data = _server_thread_data;
+ HANDLE hEventConnected = INVALID_HANDLE_VALUE;
+ OVERLAPPED oConnect;
+ enum connect_result cr;
+ int ret;
+
+ assert(server_thread_data->hPipe != INVALID_HANDLE_VALUE);
+
+ trace2_thread_start("ipc-server");
+ trace2_data_string("ipc-server", NULL, "pipe",
+ server_thread_data->server_data->buf_path.buf);
+
+ hEventConnected = CreateEventW(NULL, TRUE, FALSE, NULL);
+
+ memset(&oConnect, 0, sizeof(oConnect));
+ oConnect.hEvent = hEventConnected;
+
+ for (;;) {
+ cr = wait_for_connection(server_thread_data, &oConnect);
+
+ switch (cr) {
+ case CR_SHUTDOWN:
+ goto finished;
+
+ case CR_CONNECTED:
+ ret = use_connection(server_thread_data);
+ if (ret == SIMPLE_IPC_QUIT) {
+ ipc_server_stop_async(
+ server_thread_data->server_data);
+ goto finished;
+ }
+ if (ret > 0) {
+ /*
+ * Ignore (transient) IO errors with this
+ * client and reset for the next client.
+ */
+ }
+ break;
+
+ case CR_CONNECT_PENDING:
+ /* By construction, this should not happen. */
+ BUG("ipc-server[%s]: unexpeced CR_CONNECT_PENDING",
+ server_thread_data->server_data->buf_path.buf);
+
+ case CR_CONNECT_ERROR:
+ case CR_WAIT_ERROR:
+ /*
+ * Ignore these theoretical errors.
+ */
+ DisconnectNamedPipe(server_thread_data->hPipe);
+ break;
+
+ default:
+ BUG("unandled case after wait_for_connection");
+ }
+ }
+
+finished:
+ CloseHandle(server_thread_data->hPipe);
+ CloseHandle(hEventConnected);
+
+ trace2_thread_exit();
+ return NULL;
+}
+
+static HANDLE create_new_pipe(wchar_t *wpath, int is_first)
+{
+ HANDLE hPipe;
+ DWORD dwOpenMode, dwPipeMode;
+ LPSECURITY_ATTRIBUTES lpsa = NULL;
+
+ dwOpenMode = PIPE_ACCESS_INBOUND | PIPE_ACCESS_OUTBOUND |
+ FILE_FLAG_OVERLAPPED;
+
+ dwPipeMode = PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE | PIPE_WAIT |
+ PIPE_REJECT_REMOTE_CLIENTS;
+
+ if (is_first) {
+ dwOpenMode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
+
+ /*
+ * On Windows, the first server pipe instance gets to
+ * set the ACL / Security Attributes on the named
+ * pipe; subsequent instances inherit and cannot
+ * change them.
+ *
+ * TODO Should we allow the application layer to
+ * specify security attributes, such as `LocalService`
+ * or `LocalSystem`, when we create the named pipe?
+ * This question is probably not important when the
+ * daemon is started by a foreground user process and
+ * only needs to talk to the current user, but may be
+ * if the daemon is run via the Control Panel as a
+ * System Service.
+ */
+ }
+
+ hPipe = CreateNamedPipeW(wpath, dwOpenMode, dwPipeMode,
+ PIPE_UNLIMITED_INSTANCES, 1024, 1024, 0, lpsa);
+
+ return hPipe;
+}
+
+int ipc_server_run_async(struct ipc_server_data **returned_server_data,
+ const char *path, const struct ipc_server_opts *opts,
+ ipc_server_application_cb *application_cb,
+ void *application_data)
+{
+ struct ipc_server_data *server_data;
+ wchar_t wpath[MAX_PATH];
+ HANDLE hPipeFirst = INVALID_HANDLE_VALUE;
+ int k;
+ int ret = 0;
+ int nr_threads = opts->nr_threads;
+
+ *returned_server_data = NULL;
+
+ ret = initialize_pipe_name(path, wpath, ARRAY_SIZE(wpath));
+ if (ret < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ hPipeFirst = create_new_pipe(wpath, 1);
+ if (hPipeFirst == INVALID_HANDLE_VALUE) {
+ errno = EADDRINUSE;
+ return -2;
+ }
+
+ server_data = xcalloc(1, sizeof(*server_data));
+ server_data->magic = MAGIC_SERVER_DATA;
+ server_data->application_cb = application_cb;
+ server_data->application_data = application_data;
+ server_data->hEventStopRequested = CreateEvent(NULL, TRUE, FALSE, NULL);
+ strbuf_init(&server_data->buf_path, 0);
+ strbuf_addstr(&server_data->buf_path, path);
+ wcscpy(server_data->wpath, wpath);
+
+ if (nr_threads < 1)
+ nr_threads = 1;
+
+ for (k = 0; k < nr_threads; k++) {
+ struct ipc_server_thread_data *std;
+
+ std = xcalloc(1, sizeof(*std));
+ std->magic = MAGIC_SERVER_THREAD_DATA;
+ std->server_data = server_data;
+ std->hPipe = INVALID_HANDLE_VALUE;
+
+ std->hPipe = (k == 0)
+ ? hPipeFirst
+ : create_new_pipe(server_data->wpath, 0);
+
+ if (std->hPipe == INVALID_HANDLE_VALUE) {
+ /*
+ * If we've reached a pipe instance limit for
+ * this path, just use fewer threads.
+ */
+ free(std);
+ break;
+ }
+
+ if (pthread_create(&std->pthread_id, NULL,
+ server_thread_proc, std)) {
+ /*
+ * Likewise, if we're out of threads, just use
+ * fewer threads than requested.
+ *
+ * However, we just give up if we can't even get
+ * one thread. This should not happen.
+ */
+ if (k == 0)
+ die(_("could not start thread[0] for '%s'"),
+ path);
+
+ CloseHandle(std->hPipe);
+ free(std);
+ break;
+ }
+
+ std->next_thread = server_data->thread_list;
+ server_data->thread_list = std;
+ }
+
+ *returned_server_data = server_data;
+ return 0;
+}
+
+int ipc_server_stop_async(struct ipc_server_data *server_data)
+{
+ if (!server_data)
+ return 0;
+
+ /*
+ * Gently tell all of the ipc_server threads to shutdown.
+ * This will be seen the next time they are idle (and waiting
+ * for a connection).
+ *
+ * We DO NOT attempt to force them to drop an active connection.
+ */
+ SetEvent(server_data->hEventStopRequested);
+ return 0;
+}
+
+int ipc_server_await(struct ipc_server_data *server_data)
+{
+ DWORD dwWaitResult;
+
+ if (!server_data)
+ return 0;
+
+ dwWaitResult = WaitForSingleObject(server_data->hEventStopRequested, INFINITE);
+ if (dwWaitResult != WAIT_OBJECT_0)
+ return error(_("wait for hEvent failed for '%s'"),
+ server_data->buf_path.buf);
+
+ while (server_data->thread_list) {
+ struct ipc_server_thread_data *std = server_data->thread_list;
+
+ pthread_join(std->pthread_id, NULL);
+
+ server_data->thread_list = std->next_thread;
+ free(std);
+ }
+
+ server_data->is_stopped = 1;
+
+ return 0;
+}
+
+void ipc_server_free(struct ipc_server_data *server_data)
+{
+ if (!server_data)
+ return;
+
+ if (!server_data->is_stopped)
+ BUG("cannot free ipc-server while running for '%s'",
+ server_data->buf_path.buf);
+
+ strbuf_release(&server_data->buf_path);
+
+ if (server_data->hEventStopRequested != INVALID_HANDLE_VALUE)
+ CloseHandle(server_data->hEventStopRequested);
+
+ while (server_data->thread_list) {
+ struct ipc_server_thread_data *std = server_data->thread_list;
+
+ server_data->thread_list = std->next_thread;
+ free(std);
+ }
+
+ free(server_data);
+}
diff --git a/config.mak.uname b/config.mak.uname
index d204c20a64..cb443b4e02 100644
--- a/config.mak.uname
+++ b/config.mak.uname
@@ -424,6 +424,7 @@ ifeq ($(uname_S),Windows)
RUNTIME_PREFIX = YesPlease
HAVE_WPGMPTR = YesWeDo
NO_ST_BLOCKS_IN_STRUCT_STAT = YesPlease
+ USE_WIN32_IPC = YesPlease
USE_WIN32_MMAP = YesPlease
MMAP_PREVENTS_DELETE = UnfortunatelyYes
# USE_NED_ALLOCATOR = YesPlease
@@ -600,6 +601,7 @@ ifneq (,$(findstring MINGW,$(uname_S)))
RUNTIME_PREFIX = YesPlease
HAVE_WPGMPTR = YesWeDo
NO_ST_BLOCKS_IN_STRUCT_STAT = YesPlease
+ USE_WIN32_IPC = YesPlease
USE_WIN32_MMAP = YesPlease
MMAP_PREVENTS_DELETE = UnfortunatelyYes
USE_NED_ALLOCATOR = YesPlease
diff --git a/contrib/buildsystems/CMakeLists.txt b/contrib/buildsystems/CMakeLists.txt
index ac3dbc079a..9897fcc8ea 100644
--- a/contrib/buildsystems/CMakeLists.txt
+++ b/contrib/buildsystems/CMakeLists.txt
@@ -243,7 +243,13 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Windows")
elseif(CMAKE_SYSTEM_NAME STREQUAL "Linux")
add_compile_definitions(PROCFS_EXECUTABLE_PATH="/proc/self/exe" HAVE_DEV_TTY )
- list(APPEND compat_SOURCES unix-socket.c)
+ list(APPEND compat_SOURCES unix-socket.c unix-stream-server.c)
+endif()
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Windows")
+ list(APPEND compat_SOURCES compat/simple-ipc/ipc-shared.c compat/simple-ipc/ipc-win32.c)
+else()
+ list(APPEND compat_SOURCES compat/simple-ipc/ipc-shared.c compat/simple-ipc/ipc-unix-socket.c)
endif()
set(EXE_EXTENSION ${CMAKE_EXECUTABLE_SUFFIX})
diff --git a/convert.c b/convert.c
index 3298e4acff..45ac75f80c 100644
--- a/convert.c
+++ b/convert.c
@@ -873,9 +873,13 @@ static int apply_multi_file_filter(const char *path, const char *src, size_t len
goto done;
if (fd >= 0)
- err = write_packetized_from_fd(fd, process->in);
+ err = write_packetized_from_fd_no_flush(fd, process->in);
else
- err = write_packetized_from_buf(src, len, process->in);
+ err = write_packetized_from_buf_no_flush(src, len, process->in);
+ if (err)
+ goto done;
+
+ err = packet_flush_gently(process->in);
if (err)
goto done;
@@ -892,7 +896,8 @@ static int apply_multi_file_filter(const char *path, const char *src, size_t len
if (err)
goto done;
- err = read_packetized_to_strbuf(process->out, &nbuf) < 0;
+ err = read_packetized_to_strbuf(process->out, &nbuf,
+ PACKET_READ_GENTLE_ON_EOF) < 0;
if (err)
goto done;
diff --git a/pkt-line.c b/pkt-line.c
index d633005ef7..0194137528 100644
--- a/pkt-line.c
+++ b/pkt-line.c
@@ -196,17 +196,26 @@ int packet_write_fmt_gently(int fd, const char *fmt, ...)
static int packet_write_gently(const int fd_out, const char *buf, size_t size)
{
- static char packet_write_buffer[LARGE_PACKET_MAX];
+ char header[4];
size_t packet_size;
- if (size > sizeof(packet_write_buffer) - 4)
+ if (size > LARGE_PACKET_DATA_MAX)
return error(_("packet write failed - data exceeds max packet size"));
packet_trace(buf, size, 1);
packet_size = size + 4;
- set_packet_header(packet_write_buffer, packet_size);
- memcpy(packet_write_buffer + 4, buf, size);
- if (write_in_full(fd_out, packet_write_buffer, packet_size) < 0)
+
+ set_packet_header(header, packet_size);
+
+ /*
+ * Write the header and the buffer in 2 parts so that we do
+ * not need to allocate a buffer or rely on a static buffer.
+ * This also avoids putting a large buffer on the stack which
+ * might have multi-threading issues.
+ */
+
+ if (write_in_full(fd_out, header, 4) < 0 ||
+ write_in_full(fd_out, buf, size) < 0)
return error(_("packet write failed"));
return 0;
}
@@ -242,26 +251,27 @@ void packet_buf_write_len(struct strbuf *buf, const char *data, size_t len)
packet_trace(data, len, 1);
}
-int write_packetized_from_fd(int fd_in, int fd_out)
+int write_packetized_from_fd_no_flush(int fd_in, int fd_out)
{
- static char buf[LARGE_PACKET_DATA_MAX];
+ char *buf = xmalloc(LARGE_PACKET_DATA_MAX);
int err = 0;
ssize_t bytes_to_write;
while (!err) {
- bytes_to_write = xread(fd_in, buf, sizeof(buf));
- if (bytes_to_write < 0)
+ bytes_to_write = xread(fd_in, buf, LARGE_PACKET_DATA_MAX);
+ if (bytes_to_write < 0) {
+ free(buf);
return COPY_READ_ERROR;
+ }
if (bytes_to_write == 0)
break;
err = packet_write_gently(fd_out, buf, bytes_to_write);
}
- if (!err)
- err = packet_flush_gently(fd_out);
+ free(buf);
return err;
}
-int write_packetized_from_buf(const char *src_in, size_t len, int fd_out)
+int write_packetized_from_buf_no_flush(const char *src_in, size_t len, int fd_out)
{
int err = 0;
size_t bytes_written = 0;
@@ -277,8 +287,6 @@ int write_packetized_from_buf(const char *src_in, size_t len, int fd_out)
err = packet_write_gently(fd_out, src_in + bytes_written, bytes_to_write);
bytes_written += bytes_to_write;
}
- if (!err)
- err = packet_flush_gently(fd_out);
return err;
}
@@ -298,8 +306,11 @@ static int get_packet_data(int fd, char **src_buf, size_t *src_size,
*src_size -= ret;
} else {
ret = read_in_full(fd, dst, size);
- if (ret < 0)
+ if (ret < 0) {
+ if (options & PACKET_READ_GENTLE_ON_READ_ERROR)
+ return error_errno(_("read error"));
die_errno(_("read error"));
+ }
}
/* And complain if we didn't get enough bytes to satisfy the read. */
@@ -307,6 +318,8 @@ static int get_packet_data(int fd, char **src_buf, size_t *src_size,
if (options & PACKET_READ_GENTLE_ON_EOF)
return -1;
+ if (options & PACKET_READ_GENTLE_ON_READ_ERROR)
+ return error(_("the remote end hung up unexpectedly"));
die(_("the remote end hung up unexpectedly"));
}
@@ -335,6 +348,9 @@ enum packet_read_status packet_read_with_status(int fd, char **src_buffer,
len = packet_length(linelen);
if (len < 0) {
+ if (options & PACKET_READ_GENTLE_ON_READ_ERROR)
+ return error(_("protocol error: bad line length "
+ "character: %.4s"), linelen);
die(_("protocol error: bad line length character: %.4s"), linelen);
} else if (!len) {
packet_trace("0000", 4, 0);
@@ -349,12 +365,19 @@ enum packet_read_status packet_read_with_status(int fd, char **src_buffer,
*pktlen = 0;
return PACKET_READ_RESPONSE_END;
} else if (len < 4) {
+ if (options & PACKET_READ_GENTLE_ON_READ_ERROR)
+ return error(_("protocol error: bad line length %d"),
+ len);
die(_("protocol error: bad line length %d"), len);
}
len -= 4;
- if ((unsigned)len >= size)
+ if ((unsigned)len >= size) {
+ if (options & PACKET_READ_GENTLE_ON_READ_ERROR)
+ return error(_("protocol error: bad line length %d"),
+ len);
die(_("protocol error: bad line length %d"), len);
+ }
if (get_packet_data(fd, src_buffer, src_len, buffer, len, options) < 0) {
*pktlen = -1;
@@ -421,7 +444,7 @@ char *packet_read_line_buf(char **src, size_t *src_len, int *dst_len)
return packet_read_line_generic(-1, src, src_len, dst_len);
}
-ssize_t read_packetized_to_strbuf(int fd_in, struct strbuf *sb_out)
+ssize_t read_packetized_to_strbuf(int fd_in, struct strbuf *sb_out, int options)
{
int packet_len;
@@ -437,7 +460,7 @@ ssize_t read_packetized_to_strbuf(int fd_in, struct strbuf *sb_out)
* that there is already room for the extra byte.
*/
sb_out->buf + sb_out->len, LARGE_PACKET_DATA_MAX+1,
- PACKET_READ_GENTLE_ON_EOF);
+ options);
if (packet_len <= 0)
break;
sb_out->len += packet_len;
diff --git a/pkt-line.h b/pkt-line.h
index 8c90daa59e..5af5f45687 100644
--- a/pkt-line.h
+++ b/pkt-line.h
@@ -32,8 +32,8 @@ void packet_buf_write(struct strbuf *buf, const char *fmt, ...) __attribute__((f
void packet_buf_write_len(struct strbuf *buf, const char *data, size_t len);
int packet_flush_gently(int fd);
int packet_write_fmt_gently(int fd, const char *fmt, ...) __attribute__((format (printf, 2, 3)));
-int write_packetized_from_fd(int fd_in, int fd_out);
-int write_packetized_from_buf(const char *src_in, size_t len, int fd_out);
+int write_packetized_from_fd_no_flush(int fd_in, int fd_out);
+int write_packetized_from_buf_no_flush(const char *src_in, size_t len, int fd_out);
/*
* Read a packetized line into the buffer, which must be at least size bytes
@@ -68,10 +68,15 @@ int write_packetized_from_buf(const char *src_in, size_t len, int fd_out);
*
* If options contains PACKET_READ_DIE_ON_ERR_PACKET, it dies when it sees an
* ERR packet.
+ *
+ * If options contains PACKET_READ_GENTLE_ON_READ_ERROR, we will not die
+ * on read errors, but instead return -1. However, we may still die on an
+ * ERR packet (if requested).
*/
-#define PACKET_READ_GENTLE_ON_EOF (1u<<0)
-#define PACKET_READ_CHOMP_NEWLINE (1u<<1)
-#define PACKET_READ_DIE_ON_ERR_PACKET (1u<<2)
+#define PACKET_READ_GENTLE_ON_EOF (1u<<0)
+#define PACKET_READ_CHOMP_NEWLINE (1u<<1)
+#define PACKET_READ_DIE_ON_ERR_PACKET (1u<<2)
+#define PACKET_READ_GENTLE_ON_READ_ERROR (1u<<3)
int packet_read(int fd, char **src_buffer, size_t *src_len, char
*buffer, unsigned size, int options);
@@ -131,7 +136,7 @@ char *packet_read_line_buf(char **src_buf, size_t *src_len, int *size);
/*
* Reads a stream of variable sized packets until a flush packet is detected.
*/
-ssize_t read_packetized_to_strbuf(int fd_in, struct strbuf *sb_out);
+ssize_t read_packetized_to_strbuf(int fd_in, struct strbuf *sb_out, int options);
/*
* Receive multiplexed output stream over git native protocol.
diff --git a/simple-ipc.h b/simple-ipc.h
new file mode 100644
index 0000000000..dc3606e30b
--- /dev/null
+++ b/simple-ipc.h
@@ -0,0 +1,239 @@
+#ifndef GIT_SIMPLE_IPC_H
+#define GIT_SIMPLE_IPC_H
+
+/*
+ * See Documentation/technical/api-simple-ipc.txt
+ */
+
+#if defined(GIT_WINDOWS_NATIVE) || !defined(NO_UNIX_SOCKETS)
+#define SUPPORTS_SIMPLE_IPC
+#endif
+
+#ifdef SUPPORTS_SIMPLE_IPC
+#include "pkt-line.h"
+
+/*
+ * Simple IPC Client Side API.
+ */
+
+enum ipc_active_state {
+ /*
+ * The pipe/socket exists and the daemon is waiting for connections.
+ */
+ IPC_STATE__LISTENING = 0,
+
+ /*
+ * The pipe/socket exists, but the daemon is not listening.
+ * Perhaps it is very busy.
+ * Perhaps the daemon died without deleting the path.
+ * Perhaps it is shutting down and draining existing clients.
+ * Perhaps it is dead, but other clients are lingering and
+ * still holding a reference to the pathname.
+ */
+ IPC_STATE__NOT_LISTENING,
+
+ /*
+ * The requested pathname is bogus and no amount of retries
+ * will fix that.
+ */
+ IPC_STATE__INVALID_PATH,
+
+ /*
+ * The requested pathname is not found. This usually means
+ * that there is no daemon present.
+ */
+ IPC_STATE__PATH_NOT_FOUND,
+
+ IPC_STATE__OTHER_ERROR,
+};
+
+struct ipc_client_connect_options {
+ /*
+ * Spin under timeout if the server is running but can't
+ * accept our connection yet. This should always be set
+ * unless you just want to poke the server and see if it
+ * is alive.
+ */
+ unsigned int wait_if_busy:1;
+
+ /*
+ * Spin under timeout if the pipe/socket is not yet present
+ * on the file system. This is useful if we just started
+ * the service and need to wait for it to become ready.
+ */
+ unsigned int wait_if_not_found:1;
+
+ /*
+ * Disallow chdir() when creating a Unix domain socket.
+ */
+ unsigned int uds_disallow_chdir:1;
+};
+
+#define IPC_CLIENT_CONNECT_OPTIONS_INIT { \
+ .wait_if_busy = 0, \
+ .wait_if_not_found = 0, \
+ .uds_disallow_chdir = 0, \
+}
+
+/*
+ * Determine if a server is listening on this named pipe or socket using
+ * platform-specific logic. This might just probe the filesystem or it
+ * might make a trivial connection to the server using this pathname.
+ */
+enum ipc_active_state ipc_get_active_state(const char *path);
+
+struct ipc_client_connection {
+ int fd;
+};
+
+/*
+ * Try to connect to the daemon on the named pipe or socket.
+ *
+ * Returns IPC_STATE__LISTENING and a connection handle.
+ *
+ * Otherwise, returns info to help decide whether to retry or to
+ * spawn/respawn the server.
+ */
+enum ipc_active_state ipc_client_try_connect(
+ const char *path,
+ const struct ipc_client_connect_options *options,
+ struct ipc_client_connection **p_connection);
+
+void ipc_client_close_connection(struct ipc_client_connection *connection);
+
+/*
+ * Used by the client to synchronously send and receive a message with
+ * the server on the provided client connection.
+ *
+ * Returns 0 when successful.
+ *
+ * Calls error() and returns non-zero otherwise.
+ */
+int ipc_client_send_command_to_connection(
+ struct ipc_client_connection *connection,
+ const char *message, struct strbuf *answer);
+
+/*
+ * Used by the client to synchronously connect and send and receive a
+ * message to the server listening at the given path.
+ *
+ * Returns 0 when successful.
+ *
+ * Calls error() and returns non-zero otherwise.
+ */
+int ipc_client_send_command(const char *path,
+ const struct ipc_client_connect_options *options,
+ const char *message, struct strbuf *answer);
+
+/*
+ * Simple IPC Server Side API.
+ */
+
+struct ipc_server_reply_data;
+
+typedef int (ipc_server_reply_cb)(struct ipc_server_reply_data *,
+ const char *response,
+ size_t response_len);
+
+/*
+ * Prototype for an application-supplied callback to process incoming
+ * client IPC messages and compose a reply. The `application_cb` should
+ * use the provided `reply_cb` and `reply_data` to send an IPC response
+ * back to the client. The `reply_cb` callback can be called multiple
+ * times for chunking purposes. A reply message is optional and may be
+ * omitted if not necessary for the application.
+ *
+ * The return value from the application callback is ignored.
+ * The value `SIMPLE_IPC_QUIT` can be used to shutdown the server.
+ */
+typedef int (ipc_server_application_cb)(void *application_data,
+ const char *request,
+ ipc_server_reply_cb *reply_cb,
+ struct ipc_server_reply_data *reply_data);
+
+#define SIMPLE_IPC_QUIT -2
+
+/*
+ * Opaque instance data to represent an IPC server instance.
+ */
+struct ipc_server_data;
+
+/*
+ * Control parameters for the IPC server instance.
+ * Use this to hide platform-specific settings.
+ */
+struct ipc_server_opts
+{
+ int nr_threads;
+
+ /*
+ * Disallow chdir() when creating a Unix domain socket.
+ */
+ unsigned int uds_disallow_chdir:1;
+};
+
+/*
+ * Start an IPC server instance in one or more background threads
+ * and return a handle to the pool.
+ *
+ * Returns 0 if the asynchronous server pool was started successfully.
+ * Returns -1 if not.
+ * Returns -2 if we could not startup because another server is using
+ * the socket or named pipe.
+ *
+ * When a client IPC message is received, the `application_cb` will be
+ * called (possibly on a random thread) to handle the message and
+ * optionally compose a reply message.
+ */
+int ipc_server_run_async(struct ipc_server_data **returned_server_data,
+ const char *path, const struct ipc_server_opts *opts,
+ ipc_server_application_cb *application_cb,
+ void *application_data);
+
+/*
+ * Gently signal the IPC server pool to shutdown. No new client
+ * connections will be accepted, but existing connections will be
+ * allowed to complete.
+ */
+int ipc_server_stop_async(struct ipc_server_data *server_data);
+
+/*
+ * Block the calling thread until all threads in the IPC server pool
+ * have completed and been joined.
+ */
+int ipc_server_await(struct ipc_server_data *server_data);
+
+/*
+ * Close and free all resource handles associated with the IPC server
+ * pool.
+ */
+void ipc_server_free(struct ipc_server_data *server_data);
+
+/*
+ * Run an IPC server instance and block the calling thread of the
+ * current process. It does not return until the IPC server has
+ * either shutdown or had an unrecoverable error.
+ *
+ * The IPC server handles incoming IPC messages from client processes
+ * and may use one or more background threads as necessary.
+ *
+ * Returns 0 after the server has completed successfully.
+ * Returns -1 if the server cannot be started.
+ * Returns -2 if we could not startup because another server is using
+ * the socket or named pipe.
+ *
+ * When a client IPC message is received, the `application_cb` will be
+ * called (possibly on a random thread) to handle the message and
+ * optionally compose a reply message.
+ *
+ * Note that `ipc_server_run()` is a synchronous wrapper around the
+ * above asynchronous routines. It effectively hides all of the
+ * server state and thread details from the caller and presents a
+ * simple synchronous interface.
+ */
+int ipc_server_run(const char *path, const struct ipc_server_opts *opts,
+ ipc_server_application_cb *application_cb,
+ void *application_data);
+
+#endif /* SUPPORTS_SIMPLE_IPC */
+#endif /* GIT_SIMPLE_IPC_H */
diff --git a/t/helper/test-simple-ipc.c b/t/helper/test-simple-ipc.c
new file mode 100644
index 0000000000..42040ef81b
--- /dev/null
+++ b/t/helper/test-simple-ipc.c
@@ -0,0 +1,787 @@
+/*
+ * test-simple-ipc.c: verify that the Inter-Process Communication works.
+ */
+
+#include "test-tool.h"
+#include "cache.h"
+#include "strbuf.h"
+#include "simple-ipc.h"
+#include "parse-options.h"
+#include "thread-utils.h"
+#include "strvec.h"
+
+#ifndef SUPPORTS_SIMPLE_IPC
+int cmd__simple_ipc(int argc, const char **argv)
+{
+ die("simple IPC not available on this platform");
+}
+#else
+
+/*
+ * The test daemon defines an "application callback" that supports a
+ * series of commands (see `test_app_cb()`).
+ *
+ * Unknown commands are caught here and we send an error message back
+ * to the client process.
+ */
+static int app__unhandled_command(const char *command,
+ ipc_server_reply_cb *reply_cb,
+ struct ipc_server_reply_data *reply_data)
+{
+ struct strbuf buf = STRBUF_INIT;
+ int ret;
+
+ strbuf_addf(&buf, "unhandled command: %s", command);
+ ret = reply_cb(reply_data, buf.buf, buf.len);
+ strbuf_release(&buf);
+
+ return ret;
+}
+
+/*
+ * Reply with a single very large buffer. This is to ensure that
+ * long response are properly handled -- whether the chunking occurs
+ * in the kernel or in the (probably pkt-line) layer.
+ */
+#define BIG_ROWS (10000)
+static int app__big_command(ipc_server_reply_cb *reply_cb,
+ struct ipc_server_reply_data *reply_data)
+{
+ struct strbuf buf = STRBUF_INIT;
+ int row;
+ int ret;
+
+ for (row = 0; row < BIG_ROWS; row++)
+ strbuf_addf(&buf, "big: %.75d\n", row);
+
+ ret = reply_cb(reply_data, buf.buf, buf.len);
+ strbuf_release(&buf);
+
+ return ret;
+}
+
+/*
+ * Reply with a series of lines. This is to ensure that we can incrementally
+ * compute the response and chunk it to the client.
+ */
+#define CHUNK_ROWS (10000)
+static int app__chunk_command(ipc_server_reply_cb *reply_cb,
+ struct ipc_server_reply_data *reply_data)
+{
+ struct strbuf buf = STRBUF_INIT;
+ int row;
+ int ret;
+
+ for (row = 0; row < CHUNK_ROWS; row++) {
+ strbuf_setlen(&buf, 0);
+ strbuf_addf(&buf, "big: %.75d\n", row);
+ ret = reply_cb(reply_data, buf.buf, buf.len);
+ }
+
+ strbuf_release(&buf);
+
+ return ret;
+}
+
+/*
+ * Slowly reply with a series of lines. This is to model an expensive to
+ * compute chunked response (which might happen if this callback is running
+ * in a thread and is fighting for a lock with other threads).
+ */
+#define SLOW_ROWS (1000)
+#define SLOW_DELAY_MS (10)
+static int app__slow_command(ipc_server_reply_cb *reply_cb,
+ struct ipc_server_reply_data *reply_data)
+{
+ struct strbuf buf = STRBUF_INIT;
+ int row;
+ int ret;
+
+ for (row = 0; row < SLOW_ROWS; row++) {
+ strbuf_setlen(&buf, 0);
+ strbuf_addf(&buf, "big: %.75d\n", row);
+ ret = reply_cb(reply_data, buf.buf, buf.len);
+ sleep_millisec(SLOW_DELAY_MS);
+ }
+
+ strbuf_release(&buf);
+
+ return ret;
+}
+
+/*
+ * The client sent a command followed by a (possibly very) large buffer.
+ */
+static int app__sendbytes_command(const char *received,
+ ipc_server_reply_cb *reply_cb,
+ struct ipc_server_reply_data *reply_data)
+{
+ struct strbuf buf_resp = STRBUF_INIT;
+ const char *p = "?";
+ int len_ballast = 0;
+ int k;
+ int errs = 0;
+ int ret;
+
+ if (skip_prefix(received, "sendbytes ", &p))
+ len_ballast = strlen(p);
+
+ /*
+ * Verify that the ballast is n copies of a single letter.
+ * And that the multi-threaded IO layer didn't cross the streams.
+ */
+ for (k = 1; k < len_ballast; k++)
+ if (p[k] != p[0])
+ errs++;
+
+ if (errs)
+ strbuf_addf(&buf_resp, "errs:%d\n", errs);
+ else
+ strbuf_addf(&buf_resp, "rcvd:%c%08d\n", p[0], len_ballast);
+
+ ret = reply_cb(reply_data, buf_resp.buf, buf_resp.len);
+
+ strbuf_release(&buf_resp);
+
+ return ret;
+}
+
+/*
+ * An arbitrary fixed address to verify that the application instance
+ * data is handled properly.
+ */
+static int my_app_data = 42;
+
+static ipc_server_application_cb test_app_cb;
+
+/*
+ * This is the "application callback" that sits on top of the
+ * "ipc-server". It completely defines the set of commands supported
+ * by this application.
+ */
+static int test_app_cb(void *application_data,
+ const char *command,
+ ipc_server_reply_cb *reply_cb,
+ struct ipc_server_reply_data *reply_data)
+{
+ /*
+ * Verify that we received the application-data that we passed
+ * when we started the ipc-server. (We have several layers of
+ * callbacks calling callbacks and it's easy to get things mixed
+ * up (especially when some are "void*").)
+ */
+ if (application_data != (void*)&my_app_data)
+ BUG("application_cb: application_data pointer wrong");
+
+ if (!strcmp(command, "quit")) {
+ /*
+ * The client sent a "quit" command. This is an async
+ * request for the server to shutdown.
+ *
+ * We DO NOT send the client a response message
+ * (because we have nothing to say and the other
+ * server threads have not yet stopped).
+ *
+ * Tell the ipc-server layer to start shutting down.
+ * This includes: stop listening for new connections
+ * on the socket/pipe and telling all worker threads
+ * to finish/drain their outgoing responses to other
+ * clients.
+ *
+ * This DOES NOT force an immediate sync shutdown.
+ */
+ return SIMPLE_IPC_QUIT;
+ }
+
+ if (!strcmp(command, "ping")) {
+ const char *answer = "pong";
+ return reply_cb(reply_data, answer, strlen(answer));
+ }
+
+ if (!strcmp(command, "big"))
+ return app__big_command(reply_cb, reply_data);
+
+ if (!strcmp(command, "chunk"))
+ return app__chunk_command(reply_cb, reply_data);
+
+ if (!strcmp(command, "slow"))
+ return app__slow_command(reply_cb, reply_data);
+
+ if (starts_with(command, "sendbytes "))
+ return app__sendbytes_command(command, reply_cb, reply_data);
+
+ return app__unhandled_command(command, reply_cb, reply_data);
+}
+
+struct cl_args
+{
+ const char *subcommand;
+ const char *path;
+ const char *token;
+
+ int nr_threads;
+ int max_wait_sec;
+ int bytecount;
+ int batchsize;
+
+ char bytevalue;
+};
+
+static struct cl_args cl_args = {
+ .subcommand = NULL,
+ .path = "ipc-test",
+ .token = NULL,
+
+ .nr_threads = 5,
+ .max_wait_sec = 60,
+ .bytecount = 1024,
+ .batchsize = 10,
+
+ .bytevalue = 'x',
+};
+
+/*
+ * This process will run as a simple-ipc server and listen for IPC commands
+ * from client processes.
+ */
+static int daemon__run_server(void)
+{
+ int ret;
+
+ struct ipc_server_opts opts = {
+ .nr_threads = cl_args.nr_threads,
+ };
+
+ /*
+ * Synchronously run the ipc-server. We don't need any application
+ * instance data, so pass an arbitrary pointer (that we'll later
+ * verify made the round trip).
+ */
+ ret = ipc_server_run(cl_args.path, &opts, test_app_cb, (void*)&my_app_data);
+ if (ret == -2)
+ error(_("socket/pipe already in use: '%s'"), cl_args.path);
+ else if (ret == -1)
+ error_errno(_("could not start server on: '%s'"), cl_args.path);
+
+ return ret;
+}
+
+#ifndef GIT_WINDOWS_NATIVE
+/*
+ * This is adapted from `daemonize()`. Use `fork()` to directly create and
+ * run the daemon in a child process.
+ */
+static int spawn_server(pid_t *pid)
+{
+ struct ipc_server_opts opts = {
+ .nr_threads = cl_args.nr_threads,
+ };
+
+ *pid = fork();
+
+ switch (*pid) {
+ case 0:
+ if (setsid() == -1)
+ error_errno(_("setsid failed"));
+ close(0);
+ close(1);
+ close(2);
+ sanitize_stdfds();
+
+ return ipc_server_run(cl_args.path, &opts, test_app_cb,
+ (void*)&my_app_data);
+
+ case -1:
+ return error_errno(_("could not spawn daemon in the background"));
+
+ default:
+ return 0;
+ }
+}
+#else
+/*
+ * Conceptually like `daemonize()` but different because Windows does not
+ * have `fork(2)`. Spawn a normal Windows child process but without the
+ * limitations of `start_command()` and `finish_command()`.
+ */
+static int spawn_server(pid_t *pid)
+{
+ char test_tool_exe[MAX_PATH];
+ struct strvec args = STRVEC_INIT;
+ int in, out;
+
+ GetModuleFileNameA(NULL, test_tool_exe, MAX_PATH);
+
+ in = open("/dev/null", O_RDONLY);
+ out = open("/dev/null", O_WRONLY);
+
+ strvec_push(&args, test_tool_exe);
+ strvec_push(&args, "simple-ipc");
+ strvec_push(&args, "run-daemon");
+ strvec_pushf(&args, "--name=%s", cl_args.path);
+ strvec_pushf(&args, "--threads=%d", cl_args.nr_threads);
+
+ *pid = mingw_spawnvpe(args.v[0], args.v, NULL, NULL, in, out, out);
+ close(in);
+ close(out);
+
+ strvec_clear(&args);
+
+ if (*pid < 0)
+ return error(_("could not spawn daemon in the background"));
+
+ return 0;
+}
+#endif
+
+/*
+ * This is adapted from `wait_or_whine()`. Watch the child process and
+ * let it get started and begin listening for requests on the socket
+ * before reporting our success.
+ */
+static int wait_for_server_startup(pid_t pid_child)
+{
+ int status;
+ pid_t pid_seen;
+ enum ipc_active_state s;
+ time_t time_limit, now;
+
+ time(&time_limit);
+ time_limit += cl_args.max_wait_sec;
+
+ for (;;) {
+ pid_seen = waitpid(pid_child, &status, WNOHANG);
+
+ if (pid_seen == -1)
+ return error_errno(_("waitpid failed"));
+
+ else if (pid_seen == 0) {
+ /*
+ * The child is still running (this should be
+ * the normal case). Try to connect to it on
+ * the socket and see if it is ready for
+ * business.
+ *
+ * If there is another daemon already running,
+ * our child will fail to start (possibly
+ * after a timeout on the lock), but we don't
+ * care (who responds) if the socket is live.
+ */
+ s = ipc_get_active_state(cl_args.path);
+ if (s == IPC_STATE__LISTENING)
+ return 0;
+
+ time(&now);
+ if (now > time_limit)
+ return error(_("daemon not online yet"));
+
+ continue;
+ }
+
+ else if (pid_seen == pid_child) {
+ /*
+ * The new child daemon process shutdown while
+ * it was starting up, so it is not listening
+ * on the socket.
+ *
+ * Try to ping the socket in the odd chance
+ * that another daemon started (or was already
+ * running) while our child was starting.
+ *
+ * Again, we don't care who services the socket.
+ */
+ s = ipc_get_active_state(cl_args.path);
+ if (s == IPC_STATE__LISTENING)
+ return 0;
+
+ /*
+ * We don't care about the WEXITSTATUS() nor
+ * any of the WIF*(status) values because
+ * `cmd__simple_ipc()` does the `!!result`
+ * trick on all function return values.
+ *
+ * So it is sufficient to just report the
+ * early shutdown as an error.
+ */
+ return error(_("daemon failed to start"));
+ }
+
+ else
+ return error(_("waitpid is confused"));
+ }
+}
+
+/*
+ * This process will start a simple-ipc server in a background process and
+ * wait for it to become ready. This is like `daemonize()` but gives us
+ * more control and better error reporting (and makes it easier to write
+ * unit tests).
+ */
+static int daemon__start_server(void)
+{
+ pid_t pid_child;
+ int ret;
+
+ /*
+ * Run the actual daemon in a background process.
+ */
+ ret = spawn_server(&pid_child);
+ if (pid_child <= 0)
+ return ret;
+
+ /*
+ * Let the parent wait for the child process to get started
+ * and begin listening for requests on the socket.
+ */
+ ret = wait_for_server_startup(pid_child);
+
+ return ret;
+}
+
+/*
+ * This process will run a quick probe to see if a simple-ipc server
+ * is active on this path.
+ *
+ * Returns 0 if the server is alive.
+ */
+static int client__probe_server(void)
+{
+ enum ipc_active_state s;
+
+ s = ipc_get_active_state(cl_args.path);
+ switch (s) {
+ case IPC_STATE__LISTENING:
+ return 0;
+
+ case IPC_STATE__NOT_LISTENING:
+ return error("no server listening at '%s'", cl_args.path);
+
+ case IPC_STATE__PATH_NOT_FOUND:
+ return error("path not found '%s'", cl_args.path);
+
+ case IPC_STATE__INVALID_PATH:
+ return error("invalid pipe/socket name '%s'", cl_args.path);
+
+ case IPC_STATE__OTHER_ERROR:
+ default:
+ return error("other error for '%s'", cl_args.path);
+ }
+}
+
+/*
+ * Send an IPC command token to an already-running server daemon and
+ * print the response.
+ *
+ * This is a simple 1 word command/token that `test_app_cb()` (in the
+ * daemon process) will understand.
+ */
+static int client__send_ipc(void)
+{
+ const char *command = "(no-command)";
+ struct strbuf buf = STRBUF_INIT;
+ struct ipc_client_connect_options options
+ = IPC_CLIENT_CONNECT_OPTIONS_INIT;
+
+ if (cl_args.token && *cl_args.token)
+ command = cl_args.token;
+
+ options.wait_if_busy = 1;
+ options.wait_if_not_found = 0;
+
+ if (!ipc_client_send_command(cl_args.path, &options, command, &buf)) {
+ if (buf.len) {
+ printf("%s\n", buf.buf);
+ fflush(stdout);
+ }
+ strbuf_release(&buf);
+
+ return 0;
+ }
+
+ return error("failed to send '%s' to '%s'", command, cl_args.path);
+}
+
+/*
+ * Send an IPC command to an already-running server and ask it to
+ * shutdown. "send quit" is an async request and queues a shutdown
+ * event in the server, so we spin and wait here for it to actually
+ * shutdown to make the unit tests a little easier to write.
+ */
+static int client__stop_server(void)
+{
+ int ret;
+ time_t time_limit, now;
+ enum ipc_active_state s;
+
+ time(&time_limit);
+ time_limit += cl_args.max_wait_sec;
+
+ cl_args.token = "quit";
+
+ ret = client__send_ipc();
+ if (ret)
+ return ret;
+
+ for (;;) {
+ sleep_millisec(100);
+
+ s = ipc_get_active_state(cl_args.path);
+
+ if (s != IPC_STATE__LISTENING) {
+ /*
+ * The socket/pipe is gone and/or has stopped
+ * responding. Lets assume that the daemon
+ * process has exited too.
+ */
+ return 0;
+ }
+
+ time(&now);
+ if (now > time_limit)
+ return error(_("daemon has not shutdown yet"));
+ }
+}
+
+/*
+ * Send an IPC command followed by ballast to confirm that a large
+ * message can be sent and that the kernel or pkt-line layers will
+ * properly chunk it and that the daemon receives the entire message.
+ */
+static int do_sendbytes(int bytecount, char byte, const char *path,
+ const struct ipc_client_connect_options *options)
+{
+ struct strbuf buf_send = STRBUF_INIT;
+ struct strbuf buf_resp = STRBUF_INIT;
+
+ strbuf_addstr(&buf_send, "sendbytes ");
+ strbuf_addchars(&buf_send, byte, bytecount);
+
+ if (!ipc_client_send_command(path, options, buf_send.buf, &buf_resp)) {
+ strbuf_rtrim(&buf_resp);
+ printf("sent:%c%08d %s\n", byte, bytecount, buf_resp.buf);
+ fflush(stdout);
+ strbuf_release(&buf_send);
+ strbuf_release(&buf_resp);
+
+ return 0;
+ }
+
+ return error("client failed to sendbytes(%d, '%c') to '%s'",
+ bytecount, byte, path);
+}
+
+/*
+ * Send an IPC command with ballast to an already-running server daemon.
+ */
+static int client__sendbytes(void)
+{
+ struct ipc_client_connect_options options
+ = IPC_CLIENT_CONNECT_OPTIONS_INIT;
+
+ options.wait_if_busy = 1;
+ options.wait_if_not_found = 0;
+ options.uds_disallow_chdir = 0;
+
+ return do_sendbytes(cl_args.bytecount, cl_args.bytevalue, cl_args.path,
+ &options);
+}
+
+struct multiple_thread_data {
+ pthread_t pthread_id;
+ struct multiple_thread_data *next;
+ const char *path;
+ int bytecount;
+ int batchsize;
+ int sum_errors;
+ int sum_good;
+ char letter;
+};
+
+static void *multiple_thread_proc(void *_multiple_thread_data)
+{
+ struct multiple_thread_data *d = _multiple_thread_data;
+ int k;
+ struct ipc_client_connect_options options
+ = IPC_CLIENT_CONNECT_OPTIONS_INIT;
+
+ options.wait_if_busy = 1;
+ options.wait_if_not_found = 0;
+ /*
+ * A multi-threaded client should not be randomly calling chdir().
+ * The test will pass without this restriction because the test is
+ * not otherwise accessing the filesystem, but it makes us honest.
+ */
+ options.uds_disallow_chdir = 1;
+
+ trace2_thread_start("multiple");
+
+ for (k = 0; k < d->batchsize; k++) {
+ if (do_sendbytes(d->bytecount + k, d->letter, d->path, &options))
+ d->sum_errors++;
+ else
+ d->sum_good++;
+ }
+
+ trace2_thread_exit();
+ return NULL;
+}
+
+/*
+ * Start a client-side thread pool. Each thread sends a series of
+ * IPC requests. Each request is on a new connection to the server.
+ */
+static int client__multiple(void)
+{
+ struct multiple_thread_data *list = NULL;
+ int k;
+ int sum_join_errors = 0;
+ int sum_thread_errors = 0;
+ int sum_good = 0;
+
+ for (k = 0; k < cl_args.nr_threads; k++) {
+ struct multiple_thread_data *d = xcalloc(1, sizeof(*d));
+ d->next = list;
+ d->path = cl_args.path;
+ d->bytecount = cl_args.bytecount + cl_args.batchsize*(k/26);
+ d->batchsize = cl_args.batchsize;
+ d->sum_errors = 0;
+ d->sum_good = 0;
+ d->letter = 'A' + (k % 26);
+
+ if (pthread_create(&d->pthread_id, NULL, multiple_thread_proc, d)) {
+ warning("failed to create thread[%d] skipping remainder", k);
+ free(d);
+ break;
+ }
+
+ list = d;
+ }
+
+ while (list) {
+ struct multiple_thread_data *d = list;
+
+ if (pthread_join(d->pthread_id, NULL))
+ sum_join_errors++;
+
+ sum_thread_errors += d->sum_errors;
+ sum_good += d->sum_good;
+
+ list = d->next;
+ free(d);
+ }
+
+ printf("client (good %d) (join %d), (errors %d)\n",
+ sum_good, sum_join_errors, sum_thread_errors);
+
+ return (sum_join_errors + sum_thread_errors) ? 1 : 0;
+}
+
+int cmd__simple_ipc(int argc, const char **argv)
+{
+ const char * const simple_ipc_usage[] = {
+ N_("test-helper simple-ipc is-active [<name>] [<options>]"),
+ N_("test-helper simple-ipc run-daemon [<name>] [<threads>]"),
+ N_("test-helper simple-ipc start-daemon [<name>] [<threads>] [<max-wait>]"),
+ N_("test-helper simple-ipc stop-daemon [<name>] [<max-wait>]"),
+ N_("test-helper simple-ipc send [<name>] [<token>]"),
+ N_("test-helper simple-ipc sendbytes [<name>] [<bytecount>] [<byte>]"),
+ N_("test-helper simple-ipc multiple [<name>] [<threads>] [<bytecount>] [<batchsize>]"),
+ NULL
+ };
+
+ const char *bytevalue = NULL;
+
+ struct option options[] = {
+#ifndef GIT_WINDOWS_NATIVE
+ OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("name or pathname of unix domain socket")),
+#else
+ OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("named-pipe name")),
+#endif
+ OPT_INTEGER(0, "threads", &cl_args.nr_threads, N_("number of threads in server thread pool")),
+ OPT_INTEGER(0, "max-wait", &cl_args.max_wait_sec, N_("seconds to wait for daemon to start or stop")),
+
+ OPT_INTEGER(0, "bytecount", &cl_args.bytecount, N_("number of bytes")),
+ OPT_INTEGER(0, "batchsize", &cl_args.batchsize, N_("number of requests per thread")),
+
+ OPT_STRING(0, "byte", &bytevalue, N_("byte"), N_("ballast character")),
+ OPT_STRING(0, "token", &cl_args.token, N_("token"), N_("command token to send to the server")),
+
+ OPT_END()
+ };
+
+ if (argc < 2)
+ usage_with_options(simple_ipc_usage, options);
+
+ if (argc == 2 && !strcmp(argv[1], "-h"))
+ usage_with_options(simple_ipc_usage, options);
+
+ if (argc == 2 && !strcmp(argv[1], "SUPPORTS_SIMPLE_IPC"))
+ return 0;
+
+ cl_args.subcommand = argv[1];
+
+ argc--;
+ argv++;
+
+ argc = parse_options(argc, argv, NULL, options, simple_ipc_usage, 0);
+
+ if (cl_args.nr_threads < 1)
+ cl_args.nr_threads = 1;
+ if (cl_args.max_wait_sec < 0)
+ cl_args.max_wait_sec = 0;
+ if (cl_args.bytecount < 1)
+ cl_args.bytecount = 1;
+ if (cl_args.batchsize < 1)
+ cl_args.batchsize = 1;
+
+ if (bytevalue && *bytevalue)
+ cl_args.bytevalue = bytevalue[0];
+
+ /*
+ * Use '!!' on all dispatch functions to map from `error()` style
+ * (returns -1) style to `test_must_fail` style (expects 1). This
+ * makes shell error messages less confusing.
+ */
+
+ if (!strcmp(cl_args.subcommand, "is-active"))
+ return !!client__probe_server();
+
+ if (!strcmp(cl_args.subcommand, "run-daemon"))
+ return !!daemon__run_server();
+
+ if (!strcmp(cl_args.subcommand, "start-daemon"))
+ return !!daemon__start_server();
+
+ /*
+ * Client commands follow. Ensure a server is running before
+ * sending any data. This might be overkill, but then again
+ * this is a test harness.
+ */
+
+ if (!strcmp(cl_args.subcommand, "stop-daemon")) {
+ if (client__probe_server())
+ return 1;
+ return !!client__stop_server();
+ }
+
+ if (!strcmp(cl_args.subcommand, "send")) {
+ if (client__probe_server())
+ return 1;
+ return !!client__send_ipc();
+ }
+
+ if (!strcmp(cl_args.subcommand, "sendbytes")) {
+ if (client__probe_server())
+ return 1;
+ return !!client__sendbytes();
+ }
+
+ if (!strcmp(cl_args.subcommand, "multiple")) {
+ if (client__probe_server())
+ return 1;
+ return !!client__multiple();
+ }
+
+ die("Unhandled subcommand: '%s'", cl_args.subcommand);
+}
+#endif
diff --git a/t/helper/test-tool.c b/t/helper/test-tool.c
index f97cd9f48a..287aa60023 100644
--- a/t/helper/test-tool.c
+++ b/t/helper/test-tool.c
@@ -65,6 +65,7 @@ static struct test_cmd cmds[] = {
{ "sha1", cmd__sha1 },
{ "sha256", cmd__sha256 },
{ "sigchain", cmd__sigchain },
+ { "simple-ipc", cmd__simple_ipc },
{ "strcmp-offset", cmd__strcmp_offset },
{ "string-list", cmd__string_list },
{ "submodule-config", cmd__submodule_config },
diff --git a/t/helper/test-tool.h b/t/helper/test-tool.h
index 28072c0ad5..9ea4b31011 100644
--- a/t/helper/test-tool.h
+++ b/t/helper/test-tool.h
@@ -55,6 +55,7 @@ int cmd__sha1(int argc, const char **argv);
int cmd__oid_array(int argc, const char **argv);
int cmd__sha256(int argc, const char **argv);
int cmd__sigchain(int argc, const char **argv);
+int cmd__simple_ipc(int argc, const char **argv);
int cmd__strcmp_offset(int argc, const char **argv);
int cmd__string_list(int argc, const char **argv);
int cmd__submodule_config(int argc, const char **argv);
diff --git a/t/t0052-simple-ipc.sh b/t/t0052-simple-ipc.sh
new file mode 100755
index 0000000000..ff98be31a5
--- /dev/null
+++ b/t/t0052-simple-ipc.sh
@@ -0,0 +1,122 @@
+#!/bin/sh
+
+test_description='simple command server'
+
+. ./test-lib.sh
+
+test-tool simple-ipc SUPPORTS_SIMPLE_IPC || {
+ skip_all='simple IPC not supported on this platform'
+ test_done
+}
+
+stop_simple_IPC_server () {
+ test-tool simple-ipc stop-daemon
+}
+
+test_expect_success 'start simple command server' '
+ test_atexit stop_simple_IPC_server &&
+ test-tool simple-ipc start-daemon --threads=8 &&
+ test-tool simple-ipc is-active
+'
+
+test_expect_success 'simple command server' '
+ test-tool simple-ipc send --token=ping >actual &&
+ echo pong >expect &&
+ test_cmp expect actual
+'
+
+test_expect_success 'servers cannot share the same path' '
+ test_must_fail test-tool simple-ipc run-daemon &&
+ test-tool simple-ipc is-active
+'
+
+test_expect_success 'big response' '
+ test-tool simple-ipc send --token=big >actual &&
+ test_line_count -ge 10000 actual &&
+ grep -q "big: [0]*9999\$" actual
+'
+
+test_expect_success 'chunk response' '
+ test-tool simple-ipc send --token=chunk >actual &&
+ test_line_count -ge 10000 actual &&
+ grep -q "big: [0]*9999\$" actual
+'
+
+test_expect_success 'slow response' '
+ test-tool simple-ipc send --token=slow >actual &&
+ test_line_count -ge 100 actual &&
+ grep -q "big: [0]*99\$" actual
+'
+
+# Send an IPC with n=100,000 bytes of ballast. This should be large enough
+# to force both the kernel and the pkt-line layer to chunk the message to the
+# daemon and for the daemon to receive it in chunks.
+#
+test_expect_success 'sendbytes' '
+ test-tool simple-ipc sendbytes --bytecount=100000 --byte=A >actual &&
+ grep "sent:A00100000 rcvd:A00100000" actual
+'
+
+# Start a series of <threads> client threads that each make <batchsize>
+# IPC requests to the server. Each (<threads> * <batchsize>) request
+# will open a new connection to the server and randomly bind to a server
+# thread. Each client thread exits after completing its batch. So the
+# total number of live client threads will be smaller than the total.
+# Each request will send a message containing at least <bytecount> bytes
+# of ballast. (Responses are small.)
+#
+# The purpose here is to test threading in the server and responding to
+# many concurrent client requests (regardless of whether they come from
+# 1 client process or many). And to test that the server side of the
+# named pipe/socket is stable. (On Windows this means that the server
+# pipe is properly recycled.)
+#
+# On Windows it also lets us adjust the connection timeout in the
+# `ipc_client_send_command()`.
+#
+# Note it is easy to drive the system into failure by requesting an
+# insane number of threads on client or server and/or increasing the
+# per-thread batchsize or the per-request bytecount (ballast).
+# On Windows these failures look like "pipe is busy" errors.
+# So I've chosen fairly conservative values for now.
+#
+# We expect output of the form "sent:<letter><length> ..."
+# With terms (7, 19, 13) we expect:
+# <letter> in [A-G]
+# <length> in [19+0 .. 19+(13-1)]
+# and (7 * 13) successful responses.
+#
+test_expect_success 'stress test threads' '
+ test-tool simple-ipc multiple \
+ --threads=7 \
+ --bytecount=19 \
+ --batchsize=13 \
+ >actual &&
+ test_line_count = 92 actual &&
+ grep "good 91" actual &&
+ grep "sent:A" <actual >actual_a &&
+ cat >expect_a <<-EOF &&
+ sent:A00000019 rcvd:A00000019
+ sent:A00000020 rcvd:A00000020
+ sent:A00000021 rcvd:A00000021
+ sent:A00000022 rcvd:A00000022
+ sent:A00000023 rcvd:A00000023
+ sent:A00000024 rcvd:A00000024
+ sent:A00000025 rcvd:A00000025
+ sent:A00000026 rcvd:A00000026
+ sent:A00000027 rcvd:A00000027
+ sent:A00000028 rcvd:A00000028
+ sent:A00000029 rcvd:A00000029
+ sent:A00000030 rcvd:A00000030
+ sent:A00000031 rcvd:A00000031
+ EOF
+ test_cmp expect_a actual_a
+'
+
+test_expect_success 'stop-daemon works' '
+ test-tool simple-ipc stop-daemon &&
+ test_must_fail test-tool simple-ipc is-active &&
+ test_must_fail test-tool simple-ipc send --token=ping
+'
+
+test_done
diff --git a/unix-socket.c b/unix-socket.c
index 19ed48be99..e0be1badb5 100644
--- a/unix-socket.c
+++ b/unix-socket.c
@@ -1,13 +1,7 @@
#include "cache.h"
#include "unix-socket.h"
-static int unix_stream_socket(void)
-{
- int fd = socket(AF_UNIX, SOCK_STREAM, 0);
- if (fd < 0)
- die_errno("unable to create socket");
- return fd;
-}
+#define DEFAULT_UNIX_STREAM_LISTEN_BACKLOG (5)
static int chdir_len(const char *orig, int len)
{
@@ -36,16 +30,23 @@ static void unix_sockaddr_cleanup(struct unix_sockaddr_context *ctx)
}
static int unix_sockaddr_init(struct sockaddr_un *sa, const char *path,
- struct unix_sockaddr_context *ctx)
+ struct unix_sockaddr_context *ctx,
+ int disallow_chdir)
{
int size = strlen(path) + 1;
ctx->orig_dir = NULL;
if (size > sizeof(sa->sun_path)) {
- const char *slash = find_last_dir_sep(path);
+ const char *slash;
const char *dir;
struct strbuf cwd = STRBUF_INIT;
+ if (disallow_chdir) {
+ errno = ENAMETOOLONG;
+ return -1;
+ }
+
+ slash = find_last_dir_sep(path);
if (!slash) {
errno = ENAMETOOLONG;
return -1;
@@ -71,15 +72,18 @@ static int unix_sockaddr_init(struct sockaddr_un *sa, const char *path,
return 0;
}
-int unix_stream_connect(const char *path)
+int unix_stream_connect(const char *path, int disallow_chdir)
{
- int fd, saved_errno;
+ int fd = -1, saved_errno;
struct sockaddr_un sa;
struct unix_sockaddr_context ctx;
- if (unix_sockaddr_init(&sa, path, &ctx) < 0)
+ if (unix_sockaddr_init(&sa, path, &ctx, disallow_chdir) < 0)
return -1;
- fd = unix_stream_socket();
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0)
+ goto fail;
+
if (connect(fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
goto fail;
unix_sockaddr_cleanup(&ctx);
@@ -87,28 +91,36 @@ int unix_stream_connect(const char *path)
fail:
saved_errno = errno;
+ if (fd != -1)
+ close(fd);
unix_sockaddr_cleanup(&ctx);
- close(fd);
errno = saved_errno;
return -1;
}
-int unix_stream_listen(const char *path)
+int unix_stream_listen(const char *path,
+ const struct unix_stream_listen_opts *opts)
{
- int fd, saved_errno;
+ int fd = -1, saved_errno;
+ int backlog;
struct sockaddr_un sa;
struct unix_sockaddr_context ctx;
unlink(path);
- if (unix_sockaddr_init(&sa, path, &ctx) < 0)
+ if (unix_sockaddr_init(&sa, path, &ctx, opts->disallow_chdir) < 0)
return -1;
- fd = unix_stream_socket();
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0)
+ goto fail;
if (bind(fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
goto fail;
- if (listen(fd, 5) < 0)
+ backlog = opts->listen_backlog_size;
+ if (backlog <= 0)
+ backlog = DEFAULT_UNIX_STREAM_LISTEN_BACKLOG;
+ if (listen(fd, backlog) < 0)
goto fail;
unix_sockaddr_cleanup(&ctx);
@@ -116,8 +128,9 @@ int unix_stream_listen(const char *path)
fail:
saved_errno = errno;
+ if (fd != -1)
+ close(fd);
unix_sockaddr_cleanup(&ctx);
- close(fd);
errno = saved_errno;
return -1;
}
diff --git a/unix-socket.h b/unix-socket.h
index e271aeec5a..8542cdd799 100644
--- a/unix-socket.h
+++ b/unix-socket.h
@@ -1,7 +1,15 @@
#ifndef UNIX_SOCKET_H
#define UNIX_SOCKET_H
-int unix_stream_connect(const char *path);
-int unix_stream_listen(const char *path);
+struct unix_stream_listen_opts {
+ int listen_backlog_size;
+ unsigned int disallow_chdir:1;
+};
+
+#define UNIX_STREAM_LISTEN_OPTS_INIT { 0 }
+
+int unix_stream_connect(const char *path, int disallow_chdir);
+int unix_stream_listen(const char *path,
+ const struct unix_stream_listen_opts *opts);
#endif /* UNIX_SOCKET_H */
diff --git a/unix-stream-server.c b/unix-stream-server.c
new file mode 100644
index 0000000000..efa2a207ab
--- /dev/null
+++ b/unix-stream-server.c
@@ -0,0 +1,125 @@
+#include "cache.h"
+#include "lockfile.h"
+#include "unix-socket.h"
+#include "unix-stream-server.h"
+
+#define DEFAULT_LOCK_TIMEOUT (100)
+
+/*
+ * Try to connect to a unix domain socket at `path` (if it exists) and
+ * see if there is a server listening.
+ *
+ * We don't know if the socket exists, whether a server died and
+ * failed to cleanup, or whether we have a live server listening, so
+ * we "poke" it.
+ *
+ * We immediately hangup without sending/receiving any data because we
+ * don't know anything about the protocol spoken and don't want to
+ * block while writing/reading data. It is sufficient to just know
+ * that someone is listening.
+ */
+static int is_another_server_alive(const char *path,
+ const struct unix_stream_listen_opts *opts)
+{
+ int fd = unix_stream_connect(path, opts->disallow_chdir);
+ if (fd >= 0) {
+ close(fd);
+ return 1;
+ }
+
+ return 0;
+}
+
+int unix_ss_create(const char *path,
+ const struct unix_stream_listen_opts *opts,
+ long timeout_ms,
+ struct unix_ss_socket **new_server_socket)
+{
+ struct lock_file lock = LOCK_INIT;
+ int fd_socket;
+ struct unix_ss_socket *server_socket;
+
+ *new_server_socket = NULL;
+
+ if (timeout_ms < 0)
+ timeout_ms = DEFAULT_LOCK_TIMEOUT;
+
+ /*
+ * Create a lock at "<path>.lock" if we can.
+ */
+ if (hold_lock_file_for_update_timeout(&lock, path, 0, timeout_ms) < 0)
+ return -1;
+
+ /*
+ * If another server is listening on "<path>" give up. We do not
+ * want to create a socket and steal future connections from them.
+ */
+ if (is_another_server_alive(path, opts)) {
+ rollback_lock_file(&lock);
+ errno = EADDRINUSE;
+ return -2;
+ }
+
+ /*
+ * Create and bind to a Unix domain socket at "<path>".
+ */
+ fd_socket = unix_stream_listen(path, opts);
+ if (fd_socket < 0) {
+ int saved_errno = errno;
+ rollback_lock_file(&lock);
+ errno = saved_errno;
+ return -1;
+ }
+
+ server_socket = xcalloc(1, sizeof(*server_socket));
+ server_socket->path_socket = strdup(path);
+ server_socket->fd_socket = fd_socket;
+ lstat(path, &server_socket->st_socket);
+
+ *new_server_socket = server_socket;
+
+ /*
+ * Always rollback (just delete) "<path>.lock" because we already created
+ * "<path>" as a socket and do not want to commit_lock to do the atomic
+ * rename trick.
+ */
+ rollback_lock_file(&lock);
+
+ return 0;
+}
+
+void unix_ss_free(struct unix_ss_socket *server_socket)
+{
+ if (!server_socket)
+ return;
+
+ if (server_socket->fd_socket >= 0) {
+ if (!unix_ss_was_stolen(server_socket))
+ unlink(server_socket->path_socket);
+ close(server_socket->fd_socket);
+ }
+
+ free(server_socket->path_socket);
+ free(server_socket);
+}
+
+int unix_ss_was_stolen(struct unix_ss_socket *server_socket)
+{
+ struct stat st_now;
+
+ if (!server_socket)
+ return 0;
+
+ if (lstat(server_socket->path_socket, &st_now) == -1)
+ return 1;
+
+ if (st_now.st_ino != server_socket->st_socket.st_ino)
+ return 1;
+ if (st_now.st_dev != server_socket->st_socket.st_dev)
+ return 1;
+
+ if (!S_ISSOCK(st_now.st_mode))
+ return 1;
+
+ return 0;
+}
diff --git a/unix-stream-server.h b/unix-stream-server.h
new file mode 100644
index 0000000000..ae2712ba39
--- /dev/null
+++ b/unix-stream-server.h
@@ -0,0 +1,33 @@
+#ifndef UNIX_STREAM_SERVER_H
+#define UNIX_STREAM_SERVER_H
+
+#include "unix-socket.h"
+
+struct unix_ss_socket {
+ char *path_socket;
+ struct stat st_socket;
+ int fd_socket;
+};
+
+/*
+ * Create a Unix Domain Socket at the given path under the protection
+ * of a '.lock' lockfile.
+ *
+ * Returns 0 on success, -1 on error, -2 if socket is in use.
+ */
+int unix_ss_create(const char *path,
+ const struct unix_stream_listen_opts *opts,
+ long timeout_ms,
+ struct unix_ss_socket **server_socket);
+
+/*
+ * Close and delete the socket.
+ */
+void unix_ss_free(struct unix_ss_socket *server_socket);
+
+/*
+ * Return 1 if the inode of the pathname to our socket changes.
+ */
+int unix_ss_was_stolen(struct unix_ss_socket *server_socket);
+
+#endif /* UNIX_STREAM_SERVER_H */