Skip to content

Instantly share code, notes, and snippets.

@tyrcho
Last active March 28, 2023 16:16
Show Gist options
  • Save tyrcho/3f15273497ff9d5b5a0b12c672094387 to your computer and use it in GitHub Desktop.
Save tyrcho/3f15273497ff9d5b5a0b12c672094387 to your computer and use it in GitHub Desktop.
Script to accept Gitlab MRs when the pipeline has succeeded. Sample : https://gitlab.com/tyrcho/michel-steward/-/blob/master/.gitlab-ci.yml
#!/usr/bin/env python3
#
# git-subline-merge
#
# Created by Paul Altin on 02.03.18.
# Downloaded from https://github.com/paulaltin/git-subline-merge
#
# An interactive git merge driver which can resolve non-overlapping conflicts on individual or adjacent lines.
#
# To install for use during merge/rebase, place this script somewhere on your path, add these lines to your ~/.gitconfig:
#
# [merge "git-subline-merge"]
# name = An interactive merge driver for resolving sub-line conflicts
# driver = git-subline-merge %O %A %B %L %P
# recursive = binary
#
# and this to your git attributes file (e.g. ~/.config/git/attributes):
#
# * merge=git-subline-merge
#
# Alternatively, run this script on a conflicted file using 'git-subline-merge /path/to/file'
#
import os, sys, re, tempfile
from subprocess import call
from shutil import copyfile
from builtins import input
# improved command line editing on *nix
try:
import readline
except ImportError:
pass
# for interpreting environment variables
# distutils was removed from the standard library in Python 3.12 (PEP 632),
# so fall back to an equivalent local implementation when it is unavailable
try:
    from distutils.util import strtobool
except ImportError:
    def strtobool(val):
        """Convert a string representation of truth to 1 (true) or 0 (false).

        Accepts the same spellings as distutils.util.strtobool
        (case-insensitive): y/yes/t/true/on/1 and n/no/f/false/off/0.
        Raises ValueError for anything else.
        """
        val = val.lower()
        if val in ('y', 'yes', 't', 'true', 'on', '1'):
            return 1
        if val in ('n', 'no', 'f', 'false', 'off', '0'):
            return 0
        raise ValueError('invalid truth value %r' % (val,))
#############
### SETUP ###
#############
# platform detection: several code paths (exit statuses, terminal handling) differ on Windows
WINDOWS = (os.name == 'nt')

# hunk size limits, overridable through the environment
MAX_HUNK_SIZE = int(os.getenv('GIT_SUBLINE_MERGE_MAX_HUNK_SIZE', 16))
MAX_HUNK_SIZE_DIFF = int(os.getenv('GIT_SUBLINE_MERGE_MAX_HUNK_SIZE_DIFF', 8))

# color support
# color is only used when stdout is a terminal reporting at least 8 colors,
# and the NO_COLOR informal standard is honoured (any value disables color)
stdout_isatty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
_tput_output = os.popen('which tput >/dev/null && tput colors').read()
term_colors = int(_tput_output or 0)
no_color = 'NO_COLOR' in os.environ
show_color = (not no_color) and term_colors >= 8 and stdout_isatty

# on Windows, colorama (if installed) translates the ANSI escape sequences
if show_color and WINDOWS:
    try:
        from colorama import init
        init()
    except ImportError:
        print('Install colorama (pip install colorama) for colored output on Windows.')
# interactive or non-interactive mode
# interactive is the default; GIT_SUBLINE_MERGE_NON_INTERACTIVE_MODE switches it off
stdin_isatty = hasattr(sys.stdin, 'isatty') and sys.stdin.isatty()
_non_interactive = strtobool(os.getenv('GIT_SUBLINE_MERGE_NON_INTERACTIVE_MODE', 'False'))
interactive = not _non_interactive

# when stdin is not a terminal (and we are not on Windows), read the user's
# answers from the controlling terminal instead; give up if there is none
if not (WINDOWS or stdin_isatty or not interactive):
    try:
        sys.stdin = open('/dev/tty')
    except IOError:
        print('Warning: git-subline-merge could not run in interactive mode because there is no controlling terminal.\n'
              'You can enable non-interactive mode using the GIT_SUBLINE_MERGE_NON_INTERACTIVE_MODE environment variable,\n'
              ' however note that in this mode you will NOT be able to review merges before they are written to file.')
        sys.exit(1)
###############
### HELPERS ###
###############
# for colored output in terminal
class color:
    """ANSI escape sequences used for terminal output.

    Every attribute is the empty string when show_color is false, so callers
    can concatenate them unconditionally.
    """

    def _sgr(sequence):
        # evaluated once per attribute while the class body executes
        return sequence if show_color else ''

    bold = _sgr('\033[1m')
    welcome = _sgr('\033[1m\033[91m')               # bold, red
    info = _sgr('\033[96m')                         # cyan
    highlight = _sgr('\033[103m\033[1m\033[30m')    # bold, black, yellow bg
    added1 = _sgr('\033[48;5;28m')                  # bright green
    added2 = _sgr('\033[48;5;22m')                  # dark green
    deleted1 = _sgr('\033[48;5;124m')               # bright red
    deleted2 = _sgr('\033[48;5;88m')                # dark red
    deleted_both = _sgr('\033[48;5;166m')           # orange
    success = _sgr('\033[92m')                      # green
    warning = _sgr('\033[93m')                      # yellow
    error = _sgr('\033[91m')                        # red
    end = _sgr('\033[0m')

    # the helper is only needed while building the class
    del _sgr
