diff options
Diffstat (limited to 'git_remote_helpers/git/git.py')
-rw-r--r-- | git_remote_helpers/git/git.py | 678 |
1 files changed, 0 insertions, 678 deletions
#!/usr/bin/env python

"""Functionality for interacting with Git repositories.

This module provides classes for interfacing with a Git repository.
"""

import os
import re
import time
from binascii import hexlify
from cStringIO import StringIO
import unittest

from git_remote_helpers.util import debug, error, die, start_command, run_command


def get_git_dir ():
    """Return the path to the GIT_DIR for this repo.

    Calls die() if 'git rev-parse --git-dir' exits with a non-zero code.

    """
    args = ("git", "rev-parse", "--git-dir")
    exit_code, output, errors = run_command(args)
    if exit_code:
        die("Failed to retrieve git dir")
    assert not errors
    return output.strip()


def parse_git_config ():
    """Return a dict containing the parsed version of 'git config -l'.

    Keys are the fully qualified variable names; values are the raw
    string values.  A variable that is set without any value maps to
    the empty string (which git_config_bool() interprets as True).
    Calls die() if 'git config' exits with a non-zero code.

    """
    exit_code, output, errors = run_command(("git", "config", "-z", "-l"))
    if exit_code:
        die("Failed to retrieve git configuration")
    assert not errors
    config = {}
    for entry in output.split("\0"):
        if not entry:
            continue
        # With -z, key and value are separated by a newline, but a key
        # that has no value at all is printed without any newline.
        # partition() copes with both forms; split() would yield a
        # one-element list and make dict() raise ValueError.
        key, _, value = entry.partition("\n")
        config[key] = value
    return config


def git_config_bool (value):
    """Convert the given git config string value to True or False.

    Raise ValueError if the given string was not recognized as a
    boolean value.

    """
    norm_value = str(value).strip().lower()
    if norm_value in ("true", "1", "yes", "on", ""):
        return True
    if norm_value in ("false", "0", "no", "off", "none"):
        return False
    raise ValueError("Failed to parse '%s' into a boolean value" % (value))


def valid_git_ref (ref_name):
    """Return True iff the given ref name is a valid git ref name.

    One-level names (without any slash) are accepted.

    """
    # The following is a reimplementation of the git check-ref-format
    # command.  The rules were derived from the git check-ref-format(1)
    # manual page.  This code should be replaced by a call to
    # check_refname_format() in the git library, when such is available.
    if not ref_name or ref_name == "@":
        return False
    if ref_name.startswith("/") or \
       ref_name.endswith("/") or \
       ref_name.endswith(".") or \
       "//" in ref_name or \
       ".." in ref_name or \
       "@{" in ref_name:
        return False
    # No slash-separated component may begin with a dot or end with
    # the sequence ".lock".
    for component in ref_name.split("/"):
        if component.startswith(".") or component.endswith(".lock"):
            return False
    # No ASCII control characters, and none of the characters that have
    # a special meaning in revision or pathspec syntax.
    for c in ref_name:
        if ord(c) < 0x20 or ord(c) == 0x7f or c in " ~^:?*[\\":
            return False
    return True


