|
|
|
import base64
|
|
import re
|
|
import zlib
|
|
from collections import namedtuple
|
|
|
|
from . import exceptions
|
|
from .snippets import findall_regex, split_by_regex
|
|
|
|
header = namedtuple(
|
|
'header',
|
|
'index_path old_path old_version new_path new_version',
|
|
)
|
|
|
|
diffobj = namedtuple('diffobj', 'header changes text')
|
|
Change = namedtuple('Change', 'old new line hunk')
|
|
|
|
file_timestamp_str = '(.+?)(?:\t|:| +)(.*)'
|
|
|
|
|
|
|
|
diffcmd_header = re.compile('^diff.* (.+) (.+)$')
|
|
unified_header_index = re.compile('^Index: (.+)$')
|
|
unified_header_old_line = re.compile(r'^--- ' + file_timestamp_str + '$')
|
|
unified_header_new_line = re.compile(r'^\+\+\+ ' + file_timestamp_str + '$')
|
|
unified_hunk_start = re.compile(r'^@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@(.*)$')
|
|
unified_change = re.compile('^([-+ ])(.*)$', re.MULTILINE)
|
|
|
|
context_header_old_line = re.compile(r'^\*\*\* ' + file_timestamp_str + '$')
|
|
context_header_new_line = re.compile('^--- ' + file_timestamp_str + '$')
|
|
context_hunk_start = re.compile(r'^\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*$')
|
|
context_hunk_old = re.compile(r'^\*\*\* (\d+),?(\d*) \*\*\*\*$')
|
|
context_hunk_new = re.compile(r'^--- (\d+),?(\d*) ----$')
|
|
context_change = re.compile('^([-+ !]) (.*)$')
|
|
|
|
ed_hunk_start = re.compile(r'^(\d+),?(\d*)([acd])$')
|
|
ed_hunk_end = re.compile('^.$')
|
|
|
|
rcs_ed_hunk_start = re.compile(r'^([ad])(\d+) ?(\d*)$')
|
|
|
|
default_hunk_start = re.compile(r'^(\d+),?(\d*)([acd])(\d+),?(\d*)$')
|
|
default_hunk_mid = re.compile('^---$')
|
|
default_change = re.compile('^([><]) (.*)$')
|
|
|
|
|
|
|
|
|
|
git_diffcmd_header = re.compile('^diff --git a/(.+) b/(.+)$')
|
|
git_header_index = re.compile(r'^index ([a-f0-9]+)..([a-f0-9]+) ?(\d*)$')
|
|
git_header_old_line = re.compile('^--- (.+)$')
|
|
git_header_new_line = re.compile(r'^\+\+\+ (.+)$')
|
|
git_header_file_mode = re.compile(r'^(new|deleted) file mode \d{6}$')
|
|
git_header_binary_file = re.compile('^Binary files (.+) and (.+) differ')
|
|
git_binary_patch_start = re.compile(r'^GIT binary patch$')
|
|
git_binary_literal_start = re.compile(r'^literal (\d+)$')
|
|
git_binary_delta_start = re.compile(r'^delta (\d+)$')
|
|
base85string = re.compile(r'^[0-9A-Za-z!#$%&()*+;<=>?@^_`{|}~-]+$')
|
|
|
|
bzr_header_index = re.compile('=== (.+)')
|
|
bzr_header_old_line = unified_header_old_line
|
|
bzr_header_new_line = unified_header_new_line
|
|
|
|
svn_header_index = unified_header_index
|
|
svn_header_timestamp_version = re.compile(r'\((?:working copy|revision (\d+))\)')
|
|
svn_header_timestamp = re.compile(r'.*(\(.*\))$')
|
|
|
|
cvs_header_index = unified_header_index
|
|
cvs_header_rcs = re.compile(r'^RCS file: (.+)(?:,\w{1}$|$)')
|
|
cvs_header_timestamp = re.compile(r'(.+)\t([\d.]+)')
|
|
cvs_header_timestamp_colon = re.compile(r':([\d.]+)\t(.+)')
|
|
old_cvs_diffcmd_header = re.compile('^diff.* (.+):(.*) (.+):(.*)$')
|
|
|
|
|
|
def parse_patch(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
|
|
|
|
lines = [x if len(x) == 0 else x.splitlines()[0] for x in lines]
|
|
|
|
check = [
|
|
unified_header_index,
|
|
diffcmd_header,
|
|
cvs_header_rcs,
|
|
git_header_index,
|
|
context_header_old_line,
|
|
unified_header_old_line,
|
|
]
|
|
|
|
diffs = []
|
|
for c in check:
|
|
diffs = split_by_regex(lines, c)
|
|
if len(diffs) > 1:
|
|
break
|
|
|
|
for diff in diffs:
|
|
difftext = '\n'.join(diff) + '\n'
|
|
h = parse_header(diff)
|
|
d = parse_diff(diff)
|
|
if h or d:
|
|
yield diffobj(header=h, changes=d, text=difftext)
|
|
|
|
|
|
def parse_header(text):
|
|
h = parse_scm_header(text)
|
|
if h is None:
|
|
h = parse_diff_header(text)
|
|
return h
|
|
|
|
|
|
def parse_scm_header(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
check = [
|
|
(git_header_index, parse_git_header),
|
|
(old_cvs_diffcmd_header, parse_cvs_header),
|
|
(cvs_header_rcs, parse_cvs_header),
|
|
(svn_header_index, parse_svn_header),
|
|
]
|
|
|
|
for regex, parser in check:
|
|
diffs = findall_regex(lines, regex)
|
|
if len(diffs) > 0:
|
|
git_opt = findall_regex(lines, git_diffcmd_header)
|
|
if len(git_opt) > 0:
|
|
res = parser(lines)
|
|
if res:
|
|
old_path = res.old_path
|
|
new_path = res.new_path
|
|
if old_path.startswith('a/'):
|
|
old_path = old_path[2:]
|
|
|
|
if new_path.startswith('b/'):
|
|
new_path = new_path[2:]
|
|
|
|
return header(
|
|
index_path=res.index_path,
|
|
old_path=old_path,
|
|
old_version=res.old_version,
|
|
new_path=new_path,
|
|
new_version=res.new_version,
|
|
)
|
|
else:
|
|
res = parser(lines)
|
|
|
|
return res
|
|
|
|
return None
|
|
|
|
|
|
def parse_diff_header(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
check = [
|
|
(unified_header_new_line, parse_unified_header),
|
|
(context_header_old_line, parse_context_header),
|
|
(diffcmd_header, parse_diffcmd_header),
|
|
|
|
|
|
|
|
(git_header_new_line, parse_git_header),
|
|
]
|
|
|
|
for regex, parser in check:
|
|
diffs = findall_regex(lines, regex)
|
|
if len(diffs) > 0:
|
|
return parser(lines)
|
|
|
|
return None
|
|
|
|
|
|
def parse_diff(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
check = [
|
|
(unified_hunk_start, parse_unified_diff),
|
|
(context_hunk_start, parse_context_diff),
|
|
(default_hunk_start, parse_default_diff),
|
|
(ed_hunk_start, parse_ed_diff),
|
|
(rcs_ed_hunk_start, parse_rcs_ed_diff),
|
|
(git_binary_patch_start, parse_git_binary_diff),
|
|
]
|
|
|
|
for hunk, parser in check:
|
|
diffs = findall_regex(lines, hunk)
|
|
if len(diffs) > 0:
|
|
return parser(lines)
|
|
return None
|
|
|
|
|
|
def parse_git_header(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
old_version = None
|
|
new_version = None
|
|
old_path = None
|
|
new_path = None
|
|
cmd_old_path = None
|
|
cmd_new_path = None
|
|
for line in lines:
|
|
hm = git_diffcmd_header.match(line)
|
|
if hm:
|
|
cmd_old_path = hm.group(1)
|
|
cmd_new_path = hm.group(2)
|
|
continue
|
|
|
|
g = git_header_index.match(line)
|
|
if g:
|
|
old_version = g.group(1)
|
|
new_version = g.group(2)
|
|
continue
|
|
|
|
|
|
o = git_header_old_line.match(line)
|
|
if o:
|
|
old_path = o.group(1)
|
|
|
|
n = git_header_new_line.match(line)
|
|
if n:
|
|
new_path = n.group(1)
|
|
|
|
binary = git_header_binary_file.match(line)
|
|
if binary:
|
|
old_path = binary.group(1)
|
|
new_path = binary.group(2)
|
|
|
|
if old_path and new_path:
|
|
if old_path.startswith('a/'):
|
|
old_path = old_path[2:]
|
|
|
|
if new_path.startswith('b/'):
|
|
new_path = new_path[2:]
|
|
return header(
|
|
index_path=None,
|
|
old_path=old_path,
|
|
old_version=old_version,
|
|
new_path=new_path,
|
|
new_version=new_version,
|
|
)
|
|
|
|
|
|
|
|
if cmd_old_path and cmd_new_path and old_version and new_version:
|
|
if cmd_old_path.startswith('a/'):
|
|
cmd_old_path = cmd_old_path[2:]
|
|
|
|
if cmd_new_path.startswith('b/'):
|
|
cmd_new_path = cmd_new_path[2:]
|
|
|
|
return header(
|
|
index_path=None,
|
|
|
|
|
|
old_path='/dev/null' if old_version == '0000000' else cmd_old_path,
|
|
old_version=old_version,
|
|
new_path='/dev/null' if new_version == '0000000' else cmd_new_path,
|
|
new_version=new_version,
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
def parse_svn_header(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
headers = findall_regex(lines, svn_header_index)
|
|
if len(headers) == 0:
|
|
return None
|
|
|
|
while len(lines) > 0:
|
|
i = svn_header_index.match(lines[0])
|
|
del lines[0]
|
|
if not i:
|
|
continue
|
|
|
|
diff_header = parse_diff_header(lines)
|
|
if not diff_header:
|
|
return header(
|
|
index_path=i.group(1),
|
|
old_path=i.group(1),
|
|
old_version=None,
|
|
new_path=i.group(1),
|
|
new_version=None,
|
|
)
|
|
|
|
opath = diff_header.old_path
|
|
over = diff_header.old_version
|
|
if over:
|
|
oend = svn_header_timestamp_version.match(over)
|
|
if oend and oend.group(1):
|
|
over = int(oend.group(1))
|
|
elif opath:
|
|
ts = svn_header_timestamp.match(opath)
|
|
if ts:
|
|
opath = opath[: -len(ts.group(1))]
|
|
oend = svn_header_timestamp_version.match(ts.group(1))
|
|
if oend and oend.group(1):
|
|
over = int(oend.group(1))
|
|
|
|
npath = diff_header.new_path
|
|
nver = diff_header.new_version
|
|
if nver:
|
|
nend = svn_header_timestamp_version.match(diff_header.new_version)
|
|
if nend and nend.group(1):
|
|
nver = int(nend.group(1))
|
|
elif npath:
|
|
ts = svn_header_timestamp.match(npath)
|
|
if ts:
|
|
npath = npath[: -len(ts.group(1))]
|
|
nend = svn_header_timestamp_version.match(ts.group(1))
|
|
if nend and nend.group(1):
|
|
nver = int(nend.group(1))
|
|
|
|
if not isinstance(over, int):
|
|
over = None
|
|
|
|
if not isinstance(nver, int):
|
|
nver = None
|
|
|
|
return header(
|
|
index_path=i.group(1),
|
|
old_path=opath,
|
|
old_version=over,
|
|
new_path=npath,
|
|
new_version=nver,
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
def parse_cvs_header(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
headers = findall_regex(lines, cvs_header_rcs)
|
|
headers_old = findall_regex(lines, old_cvs_diffcmd_header)
|
|
|
|
if headers:
|
|
|
|
while len(lines) > 0:
|
|
i = cvs_header_index.match(lines[0])
|
|
del lines[0]
|
|
if not i:
|
|
continue
|
|
|
|
diff_header = parse_diff_header(lines)
|
|
if diff_header:
|
|
over = diff_header.old_version
|
|
if over:
|
|
oend = cvs_header_timestamp.match(over)
|
|
oend_c = cvs_header_timestamp_colon.match(over)
|
|
if oend:
|
|
over = oend.group(2)
|
|
elif oend_c:
|
|
over = oend_c.group(1)
|
|
|
|
nver = diff_header.new_version
|
|
if nver:
|
|
nend = cvs_header_timestamp.match(nver)
|
|
nend_c = cvs_header_timestamp_colon.match(nver)
|
|
if nend:
|
|
nver = nend.group(2)
|
|
elif nend_c:
|
|
nver = nend_c.group(1)
|
|
|
|
return header(
|
|
index_path=i.group(1),
|
|
old_path=diff_header.old_path,
|
|
old_version=over,
|
|
new_path=diff_header.new_path,
|
|
new_version=nver,
|
|
)
|
|
return header(
|
|
index_path=i.group(1),
|
|
old_path=i.group(1),
|
|
old_version=None,
|
|
new_path=i.group(1),
|
|
new_version=None,
|
|
)
|
|
elif headers_old:
|
|
|
|
while len(lines) > 0:
|
|
i = cvs_header_index.match(lines[0])
|
|
del lines[0]
|
|
if not i:
|
|
continue
|
|
|
|
d = old_cvs_diffcmd_header.match(lines[0])
|
|
if not d:
|
|
return header(
|
|
index_path=i.group(1),
|
|
old_path=i.group(1),
|
|
old_version=None,
|
|
new_path=i.group(1),
|
|
new_version=None,
|
|
)
|
|
|
|
|
|
parse_diff_header(lines)
|
|
over = d.group(2) if d.group(2) else None
|
|
nver = d.group(4) if d.group(4) else None
|
|
return header(
|
|
index_path=i.group(1),
|
|
old_path=d.group(1),
|
|
old_version=over,
|
|
new_path=d.group(3),
|
|
new_version=nver,
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
def parse_diffcmd_header(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
headers = findall_regex(lines, diffcmd_header)
|
|
if len(headers) == 0:
|
|
return None
|
|
|
|
while len(lines) > 0:
|
|
d = diffcmd_header.match(lines[0])
|
|
del lines[0]
|
|
if d:
|
|
return header(
|
|
index_path=None,
|
|
old_path=d.group(1),
|
|
old_version=None,
|
|
new_path=d.group(2),
|
|
new_version=None,
|
|
)
|
|
return None
|
|
|
|
|
|
def parse_unified_header(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
headers = findall_regex(lines, unified_header_new_line)
|
|
if len(headers) == 0:
|
|
return None
|
|
|
|
while len(lines) > 1:
|
|
o = unified_header_old_line.match(lines[0])
|
|
del lines[0]
|
|
if o:
|
|
n = unified_header_new_line.match(lines[0])
|
|
del lines[0]
|
|
if n:
|
|
over = o.group(2)
|
|
if len(over) == 0:
|
|
over = None
|
|
|
|
nver = n.group(2)
|
|
if len(nver) == 0:
|
|
nver = None
|
|
|
|
return header(
|
|
index_path=None,
|
|
old_path=o.group(1),
|
|
old_version=over,
|
|
new_path=n.group(1),
|
|
new_version=nver,
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
def parse_context_header(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
headers = findall_regex(lines, context_header_old_line)
|
|
if len(headers) == 0:
|
|
return None
|
|
|
|
while len(lines) > 1:
|
|
o = context_header_old_line.match(lines[0])
|
|
del lines[0]
|
|
if o:
|
|
n = context_header_new_line.match(lines[0])
|
|
del lines[0]
|
|
if n:
|
|
over = o.group(2)
|
|
if len(over) == 0:
|
|
over = None
|
|
|
|
nver = n.group(2)
|
|
if len(nver) == 0:
|
|
nver = None
|
|
|
|
return header(
|
|
index_path=None,
|
|
old_path=o.group(1),
|
|
old_version=over,
|
|
new_path=n.group(1),
|
|
new_version=nver,
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
def parse_default_diff(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
old = 0
|
|
new = 0
|
|
old_len = 0
|
|
new_len = 0
|
|
r = 0
|
|
i = 0
|
|
|
|
changes = list()
|
|
|
|
hunks = split_by_regex(lines, default_hunk_start)
|
|
for hunk_n, hunk in enumerate(hunks):
|
|
if not len(hunk):
|
|
continue
|
|
|
|
r = 0
|
|
i = 0
|
|
while len(hunk) > 0:
|
|
h = default_hunk_start.match(hunk[0])
|
|
c = default_change.match(hunk[0])
|
|
del hunk[0]
|
|
if h:
|
|
old = int(h.group(1))
|
|
if len(h.group(2)) > 0:
|
|
old_len = int(h.group(2)) - old + 1
|
|
else:
|
|
old_len = 0
|
|
|
|
new = int(h.group(4))
|
|
if len(h.group(5)) > 0:
|
|
new_len = int(h.group(5)) - new + 1
|
|
else:
|
|
new_len = 0
|
|
|
|
elif c:
|
|
kind = c.group(1)
|
|
line = c.group(2)
|
|
|
|
if kind == '<' and (r != old_len or r == 0):
|
|
changes.append(Change(old + r, None, line, hunk_n))
|
|
r += 1
|
|
elif kind == '>' and (i != new_len or i == 0):
|
|
changes.append(Change(None, new + i, line, hunk_n))
|
|
i += 1
|
|
|
|
if len(changes) > 0:
|
|
return changes
|
|
|
|
return None
|
|
|
|
|
|
def parse_unified_diff(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
old = 0
|
|
new = 0
|
|
r = 0
|
|
i = 0
|
|
old_len = 0
|
|
new_len = 0
|
|
|
|
changes = list()
|
|
|
|
hunks = split_by_regex(lines, unified_hunk_start)
|
|
for hunk_n, hunk in enumerate(hunks):
|
|
|
|
r = 0
|
|
i = 0
|
|
while len(hunk) > 0:
|
|
h = unified_hunk_start.match(hunk[0])
|
|
del hunk[0]
|
|
if h:
|
|
|
|
|
|
|
|
old = int(h.group(1))
|
|
old_len = (
|
|
int(h.group(2)) if len(h.group(2)) > 0 else 1
|
|
)
|
|
|
|
new = int(h.group(3))
|
|
new_len = (
|
|
int(h.group(4)) if len(h.group(4)) > 0 else 1
|
|
)
|
|
|
|
h = None
|
|
break
|
|
|
|
|
|
for n in hunk:
|
|
|
|
|
|
kind = (
|
|
n[0] if len(n) > 0 else ' '
|
|
)
|
|
line = n[1:] if len(n) > 1 else ''
|
|
|
|
|
|
if kind == '-' and (r != old_len or r == 0):
|
|
|
|
changes.append(Change(old + r, None, line, hunk_n))
|
|
r += 1
|
|
elif kind == '+' and (i != new_len or i == 0):
|
|
|
|
changes.append(Change(None, new + i, line, hunk_n))
|
|
i += 1
|
|
elif kind == ' ':
|
|
|
|
changes.append(Change(old + r, new + i, line, hunk_n))
|
|
r += 1
|
|
i += 1
|
|
|
|
if len(changes) > 0:
|
|
return changes
|
|
|
|
return None
|
|
|
|
|
|
def parse_context_diff(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
old = 0
|
|
new = 0
|
|
j = 0
|
|
k = 0
|
|
|
|
changes = list()
|
|
|
|
hunks = split_by_regex(lines, context_hunk_start)
|
|
for hunk_n, hunk in enumerate(hunks):
|
|
if not len(hunk):
|
|
continue
|
|
|
|
j = 0
|
|
k = 0
|
|
parts = split_by_regex(hunk, context_hunk_new)
|
|
if len(parts) != 2:
|
|
raise exceptions.ParseException('Context diff invalid', hunk_n)
|
|
|
|
old_hunk = parts[0]
|
|
new_hunk = parts[1]
|
|
|
|
while len(old_hunk) > 0:
|
|
o = context_hunk_old.match(old_hunk[0])
|
|
del old_hunk[0]
|
|
|
|
if not o:
|
|
continue
|
|
|
|
old = int(o.group(1))
|
|
old_len = int(o.group(2)) + 1 - old
|
|
while len(new_hunk) > 0:
|
|
n = context_hunk_new.match(new_hunk[0])
|
|
del new_hunk[0]
|
|
|
|
if not n:
|
|
continue
|
|
|
|
new = int(n.group(1))
|
|
new_len = int(n.group(2)) + 1 - new
|
|
break
|
|
break
|
|
|
|
|
|
if len(old_hunk) > 0 and len(new_hunk) == 0:
|
|
msg = 'Got unexpected change in removal hunk: '
|
|
|
|
while len(old_hunk) > 0:
|
|
c = context_change.match(old_hunk[0])
|
|
del old_hunk[0]
|
|
|
|
if not c:
|
|
continue
|
|
|
|
kind = c.group(1)
|
|
line = c.group(2)
|
|
|
|
if kind == '-' and (j != old_len or j == 0):
|
|
changes.append(Change(old + j, None, line, hunk_n))
|
|
j += 1
|
|
elif kind == ' ' and (
|
|
(j != old_len and k != new_len) or (j == 0 or k == 0)
|
|
):
|
|
changes.append(Change(old + j, new + k, line, hunk_n))
|
|
j += 1
|
|
k += 1
|
|
elif kind == '+' or kind == '!':
|
|
raise exceptions.ParseException(msg + kind, hunk_n)
|
|
|
|
continue
|
|
|
|
if len(old_hunk) == 0 and len(new_hunk) > 0:
|
|
msg = 'Got unexpected change in removal hunk: '
|
|
|
|
while len(new_hunk) > 0:
|
|
c = context_change.match(new_hunk[0])
|
|
del new_hunk[0]
|
|
|
|
if not c:
|
|
continue
|
|
|
|
kind = c.group(1)
|
|
line = c.group(2)
|
|
|
|
if kind == '+' and (k != new_len or k == 0):
|
|
changes.append(Change(None, new + k, line, hunk_n))
|
|
k += 1
|
|
elif kind == ' ' and (
|
|
(j != old_len and k != new_len) or (j == 0 or k == 0)
|
|
):
|
|
changes.append(Change(old + j, new + k, line, hunk_n))
|
|
j += 1
|
|
k += 1
|
|
elif kind == '-' or kind == '!':
|
|
raise exceptions.ParseException(msg + kind, hunk_n)
|
|
continue
|
|
|
|
|
|
while len(old_hunk) > 0 and len(new_hunk) > 0:
|
|
oc = context_change.match(old_hunk[0])
|
|
nc = context_change.match(new_hunk[0])
|
|
okind = None
|
|
nkind = None
|
|
|
|
if oc:
|
|
okind = oc.group(1)
|
|
oline = oc.group(2)
|
|
|
|
if nc:
|
|
nkind = nc.group(1)
|
|
nline = nc.group(2)
|
|
|
|
if not (oc or nc):
|
|
del old_hunk[0]
|
|
del new_hunk[0]
|
|
elif okind == ' ' and nkind == ' ' and oline == nline:
|
|
changes.append(Change(old + j, new + k, oline, hunk_n))
|
|
j += 1
|
|
k += 1
|
|
del old_hunk[0]
|
|
del new_hunk[0]
|
|
elif okind == '-' or okind == '!' and (j != old_len or j == 0):
|
|
changes.append(Change(old + j, None, oline, hunk_n))
|
|
j += 1
|
|
del old_hunk[0]
|
|
elif nkind == '+' or nkind == '!' and (k != new_len or k == 0):
|
|
changes.append(Change(None, new + k, nline, hunk_n))
|
|
k += 1
|
|
del new_hunk[0]
|
|
else:
|
|
return None
|
|
|
|
if len(changes) > 0:
|
|
return changes
|
|
|
|
return None
|
|
|
|
|
|
def parse_ed_diff(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
old = 0
|
|
j = 0
|
|
k = 0
|
|
|
|
r = 0
|
|
i = 0
|
|
|
|
changes = list()
|
|
|
|
hunks = split_by_regex(lines, ed_hunk_start)
|
|
hunks.reverse()
|
|
for hunk_n, hunk in enumerate(hunks):
|
|
if not len(hunk):
|
|
continue
|
|
j = 0
|
|
k = 0
|
|
while len(hunk) > 0:
|
|
o = ed_hunk_start.match(hunk[0])
|
|
del hunk[0]
|
|
|
|
if not o:
|
|
continue
|
|
|
|
old = int(o.group(1))
|
|
old_end = int(o.group(2)) if len(o.group(2)) else old
|
|
|
|
hunk_kind = o.group(3)
|
|
if hunk_kind == 'd':
|
|
k = 0
|
|
while old_end >= old:
|
|
changes.append(Change(old + k, None, None, hunk_n))
|
|
r += 1
|
|
k += 1
|
|
old_end -= 1
|
|
continue
|
|
|
|
while len(hunk) > 0:
|
|
e = ed_hunk_end.match(hunk[0])
|
|
if not e and hunk_kind == 'c':
|
|
k = 0
|
|
while old_end >= old:
|
|
changes.append(Change(old + k, None, None, hunk_n))
|
|
r += 1
|
|
k += 1
|
|
old_end -= 1
|
|
|
|
|
|
|
|
changes.append(
|
|
Change(
|
|
None,
|
|
old - r + i + k + j,
|
|
hunk[0],
|
|
hunk_n,
|
|
)
|
|
)
|
|
i += 1
|
|
j += 1
|
|
if not e and hunk_kind == 'a':
|
|
changes.append(
|
|
Change(
|
|
None,
|
|
old - r + i + 1,
|
|
hunk[0],
|
|
hunk_n,
|
|
)
|
|
)
|
|
i += 1
|
|
|
|
del hunk[0]
|
|
|
|
if len(changes) > 0:
|
|
return changes
|
|
|
|
return None
|
|
|
|
|
|
def parse_rcs_ed_diff(text):
|
|
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
old = 0
|
|
j = 0
|
|
size = 0
|
|
total_change_size = 0
|
|
|
|
changes = list()
|
|
|
|
hunks = split_by_regex(lines, rcs_ed_hunk_start)
|
|
for hunk_n, hunk in enumerate(hunks):
|
|
if len(hunk):
|
|
j = 0
|
|
while len(hunk) > 0:
|
|
o = rcs_ed_hunk_start.match(hunk[0])
|
|
del hunk[0]
|
|
|
|
if not o:
|
|
continue
|
|
|
|
hunk_kind = o.group(1)
|
|
old = int(o.group(2))
|
|
size = int(o.group(3))
|
|
|
|
if hunk_kind == 'a':
|
|
old += total_change_size + 1
|
|
total_change_size += size
|
|
while size > 0 and len(hunk) > 0:
|
|
changes.append(Change(None, old + j, hunk[0], hunk_n))
|
|
j += 1
|
|
size -= 1
|
|
|
|
del hunk[0]
|
|
|
|
elif hunk_kind == 'd':
|
|
total_change_size -= size
|
|
while size > 0:
|
|
changes.append(Change(old + j, None, None, hunk_n))
|
|
j += 1
|
|
size -= 1
|
|
|
|
if len(changes) > 0:
|
|
return changes
|
|
|
|
return None
|
|
|
|
|
|
def parse_git_binary_diff(text):
|
|
try:
|
|
lines = text.splitlines()
|
|
except AttributeError:
|
|
lines = text
|
|
|
|
changes: list[Change] = list()
|
|
|
|
old_version = None
|
|
new_version = None
|
|
cmd_old_path = None
|
|
cmd_new_path = None
|
|
|
|
new_size = 0
|
|
old_size = 0
|
|
old_encoded = ''
|
|
new_encoded = ''
|
|
for line in lines:
|
|
if cmd_old_path is None and cmd_new_path is None:
|
|
hm = git_diffcmd_header.match(line)
|
|
if hm:
|
|
cmd_old_path = hm.group(1)
|
|
cmd_new_path = hm.group(2)
|
|
continue
|
|
|
|
if old_version is None and new_version is None:
|
|
g = git_header_index.match(line)
|
|
if g:
|
|
old_version = g.group(1)
|
|
new_version = g.group(2)
|
|
continue
|
|
|
|
|
|
if new_size == 0:
|
|
literal = git_binary_literal_start.match(line)
|
|
if literal:
|
|
new_size = int(literal.group(1))
|
|
continue
|
|
delta = git_binary_delta_start.match(line)
|
|
if delta:
|
|
|
|
new_size = 0
|
|
continue
|
|
elif new_size > 0:
|
|
if base85string.match(line):
|
|
assert len(line) >= 6 and ((len(line) - 1) % 5) == 0
|
|
new_encoded += line[1:]
|
|
elif 0 == len(line):
|
|
if new_encoded:
|
|
decoded = base64.b85decode(new_encoded)
|
|
added_data = zlib.decompress(decoded)
|
|
assert new_size == len(added_data)
|
|
change = Change(None, 0, added_data, None)
|
|
changes.append(change)
|
|
new_size = 0
|
|
new_encoded = ''
|
|
else:
|
|
|
|
new_size = 0
|
|
new_encoded = ''
|
|
|
|
|
|
if old_size == 0:
|
|
literal = git_binary_literal_start.match(line)
|
|
if literal:
|
|
old_size = int(literal.group(1))
|
|
delta = git_binary_delta_start.match(line)
|
|
if delta:
|
|
|
|
old_size = 0
|
|
continue
|
|
elif old_size > 0:
|
|
if base85string.match(line):
|
|
assert len(line) >= 6 and ((len(line) - 1) % 5) == 0
|
|
old_encoded += line[1:]
|
|
elif 0 == len(line):
|
|
if old_encoded:
|
|
decoded = base64.b85decode(old_encoded)
|
|
removed_data = zlib.decompress(decoded)
|
|
assert old_size == len(removed_data)
|
|
change = Change(0, None, None, removed_data)
|
|
changes.append(change)
|
|
old_size = 0
|
|
old_encoded = ''
|
|
else:
|
|
|
|
old_size = 0
|
|
old_encoded = ''
|
|
|
|
return changes
|
|
|