# -*- coding: utf-8 -*-
import base64
import re
import zlib
from collections import namedtuple

from . import exceptions
from .snippets import findall_regex, split_by_regex

# Parsed file header: optional index path plus old/new path and version.
header = namedtuple(
    'header',
    'index_path old_path old_version new_path new_version',
)

# One parsed diff: its header, its list of Change tuples, and its raw text.
diffobj = namedtuple('diffobj', 'header changes text')
# One changed/context line: old line number, new line number, line content
# and the index of the hunk it came from.
Change = namedtuple('Change', 'old new line hunk')

# NOTE(review): the space alternative below matches a single space, so a
# path that contains a space is split at that space -- confirm intended.
file_timestamp_str = '(.+?)(?:\t|:| +)(.*)'
# .+? was previously [^:\t\n\r\f\v]+

# general diff regex
diffcmd_header = re.compile('^diff.* (.+) (.+)$')
unified_header_index = re.compile('^Index: (.+)$')
unified_header_old_line = re.compile(r'^--- ' + file_timestamp_str + '$')
unified_header_new_line = re.compile(r'^\+\+\+ ' + file_timestamp_str + '$')
unified_hunk_start = re.compile(r'^@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@(.*)$')
unified_change = re.compile('^([-+ ])(.*)$', re.MULTILINE)

context_header_old_line = re.compile(r'^\*\*\* ' + file_timestamp_str + '$')
context_header_new_line = re.compile('^--- ' + file_timestamp_str + '$')
context_hunk_start = re.compile(r'^\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*$')
context_hunk_old = re.compile(r'^\*\*\* (\d+),?(\d*) \*\*\*\*$')
context_hunk_new = re.compile(r'^--- (\d+),?(\d*) ----$')
context_change = re.compile('^([-+ !]) (.*)$')

ed_hunk_start = re.compile(r'^(\d+),?(\d*)([acd])$')
# An ed text block ends with a line holding a single literal '.'.  The dot
# must be escaped; unescaped it matched ANY one-character line, so content
# lines such as '}' were mistaken for the terminator and dropped.
ed_hunk_end = re.compile(r'^\.$')
# much like forward ed, but no 'c' type
rcs_ed_hunk_start = re.compile(r'^([ad])(\d+) ?(\d*)$')

default_hunk_start = re.compile(r'^(\d+),?(\d*)([acd])(\d+),?(\d*)$')
default_hunk_mid = re.compile('^---$')
default_change = re.compile('^([><]) (.*)$')

# Headers

# git has a special index header and no end part
git_diffcmd_header = re.compile('^diff --git a/(.+) b/(.+)$')
# The '..' separator is literal, so both dots are escaped.
git_header_index = re.compile(r'^index ([a-f0-9]+)\.\.([a-f0-9]+) ?(\d*)$')
git_header_old_line = re.compile('^--- (.+)$')
git_header_new_line = re.compile(r'^\+\+\+ (.+)$')
git_header_file_mode = re.compile(r'^(new|deleted) file mode \d{6}$')
git_header_binary_file = re.compile('^Binary files (.+) and (.+) differ')
git_binary_patch_start = re.compile(r'^GIT binary patch$')
git_binary_literal_start = re.compile(r'^literal (\d+)$')
git_binary_delta_start = re.compile(r'^delta (\d+)$')
base85string = re.compile(r'^[0-9A-Za-z!#$%&()*+;<=>?@^_`{|}~-]+$')

bzr_header_index = re.compile('=== (.+)')
bzr_header_old_line = unified_header_old_line
bzr_header_new_line = unified_header_new_line

svn_header_index = unified_header_index
svn_header_timestamp_version = re.compile(r'\((?:working copy|revision (\d+))\)')
svn_header_timestamp = re.compile(r'.*(\(.*\))$')