class GitObjectFetcher(object):

    """Provide parsed access to 'git cat-file --batch'.

    This provides a read-only interface to the Git object database.

    """

    def __init__ (self):
        """Initiate a 'git cat-file --batch' session."""
        self.queue = []  # List of (object name, callback) to be submitted
        self.in_transit = None  # (object name, callback) currently in transit

        # 'git cat-file --batch' produces binary output which is likely
        # to be corrupted by the default "rU"-mode pipe opened by
        # start_command. (Mode == "rU" does universal new-line
        # conversion, which mangles carriage returns.) Therefore, we
        # open an explicitly binary-safe pipe for transferring the
        # output from 'git cat-file --batch'.
        pipe_r_fd, pipe_w_fd = os.pipe()
        pipe_r = os.fdopen(pipe_r_fd, "rb")
        pipe_w = os.fdopen(pipe_w_fd, "wb")
        self.proc = start_command(("git", "cat-file", "--batch"),
                                  stdout = pipe_w)
        # Drop our copy of the write end now, instead of relying on
        # reference counting to do it when this method returns.  As
        # long as this process holds it open, reading from pipe_r can
        # never see end-of-file.
        # NOTE(review): assumes start_command() has already spawned the
        # child (Popen-like behaviour) - confirm against its definition.
        pipe_w.close()
        self.f = pipe_r

    def __del__ (self):
        """Verify completed communication with 'git cat-file --batch'."""
        assert not self.queue
        assert self.in_transit is None
        self.proc.stdin.close()
        assert self.proc.wait() == 0  # Zero exit code
        assert self.f.read() == ""  # No remaining output

    def _submit_next_object (self):
        """Submit queue items to the 'git cat-file --batch' process.

        If there are items in the queue, and there is currently no item
        currently in 'transit', then pop the first item off the queue,
        and submit it.

        """
        if self.queue and self.in_transit is None:
            self.in_transit = self.queue.pop(0)
            self.proc.stdin.write("%s\n" % (self.in_transit[0]))
            # Make sure the request reaches the child before we block
            # on reading its answer.
            self.proc.stdin.flush()

    def push (self, obj, callback):
        """Push the given object name onto the queue.

        The given callback function will at some point in the future
        be called exactly once with the following arguments:
        - self - this GitObjectFetcher instance
        - obj  - the object name provided to push()
        - sha1 - the SHA1 of the object, if 'None' obj is missing
        - t    - the type of the object (tag/commit/tree/blob)
        - size - the size of the object in bytes
        - data - the object contents

        """
        self.queue.append((obj, callback))
        self._submit_next_object()  # (Re)start queue processing

    def process_next_entry (self):
        """Read the next entry off the queue and invoke callback."""
        obj, cb = self.in_transit
        self.in_transit = None
        header = self.f.readline()
        if header == "%s missing\n" % (obj):
            cb(self, obj, None, None, None, None)
        else:
            sha1, t, size = header.split(" ")
            assert len(sha1) == 40
            assert t in ("tag", "commit", "tree", "blob")
            assert size.endswith("\n")
            size = int(size.strip())
            data = self.f.read(size)
            assert self.f.read(1) == "\n"
            cb(self, obj, sha1, t, size, data)
        # Keep the queue moving after a missing object as well;
        # otherwise the remaining entries are never submitted and
        # their callbacks are never invoked.
        self._submit_next_object()

    def process (self):
        """Process the current queue until empty."""
        while self.in_transit is not None:
            self.process_next_entry()

    # High-level convenience methods:

    def get_sha1 (self, objspec):
        """Return the SHA1 of the object specified by 'objspec'.

        Return None if 'objspec' does not specify an existing object.

        """
        class _ObjHandler(object):
            """Helper class for getting the returned SHA1."""
            def __init__ (self, parser):
                self.parser = parser
                self.sha1 = None

            def __call__ (self, parser, obj, sha1, t, size, data):
                # FIXME: Many unused arguments. Could this be cheaper?
                assert parser == self.parser
                self.sha1 = sha1

        handler = _ObjHandler(self)
        self.push(objspec, handler)
        self.process()
        return handler.sha1

    def open_obj (self, objspec):
        """Return a file object wrapping the contents of a named object.

        The caller is responsible for calling .close() on the returned
        file object.

        Raise KeyError if 'objspec' does not exist in the repo.

        """
        class _ObjHandler(object):
            """Helper class for parsing the returned git object."""
            def __init__ (self, parser):
                """Set up helper."""
                self.parser = parser
                self.contents = StringIO()
                self.err = None

            def __call__ (self, parser, obj, sha1, t, size, data):
                """Git object callback (see GitObjectFetcher documentation)."""
                assert parser == self.parser
                if not sha1:  # Missing object
                    self.err = "Missing object '%s'" % obj
                else:
                    assert size == len(data)
                    self.contents.write(data)

        handler = _ObjHandler(self)
        self.push(objspec, handler)
        self.process()
        if handler.err:
            raise KeyError(handler.err)
        handler.contents.seek(0)
        return handler.contents

    def walk_tree (self, tree_objspec, callback, prefix = ""):
        """Recursively walk the given Git tree object.

        Recursively walk all subtrees of the given tree object, and
        invoke the given callback passing three arguments:
        (path, mode, data) with the path, permission bits, and contents
        of all the blobs found in the entire tree structure.

        Entries whose object is missing from the repo are skipped
        without invoking the callback.

        """
        class _ObjHandler(object):
            """Helper class for walking a git tree structure."""
            def __init__ (self, parser, cb, path, mode = None):
                """Set up helper."""
                self.parser = parser
                self.cb = cb
                self.path = path
                self.mode = mode
                self.err = None

            def parse_tree (self, treedata):
                """Parse tree object data, yield tree entries.

                Each tree entry is a 3-tuple (mode, sha1, path)

                self.path is prepended to all paths yielded
                from this method.

                """
                while treedata:
                    # A raw tree entry is "<mode> <path>\0<20-byte SHA1>".
                    # The octal mode is not zero-padded ("40000" for a
                    # subtree, "100644" for a regular file), so look for
                    # the separating space instead of assuming a fixed
                    # six-character mode.
                    sep = treedata.find(" ")
                    assert sep > 0
                    mode = int(treedata[:sep], 10)
                    # Turn 100xxx into xxx
                    # NOTE(review): this also maps 120000 to 20000 and
                    # 160000 to 60000 - confirm that is intended.
                    if mode > 100000:
                        mode -= 100000
                    i = treedata.find("\0", sep + 1)
                    assert i > 0
                    path = treedata[sep + 1:i]
                    sha1 = hexlify(treedata[i + 1: i + 21])
                    yield (mode, sha1, self.path + path)
                    treedata = treedata[i + 21:]

            def __call__ (self, parser, obj, sha1, t, size, data):
                """Git object callback (see GitObjectFetcher documentation)."""
                assert parser == self.parser
                if not sha1:  # Missing object
                    self.err = "Missing object '%s'" % (obj)
                    return
                assert size == len(data)
                if t == "tree":
                    if self.path:
                        self.path += "/"
                    # Recurse into all blobs and subtrees
                    for m, s, p in self.parse_tree(data):
                        parser.push(s,
                                    self.__class__(self.parser, self.cb, p, m))
                elif t == "blob":
                    self.cb(self.path, self.mode, data)
                else:
                    raise ValueError("Unknown object type '%s'" % (t))

        self.push(tree_objspec, _ObjHandler(self, callback, prefix))
        self.process()