# get number of lines in a file, either from a name or a file handle
def file_len(f):
    """Return the number of lines in f.

    f is either a path (opened for reading here) or an already-open file
    handle. For a handle, the current position is saved and restored, so the
    caller's read/write offset is unchanged. An empty file has 0 lines.
    """
    # path: open() raises TypeError when handed a file object instead
    try:
        fh = open(f, 'r')
    except TypeError:
        fh = None
    if fh is not None:
        with fh:
            return sum(1 for _ in fh)
    # file handle: count from the start, then put the position back
    pos = f.tell()
    f.seek(0)
    try:
        return sum(1 for _ in f)
    finally:
        f.seek(pos)
# get the line numbers of the beginning and end of a conflicted hunk
def find_nth_conflicted_hunk(file, n):
    """Locate the n-th (1-based) conflicted hunk in the file at path `file`.

    Returns (start, stop): the zero-based index of the line starting with
    marker_start and one past the index of the following line starting with
    marker_end, so lines[start:stop] spans the whole hunk including markers.
    Raises IndexError if there is no such hunk.
    """
    start = stop = -1
    seen = 0
    with open(file, 'r') as f:
        for i, l in enumerate(f):
            if l.startswith(marker_start):
                seen += 1
                if seen == n and start == -1:
                    start = i
            elif start >= 0 and l.startswith(marker_end):
                # first end marker after the requested start closes the hunk
                stop = i + 1
                break
    if start == -1 or stop == -1:
        raise IndexError('Hunk %d not found!' % n)
    return (start, stop)
# find nth conflict in file and replace it with given lines
def replace_nth_conflicted_hunk_with_lines(file, n, lines):
    """Rewrite the file at path `file`, substituting `lines` for its n-th conflicted hunk.

    `lines` is a list of strings; it replaces the hunk including its
    conflict markers. The hunk is located with find_nth_conflicted_hunk.
    """
    with open(file, 'r') as src:
        original = src.readlines()
    first, past_last = find_nth_conflicted_hunk(file, n)
    original[first:past_last] = lines
    with open(file, 'w') as dst:
        dst.write(''.join(original))
# prompt user for input, repeating until one of the allowed responses is given
def ask_for_input(msg, allowed=None):
    """Prompt the user with msg and return the reply.

    With `allowed` (a list of strings), the choices are appended to the
    prompt and the question is repeated until the stripped, lower-cased
    reply matches one of them; that normalised reply is returned.
    Without `allowed`, the raw reply is returned unchanged.
    """
    if allowed is None:
        return input(color.bold + color.info + msg + color.end)

    prompt = color.bold + color.info + msg + ' (' + '/'.join(allowed) + ')? ' + color.end
    choices = [s.strip().lower() for s in allowed]
    while True:
        reply = input(prompt).strip().lower()
        if reply in choices:
            return reply
# get exit status from a call to os.system
def get_exit_status(exit_code):
    """Translate the return value of os.system() into a process exit status.

    On Windows os.system() already returns the exit status, so it is passed
    through. Elsewhere the value is a wait()-style status: a normal exit
    yields its exit code, while a child terminated by a signal yields
    128 + signal number (never 0), so that a killed command is not mistaken
    for a successful one.
    """
    if WINDOWS:
        return exit_code
    if os.WIFEXITED(exit_code):
        return os.WEXITSTATUS(exit_code)
    if os.WIFSIGNALED(exit_code):
        return 128 + os.WTERMSIG(exit_code)
    # stopped/continued or otherwise unexpected status: report failure
    return 1
#############
### DIFFS ###
#############
# print one version of a hunk, highlighting the differences between it and the other version(s)
def print_formatted_diff(hunk, display):
    """Print one version of a conflicted hunk with its differences highlighted.

    hunk holds the file handles [current, base, other]; display selects the
    version to print: 'current', 'base' or 'other'. Any other value prints
    nothing.

    The current and other versions are diffed against the base and shown as
    the 'new' side of that diff. The base version is diffed against both the
    current and the other version (shown as the 'old' side each time) and
    the two highlightings are merged with combine_colors.
    """
    if display in ('current', 'other'):
        showing_current = (display == 'current')
        base_version = hunk[1]
        shown_version = hunk[0] if showing_current else hunk[2]
        added = color.added1 if showing_current else color.added2
        deleted = color.deleted1 if showing_current else color.deleted2
        rendered = format_diff(base_version, shown_version, 'new', added, deleted)
    elif display == 'base':
        versus_current = format_diff(hunk[1], hunk[0], 'old', color.added1, color.deleted1)
        versus_other = format_diff(hunk[1], hunk[2], 'old', color.added2, color.deleted2)
        rendered = combine_colors(versus_current, versus_other,
                                  color.deleted1, color.deleted2, color.deleted_both)
    else:
        return

    for ln in rendered:
        print(ln)