cvs_header_index = unified_header_index
cvs_header_rcs = re.compile(r'^RCS file: (.+)(?:,\w{1}$|$)')
cvs_header_timestamp = re.compile(r'(.+)\t([\d.]+)')
cvs_header_timestamp_colon = re.compile(r':([\d.]+)\t(.+)')
old_cvs_diffcmd_header = re.compile('^diff.* (.+):(.*) (.+):(.*)$')


def parse_patch(text):
    """Yield a ``diffobj`` for every diff found in *text*.

    *text* is either a string or an iterable of lines.  The input is split
    into per-file diffs using the first header regex that produces more than
    one piece; each piece is then run through ``parse_header`` and
    ``parse_diff``.  Pieces for which both return a falsy value are skipped.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    # maybe use this to nuke all of those line endings?
    # lines = [x.splitlines()[0] for x in lines]
    lines = [x if len(x) == 0 else x.splitlines()[0] for x in lines]

    check = [
        unified_header_index,
        diffcmd_header,
        cvs_header_rcs,
        git_header_index,
        context_header_old_line,
        unified_header_old_line,
    ]

    diffs = []
    for c in check:
        diffs = split_by_regex(lines, c)
        if len(diffs) > 1:
            break

    for diff in diffs:
        # Capture the raw text first: the parsers below delete lines from
        # the list they are given while they consume it.
        difftext = '\n'.join(diff) + '\n'
        h = parse_header(diff)
        d = parse_diff(diff)
        if h or d:
            yield diffobj(header=h, changes=d, text=difftext)


def parse_header(text):
    """Return a ``header`` for *text*, trying SCM headers before plain ones."""
    h = parse_scm_header(text)
    if h is None:
        h = parse_diff_header(text)
    return h


def parse_scm_header(text):
    """Parse a git, CVS or SVN style header; return ``header`` or ``None``.

    The first regex in the check list that matches any line selects the
    parser.  When a ``diff --git`` line is also present, a leading ``a/`` /
    ``b/`` is stripped from the paths returned by the non-git parsers.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    check = [
        (git_header_index, parse_git_header),
        (old_cvs_diffcmd_header, parse_cvs_header),
        (cvs_header_rcs, parse_cvs_header),
        (svn_header_index, parse_svn_header),
    ]

    for regex, parser in check:
        diffs = findall_regex(lines, regex)
        if len(diffs) > 0:
            # Look for the git command line before calling the parser,
            # which may delete lines from ``lines``.
            git_opt = findall_regex(lines, git_diffcmd_header)
            res = parser(lines)
            # parse_git_header already strips the a/ and b/ prefixes;
            # stripping again would eat a real top-level directory named
            # 'a' or 'b', so only post-process the other parsers' results.
            if res and len(git_opt) > 0 and parser is not parse_git_header:
                old_path = res.old_path
                new_path = res.new_path
                if old_path.startswith('a/'):
                    old_path = old_path[2:]

                if new_path.startswith('b/'):
                    new_path = new_path[2:]

                return header(
                    index_path=res.index_path,
                    old_path=old_path,
                    old_version=res.old_version,
                    new_path=new_path,
                    new_version=res.new_version,
                )

            return res

    return None


def parse_diff_header(text):
    """Parse a unified, context, ``diff`` command or bare git header.

    Returns the result of the first parser whose regex matches any line,
    or ``None`` when no regex matches.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    check = [
        (unified_header_new_line, parse_unified_header),
        (context_header_old_line, parse_context_header),
        (diffcmd_header, parse_diffcmd_header),
        # TODO:
        # git_header can handle version-less unified headers, but
        # will trim a/ and b/ in the paths if they exist...
        (git_header_new_line, parse_git_header),
    ]

    for regex, parser in check:
        diffs = findall_regex(lines, regex)
        if len(diffs) > 0:
            return parser(lines)

    return None  # no header?


def parse_diff(text):
    """Parse the body of a diff with the first matching hunk format.

    Returns whatever the selected parser returns, or ``None`` when no hunk
    start regex matches any line.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    check = [
        (unified_hunk_start, parse_unified_diff),
        (context_hunk_start, parse_context_diff),
        (default_hunk_start, parse_default_diff),
        (ed_hunk_start, parse_ed_diff),
        (rcs_ed_hunk_start, parse_rcs_ed_diff),
        (git_binary_patch_start, parse_git_binary_diff),
    ]

    for hunk, parser in check:
        diffs = findall_regex(lines, hunk)
        if len(diffs) > 0:
            return parser(lines)

    return None