class GitRefMap(object):

    """Map Git ref names to the Git object names they currently point to.

    Behaves like a dictionary of Git ref names -> Git object names.

    """

    def __init__ (self, obj_fetcher):
        """Create a new Git ref -> object map."""
        self.obj_fetcher = obj_fetcher
        self._cache = {}  # dict: refname -> objname

    def _load (self, ref):
        """Retrieve the object currently bound to the given ref.

        The name of the object pointed to by the given ref is stored
        into this mapping, and also returned.

        """
        if ref not in self._cache:
            self._cache[ref] = self.obj_fetcher.get_sha1(ref)
        return self._cache[ref]

    def __contains__ (self, refname):
        """Return True if the given refname is present in this cache."""
        return bool(self._load(refname))

    def __getitem__ (self, refname):
        """Return the git object name pointed to by the given refname."""
        commit = self._load(refname)
        if commit is None:
            raise KeyError("Unknown ref '%s'" % (refname))
        return commit

    def get (self, refname, default = None):
        """Return the git object name pointed to by the given refname."""
        commit = self._load(refname)
        if commit is None:
            return default
        return commit


class GitFICommit(object):

    """Encapsulate the data in a Git fast-import commit command."""

    SHA1RE = re.compile(r'^[0-9a-f]{40}$')

    @classmethod
    def parse_mode (cls, mode):
        """Verify the given git file mode, and return it as a string."""
        assert mode in (644, 755, 100644, 100755, 120000)
        return "%i" % (mode)

    @classmethod
    def parse_objname (cls, objname):
        """Return the given object name (or mark number) as a string."""
        if isinstance(objname, int):  # Object name is a mark number
            assert objname > 0
            return ":%i" % (objname)

        # No existence check is done, only checks for valid format
        assert cls.SHA1RE.match(objname)  # Object name is valid SHA1
        return objname

    @classmethod
    def quote_path (cls, path):
        """Return a quoted version of the given path."""
        path = path.replace("\\", "\\\\")
        path = path.replace("\n", "\\n")
        path = path.replace('"', '\\"')
        return '"%s"' % (path)

    @classmethod
    def parse_path (cls, path):
        """Verify that the given path is valid, and quote it, if needed."""
        assert not isinstance(path, int)  # Cannot be a mark number

        # These checks verify the rules on the fast-import man page
        assert not path.count("//")
        assert not path.endswith("/")
        assert not path.startswith("/")
        assert not path.count("/./")
        assert not path.count("/../")
        assert not path.endswith("/.")
        assert not path.endswith("/..")
        assert not path.startswith("./")
        assert not path.startswith("../")

        if path.count('"') + path.count('\n') + path.count('\\'):
            return cls.quote_path(path)
        return path

    def __init__ (self, name, email, timestamp, timezone, message):
        """Create a new Git fast-import commit, with the given metadata."""
        self.name = name
        self.email = email
        self.timestamp = timestamp
        self.timezone = timezone
        self.message = message
        # List of path operations in this commit.  Every operation is a
        # tuple of strings; GitFastImport.commit() joins the elements
        # with single spaces to form one line of the stream.
        self.pathops = []

    def modify (self, mode, blobname, path):
        """Add a file modification to this Git fast-import commit."""
        self.pathops.append(("M",
                             self.parse_mode(mode),
                             self.parse_objname(blobname),
                             self.parse_path(path)))

    def delete (self, path):
        """Add a file deletion to this Git fast-import commit."""
        self.pathops.append(("D", self.parse_path(path)))

    def copy (self, path, newpath):
        """Add a file copy to this Git fast-import commit."""
        self.pathops.append(("C",
                             self.parse_path(path),
                             self.parse_path(newpath)))

    def rename (self, path, newpath):
        """Add a file rename to this Git fast-import commit."""
        self.pathops.append(("R",
                             self.parse_path(path),
                             self.parse_path(newpath)))

    def note (self, blobname, commit):
        """Add a note object to this Git fast-import commit."""
        self.pathops.append(("N",
                             self.parse_objname(blobname),
                             self.parse_objname(commit)))

    def deleteall (self):
        """Delete all files in this Git fast-import commit."""
        # Must be a one-element tuple like the other operations: joining
        # the characters of a bare string with spaces would emit
        # "d e l e t e a l l" into the fast-import stream.
        self.pathops.append(("deleteall",))