# add colors and prefixes showing the parts added or deleted between two versions
def format_diff(fold, fnew, display, color_added, color_deleted):
    """Return one side of a diff as a list of prefixed, highlighted lines.

    fold and fnew are named file handles for the old and new versions; the
    external `diff` command is run on their paths. display is 'old' or 'new'
    and selects which version's lines are returned.

    Every returned line starts with a two-character prefix:
      '+ ' added line (highlighted with color_added),
      '- ' deleted line (highlighted with color_deleted),
      '* ' changed line (word-level highlighting from format_changed_hunk),
      '  ' unchanged line.
    The prefix is always two characters wide because combine_colors strips
    exactly two characters from each line.
    """
    # reset file handles
    fold.seek(0)
    fnew.seek(0)

    # find line numbers of added, deleted and changed lines
    # for added and deleted groups, get the (zero-indexed) number of the first line and the length of the group
    # for changed groups, get the (zero-indexed) numbers of the first and one past the last line
    # ideally we'd use \n as the delimiter, but these break the shell on Windows
    delimiter = marker_start + "GIT_SUBLINE_MERGE_DELIMITER" + marker_end
    oldformat = ' --old-group-format="D%de+%dn,%dE+%dN' + delimiter + '" '
    newformat = ' --new-group-format="A%de+%dn,%dE+%dN' + delimiter + '" '
    changedformat = ' --changed-group-format="C%de+%dl,%dE+%dL' + delimiter + '" '
    unchangedformat = ' --unchanged-group-format="" '
    cmd = 'diff' + oldformat + newformat + changedformat + unchangedformat + fold.name + ' ' + fnew.name
    groups = [g for g in os.popen(cmd).read().split(delimiter) if len(g)]

    # extract indices from diff output
    # added and deleted are sets of line numbers
    # changed are lists of [start, stop] pairs (old side and new side in parallel)
    added, deleted = set(), set()
    oldchange, newchange = [], []
    for g in groups:
        kind, spans = g[0], g[1:].split(',')
        if kind == 'A':
            start, length = [int(x) for x in spans[1].split('+')]
            added.update(range(start, start + length))
        elif kind == 'D':
            start, length = [int(x) for x in spans[0].split('+')]
            deleted.update(range(start, start + length))
        elif kind == 'C':
            start, stop = [int(x) for x in spans[0].split('+')]
            oldchange.append([start, stop])
            start, stop = [int(x) for x in spans[1].split('+')]
            newchange.append([start, stop])

    # generate a list of lines with prefixes and highlighting
    lines = []
    i, j = 0, 0
    old = fold.read().splitlines()
    new = fnew.read().splitlines()
    version = old if display == 'old' else new
    change = oldchange if display == 'old' else newchange
    while i < len(version):
        if display == 'new' and i in added:
            lines.append('+ ' + color_added + version[i] + color.end)
        elif display == 'old' and i in deleted:
            lines.append('- ' + color_deleted + version[i] + color.end)
        elif j < len(change) and i == change[j][0]:
            oldhunk = old[oldchange[j][0]:oldchange[j][1]]
            newhunk = new[newchange[j][0]:newchange[j][1]]
            coloredhunk = format_changed_hunk(oldhunk, newhunk, display, color_added, color_deleted)
            for ln in coloredhunk:
                lines.append('* ' + ln)
            # skip the rest of the changed group (1 will be added below)
            i += change[j][1] - change[j][0] - 1
            j += 1
        else:
            # two-character prefix, same width as '+ ', '- ' and '* '
            lines.append('  ' + version[i])
        i += 1
    return lines
# word-level diff with coloring
def format_changed_hunk(h1, h2, display, color_added, color_deleted):
    """Return a changed group of lines with word-level highlighting.

    h1 and h2 are lists of strings (the old and new lines of one changed
    group). display is 'old' or 'new': for 'old' the lines of h1 are
    returned with removed segments wrapped in color_deleted, for 'new' the
    lines of h2 with added segments wrapped in color_added. Any other value
    of display returns [''].

    Lines are split at non-word characters, the segments are written one per
    line to temporary files, and the external `diff` command decides which
    segments were removed or added.
    """
    def split_segments(hunk_lines):
        # returns the flat list of segments and the set of segment indices
        # at which a new source line begins (i.e. where a linebreak goes)
        segments, breaks, total = [], set(), 0
        for ln in hunk_lines:
            parts = re.split(r'(\W)', ln)
            segments += parts
            total += len(parts)
            breaks.add(total)
        return segments, breaks

    splA, lcA = split_segments(h1)
    splB, lcB = split_segments(h2)

    if display == 'old':
        segments, breaks, highlight = splA, lcA, color_deleted
        line_formats, flag = '--old-line-format="-" --new-line-format="" ', '-'
    elif display == 'new':
        segments, breaks, highlight = splB, lcB, color_added
        line_formats, flag = '--old-line-format="" --new-line-format="+" ', '+'
    else:
        return ['']

    # write split lines to temp files; the context managers guarantee the
    # files are closed (and thereby deleted) even if diff or the I/O fails
    with tempfile.NamedTemporaryFile(mode='w+') as tmpA, \
            tempfile.NamedTemporaryFile(mode='w+') as tmpB:
        tmpA.write('\n'.join(splA))
        tmpA.flush()
        tmpB.write('\n'.join(splB))
        tmpB.flush()
        # diff prints one character per segment of the displayed version:
        # '.' for unchanged segments, '-' / '+' for removed / added ones
        marks = os.popen('diff --unchanged-line-format="." ' + line_formats
                         + tmpA.name + ' ' + tmpB.name).read()

    flagged = set(i for i, c in enumerate(marks) if c == flag)

    pieces = []
    for i, w in enumerate(segments):
        if i in breaks:
            pieces.append('\n')
        if i in flagged:
            pieces.append(highlight)
        pieces.append(w.rstrip('\n'))
        if i in flagged:
            pieces.append(color.end)
    return ''.join(pieces).split('\n')