def _is_null_version(version):
    """Return True when a non-empty *version* consists only of zeros."""
    return len(version) > 0 and version.strip('0') == ''


def parse_git_header(text):
    """Parse a git header; return ``header`` or ``None``.

    Paths come from the ``---`` / ``+++`` lines (or a ``Binary files``
    line) with a leading ``a/`` / ``b/`` removed.  If those are never
    found, the paths from the ``diff --git`` line are used instead, provided
    an ``index`` line supplied both versions.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    old_version = None
    new_version = None
    old_path = None
    new_path = None
    cmd_old_path = None
    cmd_new_path = None
    for line in lines:
        hm = git_diffcmd_header.match(line)
        if hm:
            cmd_old_path = hm.group(1)
            cmd_new_path = hm.group(2)
            continue

        g = git_header_index.match(line)
        if g:
            old_version = g.group(1)
            new_version = g.group(2)
            continue

        # git always has its own special headers
        o = git_header_old_line.match(line)
        if o:
            old_path = o.group(1)

        n = git_header_new_line.match(line)
        if n:
            new_path = n.group(1)

        binary = git_header_binary_file.match(line)
        if binary:
            old_path = binary.group(1)
            new_path = binary.group(2)

        if old_path and new_path:
            if old_path.startswith('a/'):
                old_path = old_path[2:]
            if new_path.startswith('b/'):
                new_path = new_path[2:]
            return header(
                index_path=None,
                old_path=old_path,
                old_version=old_version,
                new_path=new_path,
                new_version=new_version,
            )

    # if we go through all of the text without finding our normal info,
    # use the cmd if available
    if cmd_old_path and cmd_new_path and old_version and new_version:
        # git_diffcmd_header captures the paths *without* the a/ and b/
        # prefixes, so they are used as-is; stripping here would remove a
        # genuine leading 'a/' or 'b/' directory.
        return header(
            index_path=None,
            # wow, I kind of hate this:
            # assume /dev/null if the versions are zeroed out
            # (any length of zeros, not only the 7-character form)
            old_path='/dev/null' if _is_null_version(old_version) else cmd_old_path,
            old_version=old_version,
            new_path='/dev/null' if _is_null_version(new_version) else cmd_new_path,
            new_version=new_version,
        )

    return None


def parse_svn_header(text):
    """Parse an SVN ``Index:`` header; return ``header`` or ``None``.

    Lines are deleted from the front of the list while searching, so a list
    argument is consumed in place.  Revisions are returned as ``int`` when
    a ``(revision N)`` marker is found and ``None`` otherwise.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    headers = findall_regex(lines, svn_header_index)
    if len(headers) == 0:
        return None

    while len(lines) > 0:
        i = svn_header_index.match(lines[0])
        del lines[0]
        if not i:
            continue

        diff_header = parse_diff_header(lines)
        if not diff_header:
            return header(
                index_path=i.group(1),
                old_path=i.group(1),
                old_version=None,
                new_path=i.group(1),
                new_version=None,
            )

        opath = diff_header.old_path
        over = diff_header.old_version
        if over:
            oend = svn_header_timestamp_version.match(over)
            if oend and oend.group(1):
                over = int(oend.group(1))
        elif opath:
            # No separate version field: look for a trailing "(...)" on
            # the path itself and cut it off.
            ts = svn_header_timestamp.match(opath)
            if ts:
                opath = opath[: -len(ts.group(1))]
                oend = svn_header_timestamp_version.match(ts.group(1))
                if oend and oend.group(1):
                    over = int(oend.group(1))

        npath = diff_header.new_path
        nver = diff_header.new_version
        if nver:
            nend = svn_header_timestamp_version.match(diff_header.new_version)
            if nend and nend.group(1):
                nver = int(nend.group(1))
        elif npath:
            ts = svn_header_timestamp.match(npath)
            if ts:
                npath = npath[: -len(ts.group(1))]
                nend = svn_header_timestamp_version.match(ts.group(1))
                if nend and nend.group(1):
                    nver = int(nend.group(1))

        # Anything that did not resolve to a revision number is dropped.
        if not isinstance(over, int):
            over = None

        if not isinstance(nver, int):
            nver = None

        return header(
            index_path=i.group(1),
            old_path=opath,
            old_version=over,
            new_path=npath,
            new_version=nver,
        )

    return None