class TestGitFICommit(unittest.TestCase):

    """GitFICommit selftests."""

    def test_basic (self):
        """GitFICommit basic selftests."""
        commit = GitFICommit("A U Thor", "author@example.com",
                             1234567890, "+0000", "message\n")
        self.assertEqual(commit.pathops, [])
        commit.deleteall()
        commit.modify(644, 1, "foo/bar")
        commit.delete("old")
        self.assertEqual(commit.pathops, [("deleteall",),
                                          ("M", "644", ":1", "foo/bar"),
                                          ("D", "old")])

    def test_parse_mode (self):
        """GitFICommit.parse_mode() selftests."""
        self.assertEqual(GitFICommit.parse_mode(644), "644")
        self.assertEqual(GitFICommit.parse_mode(755), "755")
        self.assertEqual(GitFICommit.parse_mode(100644), "100644")
        self.assertEqual(GitFICommit.parse_mode(100755), "100755")
        self.assertEqual(GitFICommit.parse_mode(120000), "120000")
        self.assertRaises(AssertionError, GitFICommit.parse_mode, 0)
        self.assertRaises(AssertionError, GitFICommit.parse_mode, 123)
        self.assertRaises(AssertionError, GitFICommit.parse_mode, 600)
        self.assertRaises(AssertionError, GitFICommit.parse_mode, "644")
        self.assertRaises(AssertionError, GitFICommit.parse_mode, "abc")

    def test_parse_objname (self):
        """GitFICommit.parse_objname() selftests."""
        self.assertEqual(GitFICommit.parse_objname(1), ":1")
        self.assertRaises(AssertionError, GitFICommit.parse_objname, 0)
        self.assertRaises(AssertionError, GitFICommit.parse_objname, -1)
        self.assertEqual(GitFICommit.parse_objname("0123456789" * 4),
                         "0123456789" * 4)
        self.assertEqual(GitFICommit.parse_objname("2468abcdef" * 4),
                         "2468abcdef" * 4)
        self.assertRaises(AssertionError, GitFICommit.parse_objname,
                          "abcdefghij" * 4)

    def test_parse_path (self):
        """GitFICommit.parse_path() selftests."""
        self.assertEqual(GitFICommit.parse_path("foo/bar"), "foo/bar")
        self.assertEqual(GitFICommit.parse_path("path/with\n and \" in it"),
                         '"path/with\\n and \\" in it"')
        self.assertRaises(AssertionError, GitFICommit.parse_path, 1)
        self.assertRaises(AssertionError, GitFICommit.parse_path, 0)
        self.assertRaises(AssertionError, GitFICommit.parse_path, -1)
        self.assertRaises(AssertionError, GitFICommit.parse_path, "foo//bar")
        self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/bar/")
        self.assertRaises(AssertionError, GitFICommit.parse_path, "/foo/bar")
        self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/./bar")
        self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/../bar")
        self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/bar/.")
        self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/bar/..")
        self.assertRaises(AssertionError, GitFICommit.parse_path, "./foo/bar")
        self.assertRaises(AssertionError, GitFICommit.parse_path, "../foo/bar")


