summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar Junio C Hamano <gitster@pobox.com>2016-01-12 15:16:54 -0800
committerLibravatar Junio C Hamano <gitster@pobox.com>2016-01-12 15:16:54 -0800
commit187c0d3d9e63f7d84d7055372f07bedb52849f06 (patch)
tree18a498fa6c6e77288756cde5dfb3a84ed7bd4062
parentMerge branch 'ps/push-delete-option' (diff)
parentsubmodules: allow parallel fetching, add tests and documentation (diff)
downloadtgif-187c0d3d9e63f7d84d7055372f07bedb52849f06.tar.xz
Merge branch 'sb/submodule-parallel-fetch'
Add a framework to spawn a group of processes in parallel, and use it to run "git fetch --recurse-submodules" in parallel. Rerolled and this seems to be a lot cleaner. The merge of the earlier one to 'next' has been reverted. * sb/submodule-parallel-fetch: submodules: allow parallel fetching, add tests and documentation fetch_populated_submodules: use new parallel job processing run-command: add an asynchronous parallel child processor sigchain: add command to pop all common signals strbuf: add strbuf_read_once to read without blocking xread: poll on non blocking fds submodule.c: write "Fetching submodule <foo>" to stderr
-rw-r--r--Documentation/fetch-options.txt7
-rw-r--r--builtin/fetch.c6
-rw-r--r--builtin/pull.c6
-rw-r--r--run-command.c335
-rw-r--r--run-command.h80
-rw-r--r--sigchain.c9
-rw-r--r--sigchain.h1
-rw-r--r--strbuf.c11
-rw-r--r--strbuf.h8
-rw-r--r--submodule.c141
-rw-r--r--submodule.h2
-rwxr-xr-xt/t0061-run-command.sh53
-rwxr-xr-xt/t5526-fetch-submodules.sh71
-rw-r--r--test-run-command.c55
-rw-r--r--wrapper.c20
15 files changed, 731 insertions, 74 deletions
diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt
index 45583d8454..6b109f687a 100644
--- a/Documentation/fetch-options.txt
+++ b/Documentation/fetch-options.txt
@@ -100,6 +100,13 @@ ifndef::git-pull[]
reference to a commit that isn't already in the local submodule
clone.
+-j::
+--jobs=<n>::
+ Number of parallel children to be used for fetching submodules.
+ Each will fetch from different submodules, such that fetching many
+ submodules will be faster. By default submodules will be fetched
+ one at a time.
+
--no-recurse-submodules::
Disable recursive fetching of submodules (this has the same effect as
using the '--recurse-submodules=no' option).
diff --git a/builtin/fetch.c b/builtin/fetch.c
index c85f3471d4..586840d761 100644
--- a/builtin/fetch.c
+++ b/builtin/fetch.c
@@ -37,6 +37,7 @@ static int prune = -1; /* unspecified */
static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity;
static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT;
static int tags = TAGS_DEFAULT, unshallow, update_shallow;
+static int max_children = 1;
static const char *depth;
static const char *upload_pack;
static struct strbuf default_rla = STRBUF_INIT;
@@ -99,6 +100,8 @@ static struct option builtin_fetch_options[] = {
N_("fetch all tags and associated objects"), TAGS_SET),
OPT_SET_INT('n', NULL, &tags,
N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
+ OPT_INTEGER('j', "jobs", &max_children,
+ N_("number of submodules fetched in parallel")),
OPT_BOOL('p', "prune", &prune,
N_("prune remote-tracking branches no longer on remote")),
{ OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
@@ -1213,7 +1216,8 @@ int cmd_fetch(int argc, const char **argv, const char *prefix)
result = fetch_populated_submodules(&options,
submodule_prefix,
recurse_submodules,
- verbosity < 0);
+ verbosity < 0,
+ max_children);
argv_array_clear(&options);
}
diff --git a/builtin/pull.c b/builtin/pull.c
index 5145fc60a0..9e3c73809f 100644
--- a/builtin/pull.c
+++ b/builtin/pull.c
@@ -95,6 +95,7 @@ static int opt_force;
static char *opt_tags;
static char *opt_prune;
static char *opt_recurse_submodules;
+static char *max_children;
static int opt_dry_run;
static char *opt_keep;
static char *opt_depth;
@@ -178,6 +179,9 @@ static struct option pull_options[] = {
N_("on-demand"),
N_("control recursive fetching of submodules"),
PARSE_OPT_OPTARG),
+ OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
+ N_("number of submodules pulled in parallel"),
+ PARSE_OPT_OPTARG),
OPT_BOOL(0, "dry-run", &opt_dry_run,
N_("dry run")),
OPT_PASSTHRU('k', "keep", &opt_keep, NULL,
@@ -525,6 +529,8 @@ static int run_fetch(const char *repo, const char **refspecs)
argv_array_push(&args, opt_prune);
if (opt_recurse_submodules)
argv_array_push(&args, opt_recurse_submodules);
+ if (max_children)
+ argv_array_push(&args, max_children);
if (opt_dry_run)
argv_array_push(&args, "--dry-run");
if (opt_keep)
diff --git a/run-command.c b/run-command.c
index 13fa452e8c..51fd72c427 100644
--- a/run-command.c
+++ b/run-command.c
@@ -3,6 +3,8 @@
#include "exec_cmd.h"
#include "sigchain.h"
#include "argv-array.h"
+#include "thread-utils.h"
+#include "strbuf.h"
void child_process_init(struct child_process *child)
{
@@ -865,3 +867,336 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
close(cmd->out);
return finish_command(cmd);
}
+
+enum child_state {
+ GIT_CP_FREE,
+ GIT_CP_WORKING,
+ GIT_CP_WAIT_CLEANUP,
+};
+
+struct parallel_processes {
+ void *data;
+
+ int max_processes;
+ int nr_processes;
+
+ get_next_task_fn get_next_task;
+ start_failure_fn start_failure;
+ task_finished_fn task_finished;
+
+ struct {
+ enum child_state state;
+ struct child_process process;
+ struct strbuf err;
+ void *data;
+ } *children;
+ /*
+ * The struct pollfd is logically part of *children,
+ * but the system call expects it as its own array.
+ */
+ struct pollfd *pfd;
+
+ unsigned shutdown : 1;
+
+ int output_owner;
+ struct strbuf buffered_output; /* of finished children */
+};
+
+static int default_start_failure(struct child_process *cp,
+ struct strbuf *err,
+ void *pp_cb,
+ void *pp_task_cb)
+{
+ int i;
+
+ strbuf_addstr(err, "Starting a child failed:");
+ for (i = 0; cp->argv[i]; i++)
+ strbuf_addf(err, " %s", cp->argv[i]);
+
+ return 0;
+}
+
+static int default_task_finished(int result,
+ struct child_process *cp,
+ struct strbuf *err,
+ void *pp_cb,
+ void *pp_task_cb)
+{
+ int i;
+
+ if (!result)
+ return 0;
+
+ strbuf_addf(err, "A child failed with return code %d:", result);
+ for (i = 0; cp->argv[i]; i++)
+ strbuf_addf(err, " %s", cp->argv[i]);
+
+ return 0;
+}
+
+static void kill_children(struct parallel_processes *pp, int signo)
+{
+ int i, n = pp->max_processes;
+
+ for (i = 0; i < n; i++)
+ if (pp->children[i].state == GIT_CP_WORKING)
+ kill(pp->children[i].process.pid, signo);
+}
+
+static struct parallel_processes *pp_for_signal;
+
+static void handle_children_on_signal(int signo)
+{
+ kill_children(pp_for_signal, signo);
+ sigchain_pop(signo);
+ raise(signo);
+}
+
+static void pp_init(struct parallel_processes *pp,
+ int n,
+ get_next_task_fn get_next_task,
+ start_failure_fn start_failure,
+ task_finished_fn task_finished,
+ void *data)
+{
+ int i;
+
+ if (n < 1)
+ n = online_cpus();
+
+ pp->max_processes = n;
+
+ trace_printf("run_processes_parallel: preparing to run up to %d tasks", n);
+
+ pp->data = data;
+ if (!get_next_task)
+ die("BUG: you need to specify a get_next_task function");
+ pp->get_next_task = get_next_task;
+
+ pp->start_failure = start_failure ? start_failure : default_start_failure;
+ pp->task_finished = task_finished ? task_finished : default_task_finished;
+
+ pp->nr_processes = 0;
+ pp->output_owner = 0;
+ pp->shutdown = 0;
+ pp->children = xcalloc(n, sizeof(*pp->children));
+ pp->pfd = xcalloc(n, sizeof(*pp->pfd));
+ strbuf_init(&pp->buffered_output, 0);
+
+ for (i = 0; i < n; i++) {
+ strbuf_init(&pp->children[i].err, 0);
+ child_process_init(&pp->children[i].process);
+ pp->pfd[i].events = POLLIN | POLLHUP;
+ pp->pfd[i].fd = -1;
+ }
+
+ pp_for_signal = pp;
+ sigchain_push_common(handle_children_on_signal);
+}
+
+static void pp_cleanup(struct parallel_processes *pp)
+{
+ int i;
+
+ trace_printf("run_processes_parallel: done");
+ for (i = 0; i < pp->max_processes; i++) {
+ strbuf_release(&pp->children[i].err);
+ child_process_clear(&pp->children[i].process);
+ }
+
+ free(pp->children);
+ free(pp->pfd);
+
+ /*
+ * When get_next_task added messages to the buffer in its last
+ * iteration, the buffered output is non empty.
+ */
+ fputs(pp->buffered_output.buf, stderr);
+ strbuf_release(&pp->buffered_output);
+
+ sigchain_pop_common();
+}
+
+/* returns
+ * 0 if a new task was started.
+ * 1 if no new jobs was started (get_next_task ran out of work, non critical
+ * problem with starting a new command)
+ * <0 no new job was started, user wishes to shutdown early. Use negative code
+ * to signal the children.
+ */
+static int pp_start_one(struct parallel_processes *pp)
+{
+ int i, code;
+
+ for (i = 0; i < pp->max_processes; i++)
+ if (pp->children[i].state == GIT_CP_FREE)
+ break;
+ if (i == pp->max_processes)
+ die("BUG: bookkeeping is hard");
+
+ code = pp->get_next_task(&pp->children[i].process,
+ &pp->children[i].err,
+ pp->data,
+ &pp->children[i].data);
+ if (!code) {
+ strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+ strbuf_reset(&pp->children[i].err);
+ return 1;
+ }
+ pp->children[i].process.err = -1;
+ pp->children[i].process.stdout_to_stderr = 1;
+ pp->children[i].process.no_stdin = 1;
+
+ if (start_command(&pp->children[i].process)) {
+ code = pp->start_failure(&pp->children[i].process,
+ &pp->children[i].err,
+ pp->data,
+ &pp->children[i].data);
+ strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+ strbuf_reset(&pp->children[i].err);
+ if (code)
+ pp->shutdown = 1;
+ return code;
+ }
+
+ pp->nr_processes++;
+ pp->children[i].state = GIT_CP_WORKING;
+ pp->pfd[i].fd = pp->children[i].process.err;
+ return 0;
+}
+
+static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
+{
+ int i;
+
+ while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
+ if (errno == EINTR)
+ continue;
+ pp_cleanup(pp);
+ die_errno("poll");
+ }
+
+ /* Buffer output from all pipes. */
+ for (i = 0; i < pp->max_processes; i++) {
+ if (pp->children[i].state == GIT_CP_WORKING &&
+ pp->pfd[i].revents & (POLLIN | POLLHUP)) {
+ int n = strbuf_read_once(&pp->children[i].err,
+ pp->children[i].process.err, 0);
+ if (n == 0) {
+ close(pp->children[i].process.err);
+ pp->children[i].state = GIT_CP_WAIT_CLEANUP;
+ } else if (n < 0)
+ if (errno != EAGAIN)
+ die_errno("read");
+ }
+ }
+}
+
+static void pp_output(struct parallel_processes *pp)
+{
+ int i = pp->output_owner;
+ if (pp->children[i].state == GIT_CP_WORKING &&
+ pp->children[i].err.len) {
+ fputs(pp->children[i].err.buf, stderr);
+ strbuf_reset(&pp->children[i].err);
+ }
+}
+
+static int pp_collect_finished(struct parallel_processes *pp)
+{
+ int i, code;
+ int n = pp->max_processes;
+ int result = 0;
+
+ while (pp->nr_processes > 0) {
+ for (i = 0; i < pp->max_processes; i++)
+ if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
+ break;
+ if (i == pp->max_processes)
+ break;
+
+ code = finish_command(&pp->children[i].process);
+
+ code = pp->task_finished(code, &pp->children[i].process,
+ &pp->children[i].err, pp->data,
+ &pp->children[i].data);
+
+ if (code)
+ result = code;
+ if (code < 0)
+ break;
+
+ pp->nr_processes--;
+ pp->children[i].state = GIT_CP_FREE;
+ pp->pfd[i].fd = -1;
+ child_process_init(&pp->children[i].process);
+
+ if (i != pp->output_owner) {
+ strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+ strbuf_reset(&pp->children[i].err);
+ } else {
+ fputs(pp->children[i].err.buf, stderr);
+ strbuf_reset(&pp->children[i].err);
+
+ /* Output all other finished child processes */
+ fputs(pp->buffered_output.buf, stderr);
+ strbuf_reset(&pp->buffered_output);
+
+ /*
+ * Pick next process to output live.
+ * NEEDSWORK:
+ * For now we pick it randomly by doing a round
+ * robin. Later we may want to pick the one with
+ * the most output or the longest or shortest
+ * running process time.
+ */
+ for (i = 0; i < n; i++)
+ if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
+ break;
+ pp->output_owner = (pp->output_owner + i) % n;
+ }
+ }
+ return result;
+}
+
+int run_processes_parallel(int n,
+ get_next_task_fn get_next_task,
+ start_failure_fn start_failure,
+ task_finished_fn task_finished,
+ void *pp_cb)
+{
+ int i, code;
+ int output_timeout = 100;
+ int spawn_cap = 4;
+ struct parallel_processes pp;
+
+ pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
+ while (1) {
+ for (i = 0;
+ i < spawn_cap && !pp.shutdown &&
+ pp.nr_processes < pp.max_processes;
+ i++) {
+ code = pp_start_one(&pp);
+ if (!code)
+ continue;
+ if (code < 0) {
+ pp.shutdown = 1;
+ kill_children(&pp, -code);
+ }
+ break;
+ }
+ if (!pp.nr_processes)
+ break;
+ pp_buffer_stderr(&pp, output_timeout);
+ pp_output(&pp);
+ code = pp_collect_finished(&pp);
+ if (code) {
+ pp.shutdown = 1;
+ if (code < 0)
+ kill_children(&pp, -code);
+ }
+ }
+
+ pp_cleanup(&pp);
+ return 0;
+}
diff --git a/run-command.h b/run-command.h
index 12bb26c2a6..d5a57f9227 100644
--- a/run-command.h
+++ b/run-command.h
@@ -122,4 +122,84 @@ int start_async(struct async *async);
int finish_async(struct async *async);
int in_async(void);
+/**
+ * This callback should initialize the child process and preload the
+ * error channel if desired. The preloading of is useful if you want to
+ * have a message printed directly before the output of the child process.
+ * pp_cb is the callback cookie as passed to run_processes_parallel.
+ * You can store a child process specific callback cookie in pp_task_cb.
+ *
+ * Even after returning 0 to indicate that there are no more processes,
+ * this function will be called again until there are no more running
+ * child processes.
+ *
+ * Return 1 if the next child is ready to run.
+ * Return 0 if there are currently no more tasks to be processed.
+ * To send a signal to other child processes for abortion,
+ * return the negative signal number.
+ */
+typedef int (*get_next_task_fn)(struct child_process *cp,
+ struct strbuf *err,
+ void *pp_cb,
+ void **pp_task_cb);
+
+/**
+ * This callback is called whenever there are problems starting
+ * a new process.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * Return 0 to continue the parallel processing. To abort return non zero.
+ * To send a signal to other child processes for abortion, return
+ * the negative signal number.
+ */
+typedef int (*start_failure_fn)(struct child_process *cp,
+ struct strbuf *err,
+ void *pp_cb,
+ void *pp_task_cb);
+
+/**
+ * This callback is called on every child process that finished processing.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * Return 0 to continue the parallel processing. To abort return non zero.
+ * To send a signal to other child processes for abortion, return
+ * the negative signal number.
+ */
+typedef int (*task_finished_fn)(int result,
+ struct child_process *cp,
+ struct strbuf *err,
+ void *pp_cb,
+ void *pp_task_cb);
+
+/**
+ * Runs up to n processes at the same time. Whenever a process can be
+ * started, the callback get_next_task_fn is called to obtain the data
+ * required to start another child process.
+ *
+ * The children started via this function run in parallel. Their output
+ * (both stdout and stderr) is routed to stderr in a manner that output
+ * from different tasks does not interleave.
+ *
+ * If start_failure_fn or task_finished_fn are NULL, default handlers
+ * will be used. The default handlers will print an error message on
+ * error without issuing an emergency stop.
+ */
+int run_processes_parallel(int n,
+ get_next_task_fn,
+ start_failure_fn,
+ task_finished_fn,
+ void *pp_cb);
+
#endif
diff --git a/sigchain.c b/sigchain.c
index faa375d5d8..2ac43bbd28 100644
--- a/sigchain.c
+++ b/sigchain.c
@@ -50,3 +50,12 @@ void sigchain_push_common(sigchain_fun f)
sigchain_push(SIGQUIT, f);
sigchain_push(SIGPIPE, f);
}
+
+void sigchain_pop_common(void)
+{
+ sigchain_pop(SIGPIPE);
+ sigchain_pop(SIGQUIT);
+ sigchain_pop(SIGTERM);
+ sigchain_pop(SIGHUP);
+ sigchain_pop(SIGINT);
+}
diff --git a/sigchain.h b/sigchain.h
index 618083bce0..138b20f54b 100644
--- a/sigchain.h
+++ b/sigchain.h
@@ -7,5 +7,6 @@ int sigchain_push(int sig, sigchain_fun f);
int sigchain_pop(int sig);
void sigchain_push_common(sigchain_fun f);
+void sigchain_pop_common(void);
#endif /* SIGCHAIN_H */
diff --git a/strbuf.c b/strbuf.c
index d76f0aed85..38686ffb65 100644
--- a/strbuf.c
+++ b/strbuf.c
@@ -384,6 +384,17 @@ ssize_t strbuf_read(struct strbuf *sb, int fd, size_t hint)
return sb->len - oldlen;
}
+ssize_t strbuf_read_once(struct strbuf *sb, int fd, size_t hint)
+{
+ ssize_t cnt;
+
+ strbuf_grow(sb, hint ? hint : 8192);
+ cnt = xread(fd, sb->buf + sb->len, sb->alloc - sb->len - 1);
+ if (cnt > 0)
+ strbuf_setlen(sb, sb->len + cnt);
+ return cnt;
+}
+
#define STRBUF_MAXLINK (2*PATH_MAX)
int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint)
diff --git a/strbuf.h b/strbuf.h
index 7123fca7af..2bf90e70fc 100644
--- a/strbuf.h
+++ b/strbuf.h
@@ -367,6 +367,14 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
/**
+ * Read the contents of a given file descriptor partially by using only one
+ * attempt of xread. The third argument can be used to give a hint about the
+ * file size, to avoid reallocs. Returns the number of new bytes appended to
+ * the sb.
+ */
+extern ssize_t strbuf_read_once(struct strbuf *, int fd, size_t hint);
+
+/**
* Read the contents of a file, specified by its path. The third argument
* can be used to give a hint about the file size, to avoid reallocs.
*/
diff --git a/submodule.c b/submodule.c
index 14e76247bf..b83939c294 100644
--- a/submodule.c
+++ b/submodule.c
@@ -12,6 +12,7 @@
#include "sha1-array.h"
#include "argv-array.h"
#include "blob.h"
+#include "thread-utils.h"
static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND;
static struct string_list changed_submodule_paths;
@@ -610,37 +611,28 @@ static void calculate_changed_submodule_paths(void)
initialized_fetch_ref_tips = 0;
}
-int fetch_populated_submodules(const struct argv_array *options,
- const char *prefix, int command_line_option,
- int quiet)
+struct submodule_parallel_fetch {
+ int count;
+ struct argv_array args;
+ const char *work_tree;
+ const char *prefix;
+ int command_line_option;
+ int quiet;
+ int result;
+};
+#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
+
+static int get_next_submodule(struct child_process *cp,
+ struct strbuf *err, void *data, void **task_cb)
{
- int i, result = 0;
- struct child_process cp = CHILD_PROCESS_INIT;
- struct argv_array argv = ARGV_ARRAY_INIT;
- const char *work_tree = get_git_work_tree();
- if (!work_tree)
- goto out;
-
- if (read_cache() < 0)
- die("index file corrupt");
-
- argv_array_push(&argv, "fetch");
- for (i = 0; i < options->argc; i++)
- argv_array_push(&argv, options->argv[i]);
- argv_array_push(&argv, "--recurse-submodules-default");
- /* default value, "--submodule-prefix" and its value are added later */
-
- cp.env = local_repo_env;
- cp.git_cmd = 1;
- cp.no_stdin = 1;
-
- calculate_changed_submodule_paths();
+ int ret = 0;
+ struct submodule_parallel_fetch *spf = data;
- for (i = 0; i < active_nr; i++) {
+ for (; spf->count < active_nr; spf->count++) {
struct strbuf submodule_path = STRBUF_INIT;
struct strbuf submodule_git_dir = STRBUF_INIT;
struct strbuf submodule_prefix = STRBUF_INIT;
- const struct cache_entry *ce = active_cache[i];
+ const struct cache_entry *ce = active_cache[spf->count];
const char *git_dir, *default_argv;
const struct submodule *submodule;
@@ -652,7 +644,7 @@ int fetch_populated_submodules(const struct argv_array *options,
submodule = submodule_from_name(null_sha1, ce->name);
default_argv = "yes";
- if (command_line_option == RECURSE_SUBMODULES_DEFAULT) {
+ if (spf->command_line_option == RECURSE_SUBMODULES_DEFAULT) {
if (submodule &&
submodule->fetch_recurse !=
RECURSE_SUBMODULES_NONE) {
@@ -675,40 +667,101 @@ int fetch_populated_submodules(const struct argv_array *options,
default_argv = "on-demand";
}
}
- } else if (command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
+ } else if (spf->command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name))
continue;
default_argv = "on-demand";
}
- strbuf_addf(&submodule_path, "%s/%s", work_tree, ce->name);
+ strbuf_addf(&submodule_path, "%s/%s", spf->work_tree, ce->name);
strbuf_addf(&submodule_git_dir, "%s/.git", submodule_path.buf);
- strbuf_addf(&submodule_prefix, "%s%s/", prefix, ce->name);
+ strbuf_addf(&submodule_prefix, "%s%s/", spf->prefix, ce->name);
git_dir = read_gitfile(submodule_git_dir.buf);
if (!git_dir)
git_dir = submodule_git_dir.buf;
if (is_directory(git_dir)) {
- if (!quiet)
- printf("Fetching submodule %s%s\n", prefix, ce->name);
- cp.dir = submodule_path.buf;
- argv_array_push(&argv, default_argv);
- argv_array_push(&argv, "--submodule-prefix");
- argv_array_push(&argv, submodule_prefix.buf);
- cp.argv = argv.argv;
- if (run_command(&cp))
- result = 1;
- argv_array_pop(&argv);
- argv_array_pop(&argv);
- argv_array_pop(&argv);
+ child_process_init(cp);
+ cp->dir = strbuf_detach(&submodule_path, NULL);
+ cp->env = local_repo_env;
+ cp->git_cmd = 1;
+ if (!spf->quiet)
+ strbuf_addf(err, "Fetching submodule %s%s\n",
+ spf->prefix, ce->name);
+ argv_array_init(&cp->args);
+ argv_array_pushv(&cp->args, spf->args.argv);
+ argv_array_push(&cp->args, default_argv);
+ argv_array_push(&cp->args, "--submodule-prefix");
+ argv_array_push(&cp->args, submodule_prefix.buf);
+ ret = 1;
}
strbuf_release(&submodule_path);
strbuf_release(&submodule_git_dir);
strbuf_release(&submodule_prefix);
+ if (ret) {
+ spf->count++;
+ return 1;
+ }
}
- argv_array_clear(&argv);
+ return 0;
+}
+
+static int fetch_start_failure(struct child_process *cp,
+ struct strbuf *err,
+ void *cb, void *task_cb)
+{
+ struct submodule_parallel_fetch *spf = cb;
+
+ spf->result = 1;
+
+ return 0;
+}
+
+static int fetch_finish(int retvalue, struct child_process *cp,
+ struct strbuf *err, void *cb, void *task_cb)
+{
+ struct submodule_parallel_fetch *spf = cb;
+
+ if (retvalue)
+ spf->result = 1;
+
+ return 0;
+}
+
+int fetch_populated_submodules(const struct argv_array *options,
+ const char *prefix, int command_line_option,
+ int quiet, int max_parallel_jobs)
+{
+ int i;
+ struct submodule_parallel_fetch spf = SPF_INIT;
+
+ spf.work_tree = get_git_work_tree();
+ spf.command_line_option = command_line_option;
+ spf.quiet = quiet;
+ spf.prefix = prefix;
+
+ if (!spf.work_tree)
+ goto out;
+
+ if (read_cache() < 0)
+ die("index file corrupt");
+
+ argv_array_push(&spf.args, "fetch");
+ for (i = 0; i < options->argc; i++)
+ argv_array_push(&spf.args, options->argv[i]);
+ argv_array_push(&spf.args, "--recurse-submodules-default");
+ /* default value, "--submodule-prefix" and its value are added later */
+
+ calculate_changed_submodule_paths();
+ run_processes_parallel(max_parallel_jobs,
+ get_next_submodule,
+ fetch_start_failure,
+ fetch_finish,
+ &spf);
+
+ argv_array_clear(&spf.args);
out:
string_list_clear(&changed_submodule_paths, 1);
- return result;
+ return spf.result;
}
unsigned is_submodule_modified(const char *path, int ignore_untracked)
diff --git a/submodule.h b/submodule.h
index ddff512109..e06eaa5ebb 100644
--- a/submodule.h
+++ b/submodule.h
@@ -32,7 +32,7 @@ void set_config_fetch_recurse_submodules(int value);
void check_for_new_submodule_commits(unsigned char new_sha1[20]);
int fetch_populated_submodules(const struct argv_array *options,
const char *prefix, int command_line_option,
- int quiet);
+ int quiet, int max_parallel_jobs);
unsigned is_submodule_modified(const char *path, int ignore_untracked);
int submodule_uses_gitfile(const char *path);
int ok_to_remove_submodule(const char *path);
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 9acf628726..12228b4aa6 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -47,4 +47,57 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' '
test_cmp expect actual
'
+cat >expect <<-EOF
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+EOF
+
+test_expect_success 'run_command runs in parallel with more jobs available than tasks' '
+ test-run-command run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+ test_cmp expect actual
+'
+
+test_expect_success 'run_command runs in parallel with as many jobs as tasks' '
+ test-run-command run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+ test_cmp expect actual
+'
+
+test_expect_success 'run_command runs in parallel with more tasks than jobs available' '
+ test-run-command run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+ test_cmp expect actual
+'
+
+cat >expect <<-EOF
+preloaded output of a child
+asking for a quick stop
+preloaded output of a child
+asking for a quick stop
+preloaded output of a child
+asking for a quick stop
+EOF
+
+test_expect_success 'run_command is asked to abort gracefully' '
+ test-run-command run-command-abort 3 false 2>actual &&
+ test_cmp expect actual
+'
+
+cat >expect <<-EOF
+no further jobs available
+EOF
+
+test_expect_success 'run_command outputs ' '
+ test-run-command run-command-no-jobs 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+ test_cmp expect actual
+'
+
test_done
diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh
index a4532b00d6..1241146227 100755
--- a/t/t5526-fetch-submodules.sh
+++ b/t/t5526-fetch-submodules.sh
@@ -16,7 +16,8 @@ add_upstream_commit() {
git add subfile &&
git commit -m new subfile &&
head2=$(git rev-parse --short HEAD) &&
- echo "From $pwd/submodule" > ../expect.err &&
+ echo "Fetching submodule submodule" > ../expect.err &&
+ echo "From $pwd/submodule" >> ../expect.err &&
echo " $head1..$head2 master -> origin/master" >> ../expect.err
) &&
(
@@ -27,6 +28,7 @@ add_upstream_commit() {
git add deepsubfile &&
git commit -m new deepsubfile &&
head2=$(git rev-parse --short HEAD) &&
+ echo "Fetching submodule submodule/subdir/deepsubmodule" >> ../expect.err
echo "From $pwd/deepsubmodule" >> ../expect.err &&
echo " $head1..$head2 master -> origin/master" >> ../expect.err
)
@@ -56,9 +58,7 @@ test_expect_success setup '
(
cd downstream &&
git submodule update --init --recursive
- ) &&
- echo "Fetching submodule submodule" > expect.out &&
- echo "Fetching submodule submodule/subdir/deepsubmodule" >> expect.out
+ )
'
test_expect_success "fetch --recurse-submodules recurses into submodules" '
@@ -67,10 +67,21 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" '
cd downstream &&
git fetch --recurse-submodules >../actual.out 2>../actual.err
) &&
- test_i18ncmp expect.out actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err
'
+test_expect_success "fetch --recurse-submodules -j2 has the same output behaviour" '
+ add_upstream_commit &&
+ (
+ cd downstream &&
+ GIT_TRACE=$(pwd)/../trace.out git fetch --recurse-submodules -j2 2>../actual.err
+ ) &&
+ test_must_be_empty actual.out &&
+ test_i18ncmp expect.err actual.err &&
+ grep "2 tasks" trace.out
+'
+
test_expect_success "fetch alone only fetches superproject" '
add_upstream_commit &&
(
@@ -96,7 +107,7 @@ test_expect_success "using fetchRecurseSubmodules=true in .gitmodules recurses i
git config -f .gitmodules submodule.submodule.fetchRecurseSubmodules true &&
git fetch >../actual.out 2>../actual.err
) &&
- test_i18ncmp expect.out actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err
'
@@ -127,7 +138,7 @@ test_expect_success "--recurse-submodules overrides fetchRecurseSubmodules setti
git config --unset -f .gitmodules submodule.submodule.fetchRecurseSubmodules &&
git config --unset submodule.submodule.fetchRecurseSubmodules
) &&
- test_i18ncmp expect.out actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err
'
@@ -140,13 +151,22 @@ test_expect_success "--quiet propagates to submodules" '
! test -s actual.err
'
+test_expect_success "--quiet propagates to parallel submodules" '
+ (
+ cd downstream &&
+ git fetch --recurse-submodules -j 2 --quiet >../actual.out 2>../actual.err
+ ) &&
+ ! test -s actual.out &&
+ ! test -s actual.err
+'
+
test_expect_success "--dry-run propagates to submodules" '
add_upstream_commit &&
(
cd downstream &&
git fetch --recurse-submodules --dry-run >../actual.out 2>../actual.err
) &&
- test_i18ncmp expect.out actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err
'
@@ -155,7 +175,7 @@ test_expect_success "Without --dry-run propagates to submodules" '
cd downstream &&
git fetch --recurse-submodules >../actual.out 2>../actual.err
) &&
- test_i18ncmp expect.out actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err
'
@@ -166,7 +186,7 @@ test_expect_success "recurseSubmodules=true propagates into submodules" '
git config fetch.recurseSubmodules true
git fetch >../actual.out 2>../actual.err
) &&
- test_i18ncmp expect.out actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err
'
@@ -180,7 +200,7 @@ test_expect_success "--recurse-submodules overrides config in submodule" '
) &&
git fetch --recurse-submodules >../actual.out 2>../actual.err
) &&
- test_i18ncmp expect.out actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err
'
@@ -214,16 +234,15 @@ test_expect_success "Recursion stops when no new submodule commits are fetched"
git add submodule &&
git commit -m "new submodule" &&
head2=$(git rev-parse --short HEAD) &&
- echo "Fetching submodule submodule" > expect.out.sub &&
echo "From $pwd/." > expect.err.sub &&
echo " $head1..$head2 master -> origin/master" >>expect.err.sub &&
- head -2 expect.err >> expect.err.sub &&
+ head -3 expect.err >> expect.err.sub &&
(
cd downstream &&
git fetch >../actual.out 2>../actual.err
) &&
test_i18ncmp expect.err.sub actual.err &&
- test_i18ncmp expect.out.sub actual.out
+ test_must_be_empty actual.out
'
test_expect_success "Recursion doesn't happen when new superproject commits don't change any submodules" '
@@ -269,7 +288,7 @@ test_expect_success "Recursion picks up config in submodule" '
)
) &&
test_i18ncmp expect.err.sub actual.err &&
- test_i18ncmp expect.out actual.out
+ test_must_be_empty actual.out
'
test_expect_success "Recursion picks up all submodules when necessary" '
@@ -285,7 +304,8 @@ test_expect_success "Recursion picks up all submodules when necessary" '
git add subdir/deepsubmodule &&
git commit -m "new deepsubmodule"
head2=$(git rev-parse --short HEAD) &&
- echo "From $pwd/submodule" > ../expect.err.sub &&
+ echo "Fetching submodule submodule" > ../expect.err.sub &&
+ echo "From $pwd/submodule" >> ../expect.err.sub &&
echo " $head1..$head2 master -> origin/master" >> ../expect.err.sub
) &&
head1=$(git rev-parse --short HEAD) &&
@@ -295,13 +315,13 @@ test_expect_success "Recursion picks up all submodules when necessary" '
echo "From $pwd/." > expect.err.2 &&
echo " $head1..$head2 master -> origin/master" >> expect.err.2 &&
cat expect.err.sub >> expect.err.2 &&
- tail -2 expect.err >> expect.err.2 &&
+ tail -3 expect.err >> expect.err.2 &&
(
cd downstream &&
git fetch >../actual.out 2>../actual.err
) &&
test_i18ncmp expect.err.2 actual.err &&
- test_i18ncmp expect.out actual.out
+ test_must_be_empty actual.out
'
test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no new commits are fetched in the superproject (and ignores config)" '
@@ -317,7 +337,8 @@ test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no ne
git add subdir/deepsubmodule &&
git commit -m "new deepsubmodule" &&
head2=$(git rev-parse --short HEAD) &&
- echo "From $pwd/submodule" > ../expect.err.sub &&
+ echo Fetching submodule submodule > ../expect.err.sub &&
+ echo "From $pwd/submodule" >> ../expect.err.sub &&
echo " $head1..$head2 master -> origin/master" >> ../expect.err.sub
) &&
(
@@ -335,7 +356,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
git add submodule &&
git commit -m "new submodule" &&
head2=$(git rev-parse --short HEAD) &&
- tail -2 expect.err > expect.err.deepsub &&
+ tail -3 expect.err > expect.err.deepsub &&
echo "From $pwd/." > expect.err &&
echo " $head1..$head2 master -> origin/master" >>expect.err &&
cat expect.err.sub >> expect.err &&
@@ -354,7 +375,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
git config --unset -f .gitmodules submodule.subdir/deepsubmodule.fetchRecursive
)
) &&
- test_i18ncmp expect.out actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err
'
@@ -388,7 +409,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
head2=$(git rev-parse --short HEAD) &&
echo "From $pwd/." > expect.err.2 &&
echo " $head1..$head2 master -> origin/master" >>expect.err.2 &&
- head -2 expect.err >> expect.err.2 &&
+ head -3 expect.err >> expect.err.2 &&
(
cd downstream &&
git config fetch.recurseSubmodules on-demand &&
@@ -399,7 +420,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
cd downstream &&
git config --unset fetch.recurseSubmodules
) &&
- test_i18ncmp expect.out.sub actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err.2 actual.err
'
@@ -416,7 +437,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
head2=$(git rev-parse --short HEAD) &&
echo "From $pwd/." > expect.err.2 &&
echo " $head1..$head2 master -> origin/master" >>expect.err.2 &&
- head -2 expect.err >> expect.err.2 &&
+ head -3 expect.err >> expect.err.2 &&
(
cd downstream &&
git config submodule.submodule.fetchRecurseSubmodules on-demand &&
@@ -427,7 +448,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
cd downstream &&
git config --unset submodule.submodule.fetchRecurseSubmodules
) &&
- test_i18ncmp expect.out.sub actual.out &&
+ test_must_be_empty actual.out &&
test_i18ncmp expect.err.2 actual.err
'
diff --git a/test-run-command.c b/test-run-command.c
index 89c7de2c60..fbe0a27ef3 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -10,16 +10,54 @@
#include "git-compat-util.h"
#include "run-command.h"
+#include "argv-array.h"
+#include "strbuf.h"
#include <string.h>
#include <errno.h>
+static int number_callbacks;
+static int parallel_next(struct child_process *cp,
+ struct strbuf *err,
+ void *cb,
+ void **task_cb)
+{
+ struct child_process *d = cb;
+ if (number_callbacks >= 4)
+ return 0;
+
+ argv_array_pushv(&cp->args, d->argv);
+ strbuf_addf(err, "preloaded output of a child\n");
+ number_callbacks++;
+ return 1;
+}
+
+static int no_job(struct child_process *cp,
+ struct strbuf *err,
+ void *cb,
+ void **task_cb)
+{
+ strbuf_addf(err, "no further jobs available\n");
+ return 0;
+}
+
+static int task_finished(int result,
+ struct child_process *cp,
+ struct strbuf *err,
+ void *pp_cb,
+ void *pp_task_cb)
+{
+ strbuf_addf(err, "asking for a quick stop\n");
+ return 1;
+}
+
int main(int argc, char **argv)
{
struct child_process proc = CHILD_PROCESS_INIT;
+ int jobs;
if (argc < 3)
return 1;
- proc.argv = (const char **)argv+2;
+ proc.argv = (const char **)argv + 2;
if (!strcmp(argv[1], "start-command-ENOENT")) {
if (start_command(&proc) < 0 && errno == ENOENT)
@@ -30,6 +68,21 @@ int main(int argc, char **argv)
if (!strcmp(argv[1], "run-command"))
exit(run_command(&proc));
+ jobs = atoi(argv[2]);
+ proc.argv = (const char **)argv + 3;
+
+ if (!strcmp(argv[1], "run-command-parallel"))
+ exit(run_processes_parallel(jobs, parallel_next,
+ NULL, NULL, &proc));
+
+ if (!strcmp(argv[1], "run-command-abort"))
+ exit(run_processes_parallel(jobs, parallel_next,
+ NULL, task_finished, &proc));
+
+ if (!strcmp(argv[1], "run-command-no-jobs"))
+ exit(run_processes_parallel(jobs, no_job,
+ NULL, task_finished, &proc));
+
fprintf(stderr, "check usage\n");
return 1;
}
diff --git a/wrapper.c b/wrapper.c
index c95e2906b8..b43d437630 100644
--- a/wrapper.c
+++ b/wrapper.c
@@ -236,8 +236,24 @@ ssize_t xread(int fd, void *buf, size_t len)
len = MAX_IO_SIZE;
while (1) {
nr = read(fd, buf, len);
- if ((nr < 0) && (errno == EAGAIN || errno == EINTR))
- continue;
+ if (nr < 0) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ struct pollfd pfd;
+ pfd.events = POLLIN;
+ pfd.fd = fd;
+ /*
+ * it is OK if this poll() failed; we
+ * want to leave this infinite loop
+ * only when read() returns with
+ * success, or an expected failure,
+ * which would be checked by the next
+ * call to read(2).
+ */
+ poll(&pfd, 1, -1);
+ }
+ }
return nr;
}
}