def parse_cvs_header(text):
    """Parse an RCS-style or old-style CVS header; return ``header`` or ``None``.

    Lines are deleted from the front of the list while searching, so a list
    argument is consumed in place.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    headers = findall_regex(lines, cvs_header_rcs)
    headers_old = findall_regex(lines, old_cvs_diffcmd_header)

    if headers:
        # parse rcs style headers
        while len(lines) > 0:
            i = cvs_header_index.match(lines[0])
            del lines[0]
            if not i:
                continue

            diff_header = parse_diff_header(lines)
            if diff_header:
                over = diff_header.old_version
                if over:
                    oend = cvs_header_timestamp.match(over)
                    oend_c = cvs_header_timestamp_colon.match(over)
                    if oend:
                        over = oend.group(2)
                    elif oend_c:
                        over = oend_c.group(1)

                nver = diff_header.new_version
                if nver:
                    nend = cvs_header_timestamp.match(nver)
                    nend_c = cvs_header_timestamp_colon.match(nver)
                    if nend:
                        nver = nend.group(2)
                    elif nend_c:
                        nver = nend_c.group(1)

                return header(
                    index_path=i.group(1),
                    old_path=diff_header.old_path,
                    old_version=over,
                    new_path=diff_header.new_path,
                    new_version=nver,
                )

            return header(
                index_path=i.group(1),
                old_path=i.group(1),
                old_version=None,
                new_path=i.group(1),
                new_version=None,
            )
    elif headers_old:
        # parse old style headers
        while len(lines) > 0:
            i = cvs_header_index.match(lines[0])
            del lines[0]
            if not i:
                continue

            # The Index line may have been the last one; indexing an empty
            # list here would raise IndexError, so fall back to the
            # index-only header in that case as well.
            d = old_cvs_diffcmd_header.match(lines[0]) if len(lines) > 0 else None
            if not d:
                return header(
                    index_path=i.group(1),
                    old_path=i.group(1),
                    old_version=None,
                    new_path=i.group(1),
                    new_version=None,
                )

            # will get rid of the useless stuff for us
            parse_diff_header(lines)
            over = d.group(2) if d.group(2) else None
            nver = d.group(4) if d.group(4) else None
            return header(
                index_path=i.group(1),
                old_path=d.group(1),
                old_version=over,
                new_path=d.group(3),
                new_version=nver,
            )

    return None


def parse_diffcmd_header(text):
    """Parse a ``diff ... old new`` command line; return ``header`` or ``None``.

    Lines up to and including the matching one are deleted from the list.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    headers = findall_regex(lines, diffcmd_header)
    if len(headers) == 0:
        return None

    while len(lines) > 0:
        d = diffcmd_header.match(lines[0])
        del lines[0]
        if d:
            return header(
                index_path=None,
                old_path=d.group(1),
                old_version=None,
                new_path=d.group(2),
                new_version=None,
            )

    return None