class GitFastImport(object):

    """Encapsulate communication with git fast-import."""

    def __init__ (self, f, obj_fetcher, last_mark = 0):
        """Set up self to communicate with a fast-import process through f."""
        self.f = f  # File object where fast-import stream is written
        self.obj_fetcher = obj_fetcher  # GitObjectFetcher instance
        self.next_mark = last_mark + 1  # Next mark number
        self.refs = set()  # Keep track of the refnames we've seen

    def comment (self, s):
        """Write the given comment in the fast-import stream."""
        assert "\n" not in s, "Malformed comment: '%s'" % (s)
        self.f.write("# %s\n" % (s))

    def commit (self, ref, commitdata):
        """Make a commit on the given ref, with the given GitFICommit.

        Return the mark number identifying this commit.

        """
        self.f.write("""\
commit %(ref)s
mark :%(mark)i
committer %(name)s <%(email)s> %(timestamp)i %(timezone)s
data %(msgLength)i
%(msg)s
""" % {
    'ref': ref,
    'mark': self.next_mark,
    'name': commitdata.name,
    'email': commitdata.email,
    'timestamp': commitdata.timestamp,
    'timezone': commitdata.timezone,
    'msgLength': len(commitdata.message),
    'msg': commitdata.message,
})

        # The first time a ref is used in this stream, continue from
        # its current tip, if the ref already exists in the repo.
        if ref not in self.refs:
            self.refs.add(ref)
            parent = ref + "^0"
            if self.obj_fetcher.get_sha1(parent):
                self.f.write("from %s\n" % (parent))

        for op in commitdata.pathops:
            self.f.write(" ".join(op))
            self.f.write("\n")
        self.f.write("\n")
        retval = self.next_mark
        self.next_mark += 1
        return retval

    def blob (self, data):
        """Import the given blob.

        Return the mark number identifying this blob.

        """
        self.f.write("blob\nmark :%i\ndata %i\n%s\n" %
                     (self.next_mark, len(data), data))
        retval = self.next_mark
        self.next_mark += 1
        return retval

    def reset (self, ref, objname):
        """Reset the given ref to point at the given Git object."""
        self.f.write("reset %s\nfrom %s\n\n" %
                     (ref, GitFICommit.parse_objname(objname)))
        if ref not in self.refs:
            self.refs.add(ref)