# combine highlighting from two versions of the same (base) hunk
def combine_colors(lines1, lines2, color1, color2, color_both):
    """Merge two highlightings of the same lines into one list of lines.

    lines1 and lines2 are lists of strings that must be identical except for
    their first (prefix) character and for highlighting with color1 (in
    lines1) and color2 (in lines2), each terminated by color.end. Each line
    is expected to start with a two-character prefix, which is stripped and
    re-added. Regions highlighted in both inputs are emitted with
    color_both.

    The resulting prefix is '- ' if either input line has '-', else '* ' if
    either has '*', else two spaces.
    """
    # merge the two versions
    lines = []
    assert len(lines1) == len(lines2)
    for ln1, ln2 in zip(lines1, lines2):
        i, j = 0, 0
        result = ''
        highlight1, highlight2 = False, False

        # remove prefixes (we will add them back later)
        prefixes = [ln1[0], ln2[0]]
        ln1, ln2 = ln1[2:], ln2[2:]

        # i iterates through ln1, j iterates through ln2
        while i < len(ln1) or j < len(ln2):

            # add special characters from version 1
            if i < len(ln1):
                if show_color and ln1[i:i+len(color1)] == color1:
                    # entering a region of version 1; overlap with an open
                    # region of version 2 switches to the combined color
                    result += (color.end + color_both) if highlight2 else color1
                    highlight1 = True
                    i += len(color1)
                    continue
                elif show_color and ln1[i:i+len(color.end)] == color.end:
                    # leaving a region of version 1; restore version 2's
                    # color if its region is still open
                    highlight1 = False
                    result += color.end
                    if highlight2:
                        result += color2
                    i += len(color.end)
                    continue

            # add special characters from version 2
            if j < len(ln2):
                if show_color and ln2[j:j+len(color2)] == color2:
                    result += (color.end + color_both) if highlight1 else color2
                    highlight2 = True
                    j += len(color2)
                    continue
                elif show_color and ln2[j:j+len(color.end)] == color.end:
                    highlight2 = False
                    result += color.end
                    if highlight1:
                        result += color1
                    j += len(color.end)
                    continue

            # all other characters should match
            assert ln1[i] == ln2[j]
            result += ln1[i]
            i += 1
            j += 1

        # add the prefix back; always two characters wide so unchanged lines
        # stay aligned with '- ' and '* ' lines
        prefix = '- ' if '-' in prefixes else '* ' if '*' in prefixes else '  '
        lines.append(prefix + result)
    return lines
###############
### MERGING ###
###############
def process_hunk(hunk, index, resolved, num_conflicts):
    """Try to resolve one conflicted hunk, interactively or automatically.

    hunk holds three file handles with the [current, base, other] versions
    of the hunk. index is the 1-based number of this hunk among the
    num_conflicts conflicts originally found; resolved is how many earlier
    hunks have already been replaced in the file, so index - resolved is the
    hunk's position among the conflicts still present in `current`.

    Returns (fixed, abort): fixed is True when the hunk was replaced in the
    file, abort is True when the user asked to skip all remaining hunks.
    Hunks exceeding MAX_HUNK_SIZE or MAX_HUNK_SIZE_DIFF are skipped.
    """
    fixed = abort = False

    # get number of lines in current, base and other hunks
    sizes = [file_len(f) for f in hunk]

    # only process small hunks
    small_enough = all([s <= MAX_HUNK_SIZE for s in sizes])
    if not (small_enough and max(abs(sizes[0]-sizes[1]), abs(sizes[2]-sizes[1])) <= MAX_HUNK_SIZE_DIFF):
        print(color.bold + color.info + '\nSkipping hunk %d of %d (spans %d/%d/%d lines) in %s, too large...' % (index, num_conflicts, sizes[0], sizes[1], sizes[2], filename) + color.end)
        return fixed, abort

    # non-interactive mode: attempt the sub-line merge and report the outcome
    if not interactive:
        for f in hunk:
            f.seek(0)
        result = subline_merge_hunk(hunk)
        if result is None:
            print(color.bold + color.error + 'git-subline-merge failed to resolve conflict %d of %d in %s, hunk has overlapping changes' % (index, num_conflicts, filename) + color.end)
        else:
            replace_nth_conflicted_hunk_with_lines(current, index-resolved, result)
            print(color.bold + color.success + 'git-subline-merge resolved conflict %d of %d in %s, resulting hunk was:' % (index, num_conflicts, filename) + color.end)
            print(color.info + marker_start + color.end)
            if len(result):
                print('\n'.join([x.rstrip('\n') for x in result]).rstrip('\n'))
            print(color.info + marker_end + color.end)
            fixed = True
        return fixed, abort

    def show_context(first, past_last):
        # print lines first..past_last-1 (zero-based) of the conflicted file
        with open(current, 'r') as c:
            for i, l in enumerate(c):
                if i >= past_last:
                    break
                if i >= first:
                    print(l.rstrip('\n'))

    def review_and_apply(title, question, lines, caution=None):
        # show a proposed resolution, ask for confirmation and, if accepted,
        # write it into the conflicted file; returns True when applied
        print(color.info + marker_start + title + color.end)
        if len(lines):
            print('\n'.join([x.rstrip('\n') for x in lines]).rstrip('\n'))
        print(color.info + marker_end + color.end + '\n')
        if caution is not None:
            print(caution)
        if ask_for_input(question, ['y','n']) != 'y':
            return False
        replace_nth_conflicted_hunk_with_lines(current, index-resolved, lines)
        return True

    menu = [
        ' v - view entire hunk',
        ' x - view hunk in context',
        ' s - attempt sub-line merge',
        ' m - resolve manually',
        ' c - use current version',
        ' b - use base version',
        ' o - use other version',
        ' k - skip this hunk',
        ' q - skip all hunks in this file',
    ]

    # interactive mode
    print(color.highlight + '\nConflicted hunk %d of %d (spans %d/%d/%d lines) in %s...' % (index, num_conflicts, sizes[0], sizes[1], sizes[2], filename) + color.end)

    # ask user for action to take; the first pass always shows the hunk
    first_pass = True
    while True:
        for f in hunk:
            f.seek(0)
        print('')
        if first_pass:
            action = 'v'
            first_pass = False
        else:
            for entry in menu:
                print(color.bold + entry + color.end)
            action = ask_for_input('Resolve this hunk', ['v','x','s','m','c','b','o','k','q'])
            print('')

        if action in ['v','x']:
            # 'x' additionally prints up to 10 lines of context on each side
            if action == 'x':
                start, stop = find_nth_conflicted_hunk(current, index-resolved)
                show_context(max(0, start-10), start)
            print(color.info + marker_start + ' Current version' + color.end)
            print_formatted_diff(hunk, 'current')
            print(color.info + marker_base + ' Base version' + color.end)
            print_formatted_diff(hunk, 'base')
            print(color.info + marker_other + ' Other version' + color.end)
            print_formatted_diff(hunk, 'other')
            print(color.info + marker_end + color.end)
            if action == 'x':
                length = file_len(current)
                show_context(stop, min(stop+10, length))

        elif action == 's':
            result = subline_merge_hunk(hunk)
            if result is None:
                print(color.bold + color.error + 'Sub-line merge failed, hunk has overlapping changes' + color.end)
            elif review_and_apply(' Sub-line merge yields:', 'Accept sub-line merge', result):
                fixed = True
                break

        elif action == 'm':
            result = manual_merge_hunk(hunk)
            if result is None:
                print(color.bold + color.warning + 'User cancelled manual resolve' + color.end)
            elif review_and_apply(' Manual resolution is:', 'Accept manual resolution', result):
                fixed = True
                break

        elif action in ['c','b','o']:
            idx = ['c','b','o'].index(action)
            result = hunk[idx].readlines()
            branchname = ['Current','Base','Other'][idx]
            warn = ['the other branch', 'both branches', 'the current branch'][idx]
            caution = color.bold + color.warning + 'Warning: this will discard changes on ' + warn + '!' + color.end + '\n'
            if review_and_apply(' ' + branchname + ' version is:', 'Accept', result, caution):
                fixed = True
                break

        elif action == 'k':
            break

        elif action == 'q':
            abort = True
            break

    return fixed, abort