def parse_unified_header(text):
    """Parse a ``---`` / ``+++`` unified header pair; return ``header`` or ``None``.

    Lines are deleted from the front of the list while searching.  An empty
    version field is reported as ``None``.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    headers = findall_regex(lines, unified_header_new_line)
    if len(headers) == 0:
        return None

    # At least two lines are needed for an old/new pair.
    while len(lines) > 1:
        o = unified_header_old_line.match(lines[0])
        del lines[0]
        if o:
            n = unified_header_new_line.match(lines[0])
            del lines[0]
            if n:
                over = o.group(2)
                if len(over) == 0:
                    over = None

                nver = n.group(2)
                if len(nver) == 0:
                    nver = None

                return header(
                    index_path=None,
                    old_path=o.group(1),
                    old_version=over,
                    new_path=n.group(1),
                    new_version=nver,
                )

    return None


def parse_context_header(text):
    """Parse a ``***`` / ``---`` context header pair; return ``header`` or ``None``.

    Lines are deleted from the front of the list while searching.  An empty
    version field is reported as ``None``.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    headers = findall_regex(lines, context_header_old_line)
    if len(headers) == 0:
        return None

    # At least two lines are needed for an old/new pair.
    while len(lines) > 1:
        o = context_header_old_line.match(lines[0])
        del lines[0]
        if o:
            n = context_header_new_line.match(lines[0])
            del lines[0]
            if n:
                over = o.group(2)
                if len(over) == 0:
                    over = None

                nver = n.group(2)
                if len(nver) == 0:
                    nver = None

                return header(
                    index_path=None,
                    old_path=o.group(1),
                    old_version=over,
                    new_path=n.group(1),
                    new_version=nver,
                )

    return None


def parse_default_diff(text):
    """Parse a default (``<`` / ``>``) diff into a list of ``Change``.

    Returns ``None`` when no change line was found.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    old = 0
    new = 0
    old_len = 0
    new_len = 0
    r = 0  # removed lines seen in the current hunk
    i = 0  # inserted lines seen in the current hunk

    changes = list()

    hunks = split_by_regex(lines, default_hunk_start)
    for hunk_n, hunk in enumerate(hunks):
        if not len(hunk):
            continue

        r = 0
        i = 0
        while len(hunk) > 0:
            h = default_hunk_start.match(hunk[0])
            c = default_change.match(hunk[0])
            del hunk[0]
            if h:
                old = int(h.group(1))
                if len(h.group(2)) > 0:
                    old_len = int(h.group(2)) - old + 1
                else:
                    old_len = 0

                new = int(h.group(4))
                if len(h.group(5)) > 0:
                    new_len = int(h.group(5)) - new + 1
                else:
                    new_len = 0

            elif c:
                kind = c.group(1)
                line = c.group(2)

                if kind == '<' and (r != old_len or r == 0):
                    changes.append(Change(old + r, None, line, hunk_n))
                    r += 1
                elif kind == '>' and (i != new_len or i == 0):
                    changes.append(Change(None, new + i, line, hunk_n))
                    i += 1

    if len(changes) > 0:
        return changes

    return None


def parse_unified_diff(text):
    """Parse a unified diff into a list of ``Change``.

    Returns ``None`` when no change or context line was found.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    old = 0
    new = 0
    r = 0  # old-side lines seen in the current hunk
    i = 0  # new-side lines seen in the current hunk
    old_len = 0
    new_len = 0

    changes = list()

    hunks = split_by_regex(lines, unified_hunk_start)
    for hunk_n, hunk in enumerate(hunks):
        # reset counters
        r = 0
        i = 0
        # Drop lines until the "@@ -a,b +c,d @@" header has been consumed.
        while len(hunk) > 0:
            h = unified_hunk_start.match(hunk[0])
            del hunk[0]
            if h:
                # The hunk header @@ -1,6 +1,6 @@ means:
                # - Start at line 1 in the old file and show 6 lines
                # - Start at line 1 in the new file and show 6 lines
                old = int(h.group(1))  # Starting line in old file
                # An omitted length means a single line.
                old_len = int(h.group(2)) if len(h.group(2)) > 0 else 1
                new = int(h.group(3))  # Starting line in new file
                new_len = int(h.group(4)) if len(h.group(4)) > 0 else 1

                h = None
                break

        # Process each remaining line in the hunk
        for n in hunk:
            # The first character is the kind (' ', '+' or '-'), the rest
            # is the line content.  Empty lines are treated as context.
            kind = n[0] if len(n) > 0 else ' '
            line = n[1:] if len(n) > 1 else ''

            if kind == '-' and (r != old_len or r == 0):
                # Line was removed from the old file
                changes.append(Change(old + r, None, line, hunk_n))
                r += 1
            elif kind == '+' and (i != new_len or i == 0):
                # Line was added in the new file
                changes.append(Change(None, new + i, line, hunk_n))
                i += 1
            elif kind == ' ':
                # Context line - exists in both old and new file
                changes.append(Change(old + r, new + i, line, hunk_n))
                r += 1
                i += 1

    if len(changes) > 0:
        return changes

    return None