class GitNotes(object):

    """Encapsulate access to Git notes.

    Simulates a dictionary of object name (SHA1) -> Git note mappings.

    """

    def __init__ (self, notes_ref, obj_fetcher):
        """Create a new Git notes interface, bound to the given notes ref."""
        self.notes_ref = notes_ref
        self.obj_fetcher = obj_fetcher  # Used to get objects from repo
        self.imports = []  # list: (objname, note data blob name) tuples

    def __del__ (self):
        """Verify that self.commit_notes() was called before destruction."""
        if self.imports:
            error("Missing call to self.commit_notes().")
            error("%i notes are not committed!", len(self.imports))

    def _load (self, objname):
        """Return the note data associated with the given git object.

        The note data is returned in string form. If no note is found
        for the given object, None is returned.

        """
        try:
            f = self.obj_fetcher.open_obj("%s:%s" % (self.notes_ref, objname))
            ret = f.read()
            f.close()
        except KeyError:
            ret = None
        return ret

    def __getitem__ (self, objname):
        """Return the note contents associated with the given object.

        Raise KeyError if given object has no associated note.

        """
        blobdata = self._load(objname)
        if blobdata is None:
            raise KeyError("Object '%s' has no note" % (objname))
        return blobdata

    def get (self, objname, default = None):
        """Return the note contents associated with the given object.

        Return given default if given object has no associated note.

        """
        blobdata = self._load(objname)
        if blobdata is None:
            return default
        return blobdata

    def import_note (self, objname, data, gfi):
        """Tell git fast-import to store data as a note for objname.

        This method uses the given GitFastImport object to create a
        blob containing the given note data.  Also an entry mapping the
        given object name to the created blob is stored until
        commit_notes() is called.

        Note that this method only works if it is later followed by a
        call to self.commit_notes() (which produces the note commit
        that refers to the blob produced here).

        """
        if not data.endswith("\n"):
            data += "\n"
        gfi.comment("Importing note for object %s" % (objname))
        mark = gfi.blob(data)
        self.imports.append((objname, mark))

    def commit_notes (self, gfi, author, message):
        """Produce a git fast-import note commit for the imported notes.

        This method uses the given GitFastImport object to create a
        commit on the notes ref, introducing the notes previously
        submitted to import_note().

        'author' is a (name, email) pair.  Nothing is written if no
        notes are pending.

        """
        if not self.imports:
            return
        # The raw date format of fast-import requires an explicitly
        # signed UTC offset, so "+0000" rather than a bare "0000".
        commitdata = GitFICommit(author[0], author[1],
                                 time.time(), "+0000", message)
        for objname, blobname in self.imports:
            assert isinstance(objname, int) and objname > 0
            assert isinstance(blobname, int) and blobname > 0
            commitdata.note(blobname, objname)
        gfi.commit(self.notes_ref, commitdata)
        self.imports = []


class GitCachedNotes(GitNotes):

    """Encapsulate access to Git notes (cached version).

    Only use this class if no caching is done at a higher level.

    Simulates a dictionary of object name (SHA1) -> Git note mappings.

    """

    def __init__ (self, notes_ref, obj_fetcher):
        """Set up a caching wrapper around GitNotes."""
        GitNotes.__init__(self, notes_ref, obj_fetcher)
        self._cache = {}  # Cache: object name -> note data

    def __del__ (self):
        """Verify that GitNotes' destructor is called."""
        GitNotes.__del__(self)

    def _load (self, objname):
        """Extend GitNotes._load() with a local objname -> note cache."""
        if objname not in self._cache:
            self._cache[objname] = GitNotes._load(self, objname)
        return self._cache[objname]

    def import_note (self, objname, data, gfi):
        """Extend GitNotes.import_note() with a local objname -> note cache."""
        # Normalize here as well, so that the cached copy is identical
        # to the data GitNotes.import_note() hands to fast-import.
        if not data.endswith("\n"):
            data += "\n"
        assert objname not in self._cache
        self._cache[objname] = data
        GitNotes.import_note(self, objname, data, gfi)


if __name__ == '__main__':
    unittest.main()