# attempt to do sub-line merging of a conflicted hunk
def subline_merge_hunk(hunk):
    """Attempt a character-level three-way merge of a conflicted hunk.

    hunk holds three readable file handles [current, base, other], read from
    their current positions. Each version is rewritten with every character
    on its own line and the three files are merged with `git merge-file`, so
    changes that touch different characters no longer conflict.

    Returns the merged text as a list of lines (line endings kept), or None
    if the merge still reports conflicts or fails.
    """
    result = None
    paths = []
    try:
        # separate each character in a string by a newline character
        # on Windows the temp file can't be open twice, so we have to close it
        # before calling git-merge-file and delete it manually later
        for version in hunk[:3]:
            text = version.read()
            tmp = tempfile.NamedTemporaryFile(mode='w+', delete=False)
            paths.append(tmp.name)
            try:
                tmp.write('\n'.join(text))
            finally:
                tmp.close()

        # attempt merge on separated text (result is written to the first file)
        status = os.system('git merge-file %s %s %s' % tuple(paths))
        num_conflicts = get_exit_status(status)

        # if successful, take merge result after removing the inserted
        # newlines (every second character) and split into a list of lines
        if num_conflicts == 0:
            with open(paths[0], 'r+') as f0:
                merged = f0.read()
            result = merged[0::2].splitlines(True)
    finally:
        # always remove the temp files, even if writing or merging raised
        for path in paths:
            try:
                os.unlink(path)
            except OSError:
                pass
    return result
# open an editor for manual merging of a conflicted hunk
def manual_merge_hunk(hunk):
    """Let the user resolve a hunk by hand in their editor.

    hunk holds three readable file handles [current, base, other]. Their
    contents are written, separated by conflict markers, to a temporary file
    which is opened in the user's editor. Returns the saved file as a list
    of lines, or None if the editor exited with an error status.
    """
    # prepare temp file for editing
    sections = (
        (marker_start, ' Current version is:\n', hunk[0]),
        (marker_base, ' Base version is:\n', hunk[1]),
        (marker_other, ' Other version is:\n', hunk[2]),
    )
    scratch = tempfile.NamedTemporaryFile(mode='w+', delete=False)
    for marker, label, version in sections:
        scratch.write(marker + label)
        scratch.write(version.read())
    scratch.write(marker_end + '\n')
    scratch.close()
    path = scratch.name

    # imitate the process that Git uses to determine which editor to use;
    # each source is only consulted if the previous ones gave nothing
    lookups = (
        lambda: os.getenv('GIT_EDITOR'),
        lambda: os.popen('git config core.editor').read().rstrip('\n') or None,
        lambda: os.getenv('VISUAL'),
        lambda: os.getenv('EDITOR'),
    )
    for lookup in lookups:
        editor = lookup()
        if editor is not None:
            break
    else:
        editor = 'vi'

    # known issue with vim on Mac OS X: can give nonzero exit status even when quit cleanly due to
    # errors in .vimrc, so check its normal exit code first.
    success_code = 0
    if editor == 'vi':
        success_code = call(['vi -c "q" %s' % path], shell=(not WINDOWS), stdin=sys.stdin)
    status = call(['%s %s' % (editor, path)], shell=(not WINDOWS), stdin=sys.stdin)

    # if editor returned without an error, return the saved lines
    result = None
    if status <= success_code:
        with open(path, 'r') as edited:
            result = edited.readlines()
    os.unlink(path)
    return result