def _context_range_len(start, end_text):
    """Return the line count of a context range ``start[,end]``.

    *end_text* is the (possibly empty) captured end number; a missing end
    means the range covers only *start*.
    """
    end = int(end_text) if end_text else start
    return end + 1 - start


def parse_context_diff(text):
    """Parse a context diff into a list of ``Change``.

    Returns ``None`` when nothing was parsed or when the old and new halves
    of a hunk cannot be reconciled.

    Raises:
        exceptions.ParseException: a hunk does not split into exactly an
            old and a new part, or a one-sided hunk contains a change kind
            that cannot occur on that side.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    old = 0
    new = 0
    j = 0  # old-side lines consumed in the current hunk
    k = 0  # new-side lines consumed in the current hunk

    changes = list()

    hunks = split_by_regex(lines, context_hunk_start)
    for hunk_n, hunk in enumerate(hunks):
        if not len(hunk):
            continue

        j = 0
        k = 0
        parts = split_by_regex(hunk, context_hunk_new)
        if len(parts) != 2:
            raise exceptions.ParseException('Context diff invalid', hunk_n)

        old_hunk = parts[0]
        new_hunk = parts[1]

        # Consume both range headers ("*** a,b ****" and "--- c,d ----").
        while len(old_hunk) > 0:
            o = context_hunk_old.match(old_hunk[0])
            del old_hunk[0]

            if not o:
                continue

            old = int(o.group(1))
            # Single-number ranges such as "*** 5 ****" have no end value;
            # int('') used to raise ValueError here.
            old_len = _context_range_len(old, o.group(2))
            while len(new_hunk) > 0:
                n = context_hunk_new.match(new_hunk[0])
                del new_hunk[0]

                if not n:
                    continue

                new = int(n.group(1))
                new_len = _context_range_len(new, n.group(2))
                break
            break

        # now have old and new set, can start processing?
        if len(old_hunk) > 0 and len(new_hunk) == 0:
            msg = 'Got unexpected change in removal hunk: '
            # only removes left?
            while len(old_hunk) > 0:
                c = context_change.match(old_hunk[0])
                del old_hunk[0]

                if not c:
                    continue

                kind = c.group(1)
                line = c.group(2)

                if kind == '-' and (j != old_len or j == 0):
                    changes.append(Change(old + j, None, line, hunk_n))
                    j += 1
                elif kind == ' ' and (
                    (j != old_len and k != new_len) or (j == 0 or k == 0)
                ):
                    changes.append(Change(old + j, new + k, line, hunk_n))
                    j += 1
                    k += 1
                elif kind == '+' or kind == '!':
                    raise exceptions.ParseException(msg + kind, hunk_n)

            continue

        if len(old_hunk) == 0 and len(new_hunk) > 0:
            # This branch handles insertions; the message previously said
            # "removal hunk" (copy-paste from the branch above).
            msg = 'Got unexpected change in insertion hunk: '
            # only insertions left?
            while len(new_hunk) > 0:
                c = context_change.match(new_hunk[0])
                del new_hunk[0]

                if not c:
                    continue

                kind = c.group(1)
                line = c.group(2)

                if kind == '+' and (k != new_len or k == 0):
                    changes.append(Change(None, new + k, line, hunk_n))
                    k += 1
                elif kind == ' ' and (
                    (j != old_len and k != new_len) or (j == 0 or k == 0)
                ):
                    changes.append(Change(old + j, new + k, line, hunk_n))
                    j += 1
                    k += 1
                elif kind == '-' or kind == '!':
                    raise exceptions.ParseException(msg + kind, hunk_n)
            continue

        # both
        while len(old_hunk) > 0 and len(new_hunk) > 0:
            oc = context_change.match(old_hunk[0])
            nc = context_change.match(new_hunk[0])
            okind = None
            nkind = None

            if oc:
                okind = oc.group(1)
                oline = oc.group(2)

            if nc:
                nkind = nc.group(1)
                nline = nc.group(2)

            if not (oc or nc):
                del old_hunk[0]
                del new_hunk[0]
            elif okind == ' ' and nkind == ' ' and oline == nline:
                changes.append(Change(old + j, new + k, oline, hunk_n))
                j += 1
                k += 1
                del old_hunk[0]
                del new_hunk[0]
            # Parenthesised: 'and' binds tighter than 'or', so the length
            # guard used to apply to '!' only and never to '-' / '+'.
            elif (okind == '-' or okind == '!') and (j != old_len or j == 0):
                changes.append(Change(old + j, None, oline, hunk_n))
                j += 1
                del old_hunk[0]
            elif (nkind == '+' or nkind == '!') and (k != new_len or k == 0):
                changes.append(Change(None, new + k, nline, hunk_n))
                k += 1
                del new_hunk[0]
            else:
                return None

    if len(changes) > 0:
        return changes

    return None


def parse_ed_diff(text):
    """Parse a forward ed script into a list of ``Change``.

    Hunks are processed in reverse order of appearance.  Returns ``None``
    when no change was found.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    old = 0
    j = 0
    k = 0

    # r and i are running totals of removed / inserted lines across all
    # hunks; they are deliberately not reset per hunk.
    r = 0
    i = 0

    changes = list()

    hunks = split_by_regex(lines, ed_hunk_start)
    hunks.reverse()
    for hunk_n, hunk in enumerate(hunks):
        if not len(hunk):
            continue
        j = 0
        k = 0
        while len(hunk) > 0:
            o = ed_hunk_start.match(hunk[0])
            del hunk[0]

            if not o:
                continue

            old = int(o.group(1))
            old_end = int(o.group(2)) if len(o.group(2)) else old

            hunk_kind = o.group(3)
            if hunk_kind == 'd':
                k = 0
                while old_end >= old:
                    changes.append(Change(old + k, None, None, hunk_n))
                    r += 1
                    k += 1
                    old_end -= 1
                continue

            while len(hunk) > 0:
                # e is set for the lone '.' line that terminates the text.
                e = ed_hunk_end.match(hunk[0])
                if not e and hunk_kind == 'c':
                    k = 0
                    # Emits the removals once: old_end drops below old
                    # after the first pass through this loop.
                    while old_end >= old:
                        changes.append(Change(old + k, None, None, hunk_n))
                        r += 1
                        k += 1
                        old_end -= 1

                    # I basically have no idea why this works
                    # for these tests.
                    changes.append(
                        Change(
                            None,
                            old - r + i + k + j,
                            hunk[0],
                            hunk_n,
                        )
                    )
                    i += 1
                    j += 1
                if not e and hunk_kind == 'a':
                    changes.append(
                        Change(
                            None,
                            old - r + i + 1,
                            hunk[0],
                            hunk_n,
                        )
                    )
                    i += 1

                del hunk[0]

    if len(changes) > 0:
        return changes

    return None


