diff options
Diffstat (limited to 'run-command.c')
-rw-r--r-- | run-command.c | 484 |
1 files changed, 413 insertions, 71 deletions
diff --git a/run-command.c b/run-command.c index aad03ab705..e4593cd99b 100644 --- a/run-command.c +++ b/run-command.c @@ -3,6 +3,8 @@ #include "exec_cmd.h" #include "sigchain.h" #include "argv-array.h" +#include "thread-utils.h" +#include "strbuf.h" void child_process_init(struct child_process *child) { @@ -11,6 +13,12 @@ void child_process_init(struct child_process *child) argv_array_init(&child->env_array); } +void child_process_clear(struct child_process *child) +{ + argv_array_clear(&child->args); + argv_array_clear(&child->env_array); +} + struct child_to_clean { pid_t pid; struct child_to_clean *next; @@ -18,26 +26,27 @@ struct child_to_clean { static struct child_to_clean *children_to_clean; static int installed_child_cleanup_handler; -static void cleanup_children(int sig) +static void cleanup_children(int sig, int in_signal) { while (children_to_clean) { struct child_to_clean *p = children_to_clean; children_to_clean = p->next; kill(p->pid, sig); - free(p); + if (!in_signal) + free(p); } } static void cleanup_children_on_signal(int sig) { - cleanup_children(sig); + cleanup_children(sig, 1); sigchain_pop(sig); raise(sig); } static void cleanup_children_on_exit(void) { - cleanup_children(SIGTERM); + cleanup_children(SIGTERM, 0); } static void mark_child_for_cleanup(pid_t pid) @@ -151,56 +160,46 @@ int sane_execvp(const char *file, char * const argv[]) return -1; } -static const char **prepare_shell_cmd(const char **argv) +static const char **prepare_shell_cmd(struct argv_array *out, const char **argv) { - int argc, nargc = 0; - const char **nargv; - - for (argc = 0; argv[argc]; argc++) - ; /* just counting */ - /* +1 for NULL, +3 for "sh -c" plus extra $0 */ - nargv = xmalloc(sizeof(*nargv) * (argc + 1 + 3)); - - if (argc < 1) + if (!argv[0]) die("BUG: shell command is empty"); if (strcspn(argv[0], "|&;<>()$`\\\"' \t\n*?[#~=%") != strlen(argv[0])) { #ifndef GIT_WINDOWS_NATIVE - nargv[nargc++] = SHELL_PATH; + argv_array_push(out, SHELL_PATH); #else - nargv[nargc++] = "sh"; + argv_array_push(out, "sh"); #endif - nargv[nargc++] = "-c"; - - if (argc < 2) - nargv[nargc++] = argv[0]; - else { - struct strbuf arg0 = STRBUF_INIT; - strbuf_addf(&arg0, "%s \"$@\"", argv[0]); - nargv[nargc++] = strbuf_detach(&arg0, NULL); - } - } + argv_array_push(out, "-c"); - for (argc = 0; argv[argc]; argc++) - nargv[nargc++] = argv[argc]; - nargv[nargc] = NULL; + /* + * If we have no extra arguments, we do not even need to + * bother with the "$@" magic. + */ + if (!argv[1]) + argv_array_push(out, argv[0]); + else + argv_array_pushf(out, "%s \"$@\"", argv[0]); + } - return nargv; + argv_array_pushv(out, argv); + return out->argv; } #ifndef GIT_WINDOWS_NATIVE static int execv_shell_cmd(const char **argv) { - const char **nargv = prepare_shell_cmd(argv); - trace_argv_printf(nargv, "trace: exec:"); - sane_execvp(nargv[0], (char **)nargv); - free(nargv); + struct argv_array nargv = ARGV_ARRAY_INIT; + prepare_shell_cmd(&nargv, argv); + trace_argv_printf(nargv.argv, "trace: exec:"); + sane_execvp(nargv.argv[0], (char **)nargv.argv); + argv_array_clear(&nargv); return -1; } #endif #ifndef GIT_WINDOWS_NATIVE -static int child_err = 2; static int child_notifier = -1; static void notify_parent(void) @@ -212,17 +211,6 @@ static void notify_parent(void) */ xwrite(child_notifier, "", 1); } - -static NORETURN void die_child(const char *err, va_list params) -{ - vwritef(child_err, "fatal: ", err, params); - exit(128); -} - -static void error_child(const char *err, va_list params) -{ - vwritef(child_err, "error: ", err, params); -} #endif static inline void set_cloexec(int fd) @@ -232,7 +220,7 @@ static inline void set_cloexec(int fd) fcntl(fd, F_SETFD, flags | FD_CLOEXEC); } -static int wait_or_whine(pid_t pid, const char *argv0) +static int wait_or_whine(pid_t pid, const char *argv0, int in_signal) { int status, code = -1; pid_t waiting; @@ -240,6 +228,8 @@ static int wait_or_whine(pid_t pid, const char *argv0) while ((waiting = waitpid(pid, &status, 0)) < 0 && errno == EINTR) ; /* nothing */ + if (in_signal) + return 0; if (waiting < 0) { failed_errno = errno; @@ -248,7 +238,7 @@ static int wait_or_whine(pid_t pid, const char *argv0) error("waitpid is confused (%s)", argv0); } else if (WIFSIGNALED(status)) { code = WTERMSIG(status); - if (code != SIGINT && code != SIGQUIT) + if (code != SIGINT && code != SIGQUIT && code != SIGPIPE) error("%s died of signal %d", argv0, code); /* * This return value is chosen so that code & 0xff @@ -336,8 +326,7 @@ int start_command(struct child_process *cmd) fail_pipe: error("cannot create %s pipe for %s: %s", str, cmd->argv[0], strerror(failed_errno)); - argv_array_clear(&cmd->args); - argv_array_clear(&cmd->env_array); + child_process_clear(cmd); errno = failed_errno; return -1; } @@ -362,11 +351,10 @@ fail_pipe: * in subsequent call paths use the parent's stderr. */ if (cmd->no_stderr || need_err) { - child_err = dup(2); + int child_err = dup(2); set_cloexec(child_err); + set_error_handle(fdopen(child_err, "w")); } - set_die_routine(die_child); - set_error_routine(error_child); close(notify_pipe[0]); set_cloexec(notify_pipe[1]); @@ -450,7 +438,7 @@ fail_pipe: * At this point we know that fork() succeeded, but execvp() * failed. Errors have been reported to our stderr. */ - wait_or_whine(cmd->pid, cmd->argv[0]); + wait_or_whine(cmd->pid, cmd->argv[0], 0); failed_errno = errno; cmd->pid = -1; } @@ -460,6 +448,7 @@ fail_pipe: { int fhin = 0, fhout = 1, fherr = 2; const char **sargv = cmd->argv; + struct argv_array nargv = ARGV_ARRAY_INIT; if (cmd->no_stdin) fhin = open("/dev/null", O_RDWR); @@ -485,9 +474,9 @@ fail_pipe: fhout = dup(cmd->out); if (cmd->git_cmd) - cmd->argv = prepare_git_cmd(cmd->argv); + cmd->argv = prepare_git_cmd(&nargv, cmd->argv); else if (cmd->use_shell) - cmd->argv = prepare_shell_cmd(cmd->argv); + cmd->argv = prepare_shell_cmd(&nargv, cmd->argv); cmd->pid = mingw_spawnvpe(cmd->argv[0], cmd->argv, (char**) cmd->env, cmd->dir, fhin, fhout, fherr); @@ -497,9 +486,7 @@ fail_pipe: if (cmd->clean_on_exit && cmd->pid >= 0) mark_child_for_cleanup(cmd->pid); - if (cmd->git_cmd) - free(cmd->argv); - + argv_array_clear(&nargv); cmd->argv = sargv; if (fhin != 0) close(fhin); @@ -523,8 +510,7 @@ fail_pipe: close_pair(fderr); else if (cmd->err) close(cmd->err); - argv_array_clear(&cmd->args); - argv_array_clear(&cmd->env_array); + child_process_clear(cmd); errno = failed_errno; return -1; } @@ -549,12 +535,17 @@ fail_pipe: int finish_command(struct child_process *cmd) { - int ret = wait_or_whine(cmd->pid, cmd->argv[0]); - argv_array_clear(&cmd->args); - argv_array_clear(&cmd->env_array); + int ret = wait_or_whine(cmd->pid, cmd->argv[0], 0); + child_process_clear(cmd); return ret; } +int finish_command_in_signal(struct child_process *cmd) +{ + return wait_or_whine(cmd->pid, cmd->argv[0], 1); +} + + int run_command(struct child_process *cmd) { int code; @@ -599,6 +590,16 @@ static void *run_thread(void *data) struct async *async = data; intptr_t ret; + if (async->isolate_sigpipe) { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGPIPE); + if (pthread_sigmask(SIG_BLOCK, &mask, NULL) < 0) { + ret = error("unable to block SIGPIPE in async thread"); + return (void *)ret; + } + } + pthread_setspecific(async_key, async); ret = async->proc(async->proc_in, async->proc_out, async->data); return (void *)ret; @@ -608,7 +609,7 @@ static NORETURN void die_async(const char *err, va_list params) { vreportf("fatal: ", err, params); - if (!pthread_equal(main_thread, pthread_self())) { + if (in_async()) { struct async *async = pthread_getspecific(async_key); if (async->proc_in >= 0) close(async->proc_in); @@ -627,6 +628,18 @@ static int async_die_is_recursing(void) return ret != NULL; } +int in_async(void) +{ + if (!main_thread_set) + return 0; /* no asyncs started yet */ + return !pthread_equal(main_thread, pthread_self()); +} + +void NORETURN async_exit(int code) +{ + pthread_exit((void *)(intptr_t)code); +} + #else static struct { @@ -666,6 +679,17 @@ int git_atexit(void (*handler)(void)) } #define atexit git_atexit +static int process_is_async; +int in_async(void) +{ + return process_is_async; +} + +void NORETURN async_exit(int code) +{ + exit(code); +} + #endif int start_async(struct async *async) @@ -725,6 +749,7 @@ int start_async(struct async *async) if (need_out) close(fdout[0]); git_atexit_clear(); + process_is_async = 1; exit(!!async->proc(proc_in, proc_out, async->data)); } @@ -785,7 +810,7 @@ error: int finish_async(struct async *async) { #ifdef NO_PTHREADS - return wait_or_whine(async->pid, "child process"); + return wait_or_whine(async->pid, "child process", 0); #else void *ret = (void *)(intptr_t)(-1); @@ -795,13 +820,15 @@ int finish_async(struct async *async) #endif } -char *find_hook(const char *name) +const char *find_hook(const char *name) { - char *path = git_path("hooks/%s", name); - if (access(path, X_OK) < 0) - path = NULL; + static struct strbuf path = STRBUF_INIT; - return path; + strbuf_reset(&path); + strbuf_git_path(&path, "hooks/%s", name); + if (access(path.buf, X_OK) < 0) + return NULL; + return path.buf; } int run_hook_ve(const char *const *env, const char *name, va_list args) @@ -850,3 +877,318 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint) close(cmd->out); return finish_command(cmd); } + +enum child_state { + GIT_CP_FREE, + GIT_CP_WORKING, + GIT_CP_WAIT_CLEANUP, +}; + +struct parallel_processes { + void *data; + + int max_processes; + int nr_processes; + + get_next_task_fn get_next_task; + start_failure_fn start_failure; + task_finished_fn task_finished; + + struct { + enum child_state state; + struct child_process process; + struct strbuf err; + void *data; + } *children; + /* + * The struct pollfd is logically part of *children, + * but the system call expects it as its own array. + */ + struct pollfd *pfd; + + unsigned shutdown : 1; + + int output_owner; + struct strbuf buffered_output; /* of finished children */ +}; + +static int default_start_failure(struct strbuf *out, + void *pp_cb, + void *pp_task_cb) +{ + return 0; +} + +static int default_task_finished(int result, + struct strbuf *out, + void *pp_cb, + void *pp_task_cb) +{ + return 0; +} + +static void kill_children(struct parallel_processes *pp, int signo) +{ + int i, n = pp->max_processes; + + for (i = 0; i < n; i++) + if (pp->children[i].state == GIT_CP_WORKING) + kill(pp->children[i].process.pid, signo); +} + +static struct parallel_processes *pp_for_signal; + +static void handle_children_on_signal(int signo) +{ + kill_children(pp_for_signal, signo); + sigchain_pop(signo); + raise(signo); +} + +static void pp_init(struct parallel_processes *pp, + int n, + get_next_task_fn get_next_task, + start_failure_fn start_failure, + task_finished_fn task_finished, + void *data) +{ + int i; + + if (n < 1) + n = online_cpus(); + + pp->max_processes = n; + + trace_printf("run_processes_parallel: preparing to run up to %d tasks", n); + + pp->data = data; + if (!get_next_task) + die("BUG: you need to specify a get_next_task function"); + pp->get_next_task = get_next_task; + + pp->start_failure = start_failure ? start_failure : default_start_failure; + pp->task_finished = task_finished ? task_finished : default_task_finished; + + pp->nr_processes = 0; + pp->output_owner = 0; + pp->shutdown = 0; + pp->children = xcalloc(n, sizeof(*pp->children)); + pp->pfd = xcalloc(n, sizeof(*pp->pfd)); + strbuf_init(&pp->buffered_output, 0); + + for (i = 0; i < n; i++) { + strbuf_init(&pp->children[i].err, 0); + child_process_init(&pp->children[i].process); + pp->pfd[i].events = POLLIN | POLLHUP; + pp->pfd[i].fd = -1; + } + + pp_for_signal = pp; + sigchain_push_common(handle_children_on_signal); +} + +static void pp_cleanup(struct parallel_processes *pp) +{ + int i; + + trace_printf("run_processes_parallel: done"); + for (i = 0; i < pp->max_processes; i++) { + strbuf_release(&pp->children[i].err); + child_process_clear(&pp->children[i].process); + } + + free(pp->children); + free(pp->pfd); + + /* + * When get_next_task added messages to the buffer in its last + * iteration, the buffered output is non empty. + */ + strbuf_write(&pp->buffered_output, stderr); + strbuf_release(&pp->buffered_output); + + sigchain_pop_common(); +} + +/* returns + * 0 if a new task was started. + * 1 if no new jobs was started (get_next_task ran out of work, non critical + * problem with starting a new command) + * <0 no new job was started, user wishes to shutdown early. Use negative code + * to signal the children. + */ +static int pp_start_one(struct parallel_processes *pp) +{ + int i, code; + + for (i = 0; i < pp->max_processes; i++) + if (pp->children[i].state == GIT_CP_FREE) + break; + if (i == pp->max_processes) + die("BUG: bookkeeping is hard"); + + code = pp->get_next_task(&pp->children[i].process, + &pp->children[i].err, + pp->data, + &pp->children[i].data); + if (!code) { + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + return 1; + } + pp->children[i].process.err = -1; + pp->children[i].process.stdout_to_stderr = 1; + pp->children[i].process.no_stdin = 1; + + if (start_command(&pp->children[i].process)) { + code = pp->start_failure(&pp->children[i].err, + pp->data, + &pp->children[i].data); + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + if (code) + pp->shutdown = 1; + return code; + } + + pp->nr_processes++; + pp->children[i].state = GIT_CP_WORKING; + pp->pfd[i].fd = pp->children[i].process.err; + return 0; +} + +static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout) +{ + int i; + + while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) { + if (errno == EINTR) + continue; + pp_cleanup(pp); + die_errno("poll"); + } + + /* Buffer output from all pipes. */ + for (i = 0; i < pp->max_processes; i++) { + if (pp->children[i].state == GIT_CP_WORKING && + pp->pfd[i].revents & (POLLIN | POLLHUP)) { + int n = strbuf_read_once(&pp->children[i].err, + pp->children[i].process.err, 0); + if (n == 0) { + close(pp->children[i].process.err); + pp->children[i].state = GIT_CP_WAIT_CLEANUP; + } else if (n < 0) + if (errno != EAGAIN) + die_errno("read"); + } + } +} + +static void pp_output(struct parallel_processes *pp) +{ + int i = pp->output_owner; + if (pp->children[i].state == GIT_CP_WORKING && + pp->children[i].err.len) { + strbuf_write(&pp->children[i].err, stderr); + strbuf_reset(&pp->children[i].err); + } +} + +static int pp_collect_finished(struct parallel_processes *pp) +{ + int i, code; + int n = pp->max_processes; + int result = 0; + + while (pp->nr_processes > 0) { + for (i = 0; i < pp->max_processes; i++) + if (pp->children[i].state == GIT_CP_WAIT_CLEANUP) + break; + if (i == pp->max_processes) + break; + + code = finish_command(&pp->children[i].process); + + code = pp->task_finished(code, + &pp->children[i].err, pp->data, + &pp->children[i].data); + + if (code) + result = code; + if (code < 0) + break; + + pp->nr_processes--; + pp->children[i].state = GIT_CP_FREE; + pp->pfd[i].fd = -1; + child_process_init(&pp->children[i].process); + + if (i != pp->output_owner) { + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + } else { + strbuf_write(&pp->children[i].err, stderr); + strbuf_reset(&pp->children[i].err); + + /* Output all other finished child processes */ + strbuf_write(&pp->buffered_output, stderr); + strbuf_reset(&pp->buffered_output); + + /* + * Pick next process to output live. + * NEEDSWORK: + * For now we pick it randomly by doing a round + * robin. Later we may want to pick the one with + * the most output or the longest or shortest + * running process time. + */ + for (i = 0; i < n; i++) + if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING) + break; + pp->output_owner = (pp->output_owner + i) % n; + } + } + return result; +} + +int run_processes_parallel(int n, + get_next_task_fn get_next_task, + start_failure_fn start_failure, + task_finished_fn task_finished, + void *pp_cb) +{ + int i, code; + int output_timeout = 100; + int spawn_cap = 4; + struct parallel_processes pp; + + pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb); + while (1) { + for (i = 0; + i < spawn_cap && !pp.shutdown && + pp.nr_processes < pp.max_processes; + i++) { + code = pp_start_one(&pp); + if (!code) + continue; + if (code < 0) { + pp.shutdown = 1; + kill_children(&pp, -code); + } + break; + } + if (!pp.nr_processes) + break; + pp_buffer_stderr(&pp, output_timeout); + pp_output(&pp); + code = pp_collect_finished(&pp); + if (code) { + pp.shutdown = 1; + if (code < 0) + kill_children(&pp, -code); + } + } + + pp_cleanup(&pp); + return 0; +} |