############
### MAIN ###
############
# invoked by git
if len(sys.argv) == 6:
    # arguments passed in by git:
    # [1] name of temp file containing base version of file
    # [2] name of temp file containing current version of file
    # [3] name of temp file containing other version of file
    # [4] length of conflict markers (default is 7 for <<<<<<<)
    # [5] name of conflicted file
    # the result should be left in the current version (argv[2])
    base, current, other, marker_len, filename = sys.argv[1:]
    marker_len = int(marker_len)
    marker_start = '<' * marker_len
    marker_base = '|' * marker_len
    marker_other = '=' * marker_len
    marker_end = '>' * marker_len

    # run git merge on the files using diff3 (result is written to 'current')
    # the number of conflicts is encoded in the exit status of the git-merge-file command
    status = os.system('git merge-file --diff3 --marker-size=%d -L "Current version" -L "Base version" -L "Other version" %s %s %s' % (marker_len, current, base, other))
    num_conflicts = get_exit_status(status)

# invoked manually on a single file
elif len(sys.argv) >= 2 and os.path.isfile(sys.argv[1]):
    # first argument is path to file
    current = sys.argv[1]
    filename = current

    # optional second argument for conflict marker size (default is 7 for <<<<<<<)
    # the default only applies when the argument is absent; a given value must
    # be a positive integer (an empty marker would match every line)
    marker_len = 7
    if len(sys.argv) > 2:
        try:
            marker_len = int(sys.argv[2])
        except ValueError:
            marker_len = 0
        if marker_len < 1:
            print('Error: unable to read conflict marker size from argument "%s".' % (sys.argv[2]))
            sys.exit(1)

    # extra arguments are ignored
    if len(sys.argv) > 3:
        print('Warning: ignoring extra arguments (%s).' % (', '.join(sys.argv[3:])))

    marker_start = '<' * marker_len
    marker_base = '|' * marker_len
    marker_other = '=' * marker_len
    marker_end = '>' * marker_len

    # count the lines starting with each conflict marker
    # done in Python rather than via grep so that no shell is involved and
    # unusual characters in the file name cannot break the command
    num_start = num_base = num_other = num_end = 0
    with open(current, 'r') as marker_scan:
        for scanned_line in marker_scan:
            if scanned_line.startswith(marker_start):
                num_start += 1
            elif scanned_line.startswith(marker_base):
                num_base += 1
            elif scanned_line.startswith(marker_other):
                num_other += 1
            elif scanned_line.startswith(marker_end):
                num_end += 1

    # check that the number of conflict markers match
    # if they don't, probably the merge wasn't generated using diff3, and we can't resolve conflicts without the base version
    if num_start == num_base == num_other == num_end:
        num_conflicts = num_start
    else:
        print('Badly formatted conflicts found in file. Possibly you need to change your conflictstyle to diff3?')
        sys.exit(1)

# otherwise show help text
else:
    print("usage: git-subline-merge <file> [conflict-marker-size]\n"
          " <file>: path to file with conflicts (must be generated using diff3 style, see README)\n"
          " [conflict-marker-size] (optional): number of characters used in conflict markers (default 7)")
    sys.exit(1)
# git sometimes invokes the driver (e.g. during interactive rebase) although
# nothing conflicts - report success straight away in that case
if num_conflicts == 0:
    sys.exit(0)

# welcome message
if interactive:
    print(color.welcome + '\ngit-subline-merge v1.0\n' + color.end)

# temporary files:
#  - a snapshot of the conflicted 'current' file to iterate over, because
#    'current' itself is rewritten whenever a hunk gets resolved
#  - three files holding the current/base/other version of the hunk being read
#    (opened with 'a' so we can append without worrying about newline characters)
scratch = tempfile.TemporaryFile(mode='w+')
with open(current, 'r') as c:
    scratch.write(c.read())
scratch.seek(0)
hunk = [tempfile.NamedTemporaryFile(mode='a+') for _ in range(3)]

# walk through the snapshot line by line
# zone: 0 = outside a conflict, 1/2/3 = inside the current/base/other part
resolved = zone = index = 0
for line in scratch:
    if line.startswith(marker_start):
        # beginning of conflict zone: start collecting from scratch
        zone = 1
        for side in hunk:
            side.truncate(0)
            side.seek(0)
        continue
    if line.startswith(marker_base):
        # base version of conflict
        zone = 2
        continue
    if line.startswith(marker_other):
        # other version of conflict
        zone = 3
        continue
    if line.startswith(marker_end):
        # end of conflict zone: the hunk is complete, try to resolve it
        zone = 0
        index += 1
        fixed, abort = process_hunk(hunk, index, resolved, num_conflicts)
        if fixed:
            resolved += 1
        elif abort:
            break
        continue
    if zone:
        # append line to appropriate hunk zone
        hunk[zone-1].write(line)

# close all temp files (will be automatically deleted)
scratch.close()
for side in hunk:
    side.close()

# print resolution
pad = '\n' if interactive else ''
if resolved == num_conflicts:
    summary_color = color.success
elif resolved > 0:
    summary_color = color.warning