def parse_rcs_ed_diff(text):
    """Parse an RCS-style ed script into a list of ``Change``.

    Returns ``None`` when no change was found.
    """
    # much like forward ed, but no 'c' type
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    old = 0
    j = 0
    size = 0
    # Net number of lines added so far; shifts later insert positions.
    total_change_size = 0

    changes = list()

    hunks = split_by_regex(lines, rcs_ed_hunk_start)
    for hunk_n, hunk in enumerate(hunks):
        if len(hunk):
            j = 0
            while len(hunk) > 0:
                o = rcs_ed_hunk_start.match(hunk[0])
                del hunk[0]

                # The regex allows the line count to be absent; int('')
                # would raise ValueError, so such a line is skipped like
                # any other non-command line.
                if not o or not o.group(3):
                    continue

                hunk_kind = o.group(1)
                old = int(o.group(2))
                size = int(o.group(3))

                if hunk_kind == 'a':
                    old += total_change_size + 1
                    total_change_size += size
                    while size > 0 and len(hunk) > 0:
                        changes.append(Change(None, old + j, hunk[0], hunk_n))
                        j += 1
                        size -= 1

                        del hunk[0]

                elif hunk_kind == 'd':
                    total_change_size -= size
                    while size > 0:
                        changes.append(Change(old + j, None, None, hunk_n))
                        j += 1
                        size -= 1

    if len(changes) > 0:
        return changes

    return None


def parse_git_binary_diff(text):
    """Decode ``literal`` sections of a git binary patch into ``Change`` items.

    Each decoded section is base85-decoded and zlib-decompressed.  ``delta``
    sections are not supported and are skipped.  Always returns a list
    (possibly empty).

    NOTE(review): payload validation uses ``assert``, which is removed when
    Python runs with ``-O``; left unchanged so callers still see
    ``AssertionError`` -- consider raising a parse exception instead.
    """
    try:
        lines = text.splitlines()
    except AttributeError:
        lines = text

    changes: list[Change] = list()

    old_version = None
    new_version = None
    cmd_old_path = None
    cmd_new_path = None
    # the sizes are used as latch-up
    new_size = 0
    old_size = 0
    old_encoded = ''
    new_encoded = ''
    for line in lines:
        if cmd_old_path is None and cmd_new_path is None:
            hm = git_diffcmd_header.match(line)
            if hm:
                cmd_old_path = hm.group(1)
                cmd_new_path = hm.group(2)
                continue

        if old_version is None and new_version is None:
            g = git_header_index.match(line)
            if g:
                old_version = g.group(1)
                new_version = g.group(2)
                continue

        # the first is added file
        if new_size == 0:
            literal = git_binary_literal_start.match(line)
            if literal:
                new_size = int(literal.group(1))
                continue
            delta = git_binary_delta_start.match(line)
            if delta:
                # not supported
                new_size = 0
                continue
        elif new_size > 0:
            if base85string.match(line):
                # First character is dropped (line[1:]); the remainder
                # must be a multiple of five characters.
                assert len(line) >= 6 and ((len(line) - 1) % 5) == 0
                new_encoded += line[1:]
            elif 0 == len(line):
                # A blank line ends the section.
                if new_encoded:
                    decoded = base64.b85decode(new_encoded)
                    added_data = zlib.decompress(decoded)
                    assert new_size == len(added_data)
                    change = Change(None, 0, added_data, None)
                    changes.append(change)
                new_size = 0
                new_encoded = ''
            else:
                # Invalid line format
                new_size = 0
                new_encoded = ''

        # the second is removed file
        if old_size == 0:
            literal = git_binary_literal_start.match(line)
            if literal:
                old_size = int(literal.group(1))
            delta = git_binary_delta_start.match(line)
            if delta:
                # not supported
                old_size = 0
                continue
        elif old_size > 0:
            if base85string.match(line):
                assert len(line) >= 6 and ((len(line) - 1) % 5) == 0
                old_encoded += line[1:]
            elif 0 == len(line):
                if old_encoded:
                    decoded = base64.b85decode(old_encoded)
                    removed_data = zlib.decompress(decoded)
                    assert old_size == len(removed_data)
                    # The removed payload is stored in the ``hunk`` field.
                    change = Change(0, None, None, removed_data)
                    changes.append(change)
                old_size = 0
                old_encoded = ''
            else:
                # Invalid line format
                old_size = 0
                old_encoded = ''

    return changes