else:
    summary_color = color.error
print(summary_color + color.bold + pad + 'Resolved %d of %d conflicts in %s' % (resolved, num_conflicts, filename) + pad + color.end)

# only exit with success if all conflicts were resolved
sys.exit(0 if resolved == num_conflicts else 1)
#!/usr/bin/env bash
# Physical (symlink-resolved, via `pwd -P`) path of the directory containing this script.
# Used below to locate the git-subline-merge driver, which is referenced relative to this directory.
script_full_path="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
# Call the GitLab REST API (v4) with the personal access token in $GITLAB_PAT.
#   $1     path and query string appended to https://gitlab.com/api/v4/
#   $2...  extra options passed to curl unchanged (e.g. -X PUT)
# The request is logged to stderr; the response body goes to stdout.
# Because of --fail, HTTP errors make curl (and so this function) return non-zero.
curl_gitlab() {
  # quoted so that '?' and '&' in the URL are neither glob-expanded nor split
  echo "$*" >&2
  local url_suffix=$1
  shift
  curl \
    --silent \
    --show-error \
    --fail \
    --header "PRIVATE-TOKEN: $GITLAB_PAT" \
    "https://gitlab.com/api/v4/$url_suffix" "$@"
}
# Print the id of the project whose path is exactly $2.
#   $1  API path and query prefix used to search projects; the name is appended to it
#   $2  project path (name) to look up
# Prints nothing when no project in the search result has that path.
get_project_id() {
  # locals, so the global PROJECTS_FILTER set by the caller is not overwritten
  local projects_filter=$1
  local name=$2
  # the name is handed to jq as a variable instead of being spliced into
  # the jq program, so special characters in it cannot alter the filter
  curl_gitlab "$projects_filter$name" \
    | jq -r --arg name "$name" '.[] | select(.path == $name) | "\(.id)"'
}
# Ask GitLab to merge a merge request, removing its source branch.
#   $1  project id
#   $2  merge request iid
#   $3  (optional, default false) "true" to merge only once the pipeline succeeds
# Returns the exit status of the API call and reports the outcome on stderr.
merge_mr() {
  local project_id=$1
  local mr_id=$2
  local merge_when_pipeline_succeeds=${3:-false}
  local ret
  curl_gitlab "projects/$project_id/merge_requests/$mr_id/merge?should_remove_source_branch=true&merge_when_pipeline_succeeds=$merge_when_pipeline_succeeds" -X PUT
  ret=$?
  # terminate the JSON response printed by curl with a newline
  echo
  if [ "$ret" -eq 0 ] ; then
    # report what was actually requested: deferred or immediate merge
    if [ "$merge_when_pipeline_succeeds" = "true" ] ; then
      echo "MR $mr_id was set to merge when pipeline succeeds" >&2
    else
      echo "MR $mr_id was merged" >&2
    fi
  else
    echo "MR $mr_id was not merged" >&2
  fi
  return $ret
}
# Clones the git repository locally and uses a subline merge algorithm to
# rebase the MR's source branch onto its target branch, then force pushes it.
#   $1  project id
#   $2  merge request iid
# Returns 0 when a rebased branch was pushed, 1 when nothing was pushed or
# any step (API lookup, clone, checkout, rebase, push) failed.
# Note: changes the current directory to the clone under /tmp.
rebase_local() {
  local PROJECT_ID=$1
  local MR_ID=$2
  local SOURCE_BRANCH TARGET_BRANCH CI_PROJECT_PATH PROJECT_NAME pushed

  read -r SOURCE_BRANCH TARGET_BRANCH <<< "$(curl_gitlab "projects/$PROJECT_ID/merge_requests/$MR_ID" |
    jq -r '[.source_branch, .target_branch] | @tsv')"
  if [ -z "$SOURCE_BRANCH" ] || [ -z "$TARGET_BRANCH" ] ; then
    echo "Could not determine source/target branch of MR $MR_ID" >&2
    return 1
  fi
  echo "Try a local rebase for MR $MR_ID from $SOURCE_BRANCH to $TARGET_BRANCH"

  read -r CI_PROJECT_PATH PROJECT_NAME <<< "$(curl_gitlab "projects/$PROJECT_ID" |
    jq -r '[.path_with_namespace, .path] | @tsv')"
  # an empty name would make 'cd' below go to $HOME and run git there
  if [ -z "$CI_PROJECT_PATH" ] || [ -z "$PROJECT_NAME" ] ; then
    echo "Could not determine path of project $PROJECT_ID" >&2
    return 1
  fi

  local GITLAB_USER_NAME="gitlab.ci1"
  echo "Checkout https://${GITLAB_USER_NAME}@gitlab.com/${CI_PROJECT_PATH}.git in /tmp/$PROJECT_NAME"
  cd /tmp || return 1
  rm -rf -- "$PROJECT_NAME"
  git clone "https://${GITLAB_USER_NAME}:${GITLAB_PAT}@gitlab.com/${CI_PROJECT_PATH}.git" || return 1
  cd "$PROJECT_NAME" || return 1
  git checkout "$SOURCE_BRANCH" || return 1
  git config advice.detachedHead false
  git config user.email "$GITLAB_USER_EMAIL"
  git config user.name "$GITLAB_USER_NAME"

  # setup subline merge https://github.com/paulaltin/git-subline-merge
  git config merge.conflictStyle diff3
  git config merge.subline.driver "$script_full_path/git-subline-merge %O %A %B %L %P"
  git config merge.subline.recursive binary
  mkdir -p .git/info
  echo "* merge=subline" >> .git/info/attributes

  # a failed rebase leaves the clone mid-rebase; do not push in that state
  if ! GIT_SUBLINE_MERGE_NON_INTERACTIVE_MODE=TRUE git rebase --verbose "origin/$TARGET_BRANCH" ; then
    echo "Rebase of MR $MR_ID onto $TARGET_BRANCH failed" >&2
    git rebase --abort
    return 1
  fi

  if ! pushed=$(git push -f 2>&1) ; then
    echo "Push failed for MR $MR_ID: $pushed" >&2
    return 1
  fi
  if [ "$pushed" = "Everything up-to-date" ] ; then
    echo "No changes pushed to MR"
    return 1
  else
    # wait after the push before the caller asks GitLab to merge
    sleep 10
    return 0
  fi
}
# Print the value of .pipeline.status for a merge request
# ("null" when the API response has none).
#   $1  project id
#   $2  merge request iid
get_pipeline_status() {
  PROJECT_ID=$1 MR_IID=$2
  local endpoint="projects/$PROJECT_ID/merge_requests/$MR_IID"
  curl_gitlab "$endpoint" | jq -r ' "\(.pipeline.status)"'
}
# Print the value of .merge_status for a merge request.
#   $1  project id
#   $2  merge request iid
get_merge_status() {
  PROJECT_ID=$1 MR_IID=$2
  local endpoint="projects/$PROJECT_ID/merge_requests/$MR_IID"
  curl_gitlab "$endpoint" | jq -r ' "\(.merge_status)"'
}
# Print the iids of all open merge requests of a project created by one
# author, in ascending sort order, on a single space-separated line.
#   $1  author id
#   $2  project id
all_open_mrs() {
  AUTHOR_ID=$1 PROJECT_ID=$2
  local query="state=opened&scope=all&author_id=$AUTHOR_ID&sort=asc"
  curl_gitlab "projects/$PROJECT_ID/merge_requests?$query" \
    | jq -r '.[] | "\(.iid)"' \
    | tr '\r\n' ' '
}
# Try to get one merge request merged.
#   $1  project id
#   $2  merge request iid
#   $3  project name
# If the pipeline succeeded and GitLab reports the MR as mergeable, merge it
# right away. Otherwise, or if that merge fails, rebase the branch locally
# and, when something was pushed, set the MR to merge once its pipeline succeeds.
nurture_mr() {
  local project_id=$1
  local mr_id=$2
  local project_name=$3
  echo "nurturing MR $mr_id" >&2

  pipeline_status=$(get_pipeline_status $project_id $mr_id)
  echo "pipeline status is $pipeline_status" >&2
  merge_status=$(get_merge_status $project_id $mr_id)
  echo "merge status is $merge_status" >&2

  # fast path: everything is green, a direct merge is enough
  if [ "$pipeline_status" = "success" ] && [ "$merge_status" = "can_be_merged" ] \
      && merge_mr $project_id $mr_id ; then
    return 0
  fi

  # fallback shared by "not ready" and "direct merge failed"
  if rebase_local $project_id $mr_id ; then
    merge_mr $project_id $mr_id true
  fi
}
# Nurture the open merge requests of one project.
#   $1  project name (path), resolved to an id through $PROJECTS_FILTER
# Reads the globals PROJECTS_FILTER, AUTHOR_ID and MAX_MRS.
# Exits the whole script with status 1 when the project cannot be found;
# returns early once MAX_MRS merge requests have been counted.
nurture_project() {
  project_name=$1
  project_id=$(get_project_id $PROJECTS_FILTER $project_name)
  if [ -z "$project_id" ] ; then
    echo "no id found for project $project_name" >&2
    exit 1
  fi

  {
    echo ""
    echo "nurturing project $project_name (id $project_id)"
    echo "=========================================="
  } >&2

  local nurtured_count=0
  local mrs=$(all_open_mrs $AUTHOR_ID $project_id)
  echo "MRs found : [$mrs]" >&2

  for mr_id in $mrs ; do
    # stop once the per-project limit is reached
    if ! ((nurtured_count < MAX_MRS)) ; then
      return 0
    fi
    if nurture_mr $project_id $mr_id $project_name; then
      nurtured_count=$((nurtured_count + 1))
    fi
  done

  # finish with a return code 0 even if some MRs could not be merged / rebased
  {
    echo "finished with project $project_name (id $project_id)"
    echo ""
  } >&2
}
# Print the command line synopsis with examples and exit with status 1.
# All five arguments are required, so both examples pass MAX_MRS.
usage() {
  echo "Usage:"
  echo "$0 GITLAB_ACCESS_TOKEN AUTHOR_ID FILTER 'name1 name2 name3' MAX_MRS"
  echo
  echo "examples:"
  echo "$0 df12fs1sdd 12345 'groups/789/search?scope=projects&search=' 'a b c' 3"
  echo "$0 df12fs1sdd 12345 'users/123/projects?search=' 'a b c' 1"
  exit 1
}
# entry point: exactly five arguments are required (see usage)
if [ $# -ne 5 ] ; then
  usage
fi

GITLAB_PAT=$1        # GitLab personal access token
AUTHOR_ID=$2         # only MRs created by this author are handled
PROJECTS_FILTER=$3   # API path/query prefix used to look up projects by name
PROJECT_NAMES="$4"   # space-separated list of project names
MAX_MRS=$5           # per-project limit on nurtured MRs

# handle the projects one after the other
for project_name in $PROJECT_NAMES
do
  nurture_project $project_name
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment