#!/usr/bin/env python3
|
#
|
# Barnum, a Patchset Tool (pt)
|
#
|
# This Git wrapper script is designed to reduce the ceremony of working with Gitblit patchsets.
|
#
|
# Copyright 2014 gitblit.com.
|
#
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
# you may not use this file except in compliance with the License.
|
# You may obtain a copy of the License at
|
#
|
# http://www.apache.org/licenses/LICENSE-2.0
|
#
|
# Unless required by applicable law or agreed to in writing, software
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# See the License for the specific language governing permissions and
|
# limitations under the License.
|
#
|
#
|
# Usage:
|
#
|
# pt fetch <id> [-p,--patchset <n>]
|
# pt checkout <id> [-p,--patchset <n>] [-f,--force]
|
# pt pull <id> [-p,--patchset <n>]
|
# pt push [<id>] [-i,--ignore] [-f,--force] [-m,--milestone <milestone>] [-t,--topic <topic>] [-cc <user> <user>]
|
# pt start <topic> | <id>
|
# pt propose [new | <branch> | <id>] [-i,--ignore] [-m,--milestone <milestone>] [-t,--topic <topic>] [-cc <user> <user>]
|
# pt cleanup [<id>]
|
#
|
|
__author__ = 'James Moger'
|
__version__ = '1.0.6'
|
|
import subprocess
|
import argparse
|
import errno
|
import sys
|
|
|
def fetch(args):
|
"""
|
fetch(args)
|
|
Fetches the specified patchset for the ticket from the specified remote.
|
"""
|
|
__resolve_remote(args)
|
|
# fetch the patchset from the remote repository
|
|
if args.patchset is None:
|
# fetch all current ticket patchsets
|
print("Fetching ticket patchsets from the '{}' repository".format(args.remote))
|
if args.quiet:
|
__call(['git', 'fetch', '-p', args.remote, '--quiet'])
|
else:
|
__call(['git', 'fetch', '-p', args.remote])
|
else:
|
# fetch specific patchset
|
__resolve_patchset(args)
|
print("Fetching ticket {} patchset {} from the '{}' repository".format(args.id, args.patchset, args.remote))
|
patchset_ref = 'refs/tickets/{:02d}/{:d}/{:d}'.format(args.id % 100, args.id, args.patchset)
|
if args.quiet:
|
__call(['git', 'fetch', args.remote, patchset_ref, '--quiet'])
|
else:
|
__call(['git', 'fetch', args.remote, patchset_ref])
|
|
return
|
|
|
def checkout(args):
|
"""
|
checkout(args)
|
|
Checkout the patchset on a named branch.
|
"""
|
|
__resolve_uncommitted_changes_checkout(args)
|
fetch(args)
|
|
# collect local branch names
|
branches = []
|
for branch in __call(['git', 'branch']):
|
if branch[0] == '*':
|
branches.append(branch[1:].strip())
|
else:
|
branches.append(branch.strip())
|
|
if args.patchset is None or args.patchset is 0:
|
branch = 'ticket/{:d}'.format(args.id)
|
illegals = set(branches) & {'ticket'}
|
else:
|
branch = 'patchset/{:d}/{:d}'.format(args.id, args.patchset)
|
illegals = set(branches) & {'patchset', 'patchset/{:d}'.format(args.id)}
|
|
# ensure there are no local branch names that will interfere with branch creation
|
if len(illegals) > 0:
|
print('')
|
print('Sorry, can not complete the checkout for ticket {}.'.format(args.id))
|
print("The following branches are blocking '{}' branch creation:".format(branch))
|
for illegal in illegals:
|
print(' ' + illegal)
|
exit(errno.EINVAL)
|
|
if args.patchset is None or args.patchset is 0:
|
# checkout the current ticket patchset
|
if args.force:
|
__call(['git', 'checkout', '-B', branch, '{}/{}'.format(args.remote, branch)])
|
else:
|
__call(['git', 'checkout', branch])
|
else:
|
# checkout a specific patchset
|
__checkout(args.remote, args.id, args.patchset, branch, args.force)
|
|
return
|
|
|
def pull(args):
|
"""
|
pull(args)
|
|
Pull (fetch & merge) a ticket patchset into the current branch.
|
"""
|
|
__resolve_uncommitted_changes_checkout(args)
|
__resolve_remote(args)
|
|
# reset the checkout before pulling
|
__call(['git', 'reset', '--hard'])
|
|
# pull the patchset from the remote repository
|
if args.patchset is None or args.patchset is 0:
|
print("Pulling ticket {} from the '{}' repository".format(args.id, args.remote))
|
patchset_ref = 'ticket/{:d}'.format(args.id)
|
else:
|
__resolve_patchset(args)
|
print("Pulling ticket {} patchset {} from the '{}' repository".format(args.id, args.patchset, args.remote))
|
patchset_ref = 'refs/tickets/{:02d}/{:d}/{:d}'.format(args.id % 100, args.id, args.patchset)
|
|
if args.squash:
|
__call(['git', 'pull', '--squash', '--no-log', '--no-rebase', args.remote, patchset_ref], echo=True)
|
else:
|
__call(['git', 'pull', '--commit', '--no-ff', '--no-log', '--no-rebase', args.remote, patchset_ref], echo=True)
|
|
return
|
|
|
def push(args):
|
"""
|
push(args)
|
|
Push your patchset update or a patchset rewrite.
|
"""
|
|
if args.id is None:
|
# try to determine ticket and patchset from current branch name
|
for line in __call(['git', 'status', '-b', '-s']):
|
if line[0:2] == '##':
|
branch = line[2:].strip()
|
segments = branch.split('/')
|
if len(segments) >= 2:
|
if segments[0] == 'ticket' or segments[0] == 'patchset':
|
if '...' in segments[1]:
|
args.id = int(segments[1][:segments[1].index('...')])
|
else:
|
args.id = int(segments[1])
|
args.patchset = None
|
|
if args.id is None:
|
print('Please specify a ticket id for the push command.')
|
exit(errno.EINVAL)
|
|
__resolve_uncommitted_changes_push(args)
|
__resolve_remote(args)
|
|
if args.force:
|
# rewrite a patchset for an existing ticket
|
push_ref = 'refs/for/' + str(args.id)
|
else:
|
# fast-forward update to an existing patchset
|
push_ref = 'refs/heads/ticket/{:d}'.format(args.id)
|
|
ref_params = __get_pushref_params(args)
|
ref_spec = 'HEAD:' + push_ref + ref_params
|
|
print("Pushing your patchset to the '{}' repository".format(args.remote))
|
__call(['git', 'push', args.remote, ref_spec], echo=True)
|
return
|
|
|
def start(args):
|
"""
|
start(args)
|
|
Start development of a topic on a new branch.
|
"""
|
|
# collect local branch names
|
branches = []
|
for branch in __call(['git', 'branch']):
|
if branch[0] == '*':
|
branches.append(branch[1:].strip())
|
else:
|
branches.append(branch.strip())
|
|
branch = 'topic/' + args.topic
|
illegals = set(branches) & {'topic', branch}
|
|
# ensure there are no local branch names that will interfere with branch creation
|
if len(illegals) > 0:
|
print('Sorry, can not complete the creation of the topic branch.')
|
print("The following branches are blocking '{}' branch creation:".format(branch))
|
for illegal in illegals:
|
print(' ' + illegal)
|
exit(errno.EINVAL)
|
|
__call(['git', 'checkout', '-b', branch])
|
|
return
|
|
|
def propose(args):
|
"""
|
propose_patchset(args)
|
|
Push a patchset to create a new proposal ticket or to attach a proposal patchset to an existing ticket.
|
"""
|
|
__resolve_uncommitted_changes_push(args)
|
__resolve_remote(args)
|
|
curr_branch = None
|
push_ref = None
|
if args.target is None:
|
# see if the topic is a ticket id
|
# else default to new
|
for branch in __call(['git', 'branch']):
|
if branch[0] == '*':
|
curr_branch = branch[1:].strip()
|
if curr_branch.startswith('topic/'):
|
topic = curr_branch[6:].strip()
|
try:
|
int(topic)
|
push_ref = topic
|
except ValueError:
|
pass
|
if push_ref is None:
|
push_ref = 'new'
|
else:
|
push_ref = args.target
|
|
try:
|
# check for current patchset and current branch
|
args.id = int(push_ref)
|
args.patchset = __get_current_patchset(args.remote, args.id)
|
if args.patchset > 0:
|
print('You can not propose a patchset for ticket {} because it already has one.'.format(args.id))
|
|
# check current branch for accidental propose instead of push
|
for line in __call(['git', 'status', '-b', '-s']):
|
if line[0:2] == '##':
|
branch = line[2:].strip()
|
segments = branch.split('/')
|
if len(segments) >= 2:
|
if segments[0] == 'ticket':
|
if '...' in segments[1]:
|
args.id = int(segments[1][:segments[1].index('...')])
|
else:
|
args.id = int(segments[1])
|
args.patchset = None
|
print("You are on the '{}' branch, perhaps you meant to push instead?".format(branch))
|
elif segments[0] == 'patchset':
|
args.id = int(segments[1])
|
args.patchset = int(segments[2])
|
print("You are on the '{}' branch, perhaps you meant to push instead?".format(branch))
|
exit(errno.EINVAL)
|
except ValueError:
|
pass
|
|
ref_params = __get_pushref_params(args)
|
ref_spec = 'HEAD:refs/for/{}{}'.format(push_ref, ref_params)
|
|
print("Pushing your proposal to the '{}' repository".format(args.remote))
|
for line in __call(['git', 'push', args.remote, ref_spec, '-q'], echo=True, err=subprocess.STDOUT):
|
fields = line.split(':')
|
if fields[0] == 'remote' and fields[1].strip().startswith('--> #'):
|
# set the upstream branch configuration
|
args.id = int(fields[1].strip()[len('--> #'):])
|
__call(['git', 'fetch', '-p', args.remote])
|
__call(['git', 'branch', '-u', '{}/ticket/{:d}'.format(args.remote, args.id)])
|
break
|
|
return
|
|
|
def cleanup(args):
|
"""
|
cleanup(args)
|
|
Removes local branches for the ticket.
|
"""
|
|
if args.id is None:
|
branches = __call(['git', 'branch', '--list', 'ticket/*'])
|
branches += __call(['git', 'branch', '--list', 'patchset/*'])
|
else:
|
branches = __call(['git', 'branch', '--list', 'ticket/{:d}'.format(args.id)])
|
branches += __call(['git', 'branch', '--list', 'patchset/{:d}/*'.format(args.id)])
|
|
if len(branches) == 0:
|
print("No local branches found for ticket {}, cleanup skipped.".format(args.id))
|
return
|
|
if not args.force:
|
print('Cleanup would remove the following local branches for ticket {}.'.format(args.id))
|
for branch in branches:
|
if branch[0] == '*':
|
print(' ' + branch[1:].strip() + ' (skip)')
|
else:
|
print(' ' + branch)
|
print("To discard these local branches, repeat this command with '--force'.")
|
exit(errno.EINVAL)
|
|
for branch in branches:
|
if branch[0] == '*':
|
print('Skipped {} because it is the current branch.'.format(branch[1:].strip()))
|
continue
|
__call(['git', 'branch', '-D', branch.strip()], echo=True)
|
|
return
|
|
|
def __resolve_uncommitted_changes_checkout(args):
|
"""
|
__resolve_uncommitted_changes_checkout(args)
|
|
Ensures the current checkout has no uncommitted changes that would be discarded by a checkout or pull.
|
"""
|
|
status = __call(['git', 'status', '--porcelain'])
|
for line in status:
|
if not args.force and line[0] != '?':
|
print('Your local changes to the following files would be overwritten by {}:'.format(args.command))
|
print('')
|
for state in status:
|
print(state)
|
print('')
|
print("To discard your local changes, repeat the {} with '--force'.".format(args.command))
|
print('NOTE: forcing a {} will HARD RESET your working directory!'.format(args.command))
|
exit(errno.EINVAL)
|
|
|
def __resolve_uncommitted_changes_push(args):
|
"""
|
__resolve_uncommitted_changes_push(args)
|
|
Ensures the current checkout has no uncommitted changes that should be part of a propose or push.
|
"""
|
|
status = __call(['git', 'status', '--porcelain'])
|
for line in status:
|
if not args.ignore and line[0] != '?':
|
print('You have local changes that have not been committed:')
|
print('')
|
for state in status:
|
print(state)
|
print('')
|
print("To ignore these uncommitted changes, repeat the {} with '--ignore'.".format(args.command))
|
exit(errno.EINVAL)
|
|
|
def __resolve_remote(args):
|
"""
|
__resolve_remote(args)
|
|
Identifies the git remote to use for fetching and pushing patchsets by parsing .git/config.
|
"""
|
|
remotes = __call(['git', 'remote'])
|
|
if len(remotes) == 0:
|
# no remotes defined
|
print("Please define a Git remote")
|
exit(errno.EINVAL)
|
elif len(remotes) == 1:
|
# only one remote, use it
|
args.remote = remotes[0]
|
return
|
else:
|
# multiple remotes, read .git/config
|
output = __call(['git', 'config', '--local', 'patchsets.remote'], fail=False)
|
preferred = output[0] if len(output) > 0 else ''
|
|
if len(preferred) == 0:
|
print("You have multiple remote repositories and you have not configured 'patchsets.remote'.")
|
print("")
|
print("Available remote repositories:")
|
for remote in remotes:
|
print(' ' + remote)
|
print("")
|
print("Please set the remote repository to use for patchsets.")
|
print(" git config --local patchsets.remote <remote>")
|
exit(errno.EINVAL)
|
else:
|
try:
|
remotes.index(preferred)
|
except ValueError:
|
print("The '{}' repository specified in 'patchsets.remote' is not configured!".format(preferred))
|
print("")
|
print("Available remotes:")
|
for remote in remotes:
|
print(' ' + remote)
|
print("")
|
print("Please set the remote repository to use for patchsets.")
|
print(" git config --local patchsets.remote <remote>")
|
exit(errno.EINVAL)
|
|
args.remote = preferred
|
return
|
|
|
def __resolve_patchset(args):
|
"""
|
__resolve_patchset(args)
|
|
Resolves the current patchset or validates the the specified patchset exists.
|
"""
|
if args.patchset is None:
|
# resolve current patchset
|
args.patchset = __get_current_patchset(args.remote, args.id)
|
|
if args.patchset == 0:
|
# there are no patchsets for the ticket or the ticket does not exist
|
print("There are no patchsets for ticket {} in the '{}' repository".format(args.id, args.remote))
|
exit(errno.EINVAL)
|
else:
|
# validate specified patchset
|
args.patchset = __validate_patchset(args.remote, args.id, args.patchset)
|
|
if args.patchset == 0:
|
# there are no patchsets for the ticket or the ticket does not exist
|
print("Patchset {} for ticket {} can not be found in the '{}' repository".format(args.patchset, args.id, args.remote))
|
exit(errno.EINVAL)
|
|
return
|
|
|
def __validate_patchset(remote, ticket, patchset):
|
"""
|
__validate_patchset(remote, ticket, patchset)
|
|
Validates that the specified ticket patchset exists.
|
"""
|
|
nps = 0
|
patchset_ref = 'refs/tickets/{:02d}/{:d}/{:d}'.format(ticket % 100, ticket, patchset)
|
for line in __call(['git', 'ls-remote', remote, patchset_ref]):
|
ps = int(line.split('/')[4])
|
if ps > nps:
|
nps = ps
|
|
if nps == patchset:
|
return patchset
|
return 0
|
|
|
def __get_current_patchset(remote, ticket):
|
"""
|
__get_current_patchset(remote, ticket)
|
|
Determines the most recent patchset for the ticket by listing the remote patchset refs
|
for the ticket and parsing the patchset numbers from the resulting set.
|
"""
|
|
nps = 0
|
patchset_refs = 'refs/tickets/{:02d}/{:d}/*'.format(ticket % 100, ticket)
|
for line in __call(['git', 'ls-remote', remote, patchset_refs]):
|
ps = int(line.split('/')[4])
|
if ps > nps:
|
nps = ps
|
|
return nps
|
|
|
def __checkout(remote, ticket, patchset, branch, force=False):
|
"""
|
__checkout(remote, ticket, patchset, branch)
|
__checkout(remote, ticket, patchset, branch, force)
|
|
Checkout the patchset on a detached head or on a named branch.
|
"""
|
|
has_branch = False
|
on_branch = False
|
|
if branch is None or len(branch) == 0:
|
# checkout the patchset on a detached head
|
print('Checking out ticket {} patchset {} on a detached HEAD'.format(ticket, patchset))
|
__call(['git', 'checkout', 'FETCH_HEAD'], echo=True)
|
return
|
else:
|
# checkout on named branch
|
|
# determine if we are already on the target branch
|
for line in __call(['git', 'branch', '--list', branch]):
|
has_branch = True
|
if line[0] == '*':
|
# current branch (* name)
|
on_branch = True
|
|
if not has_branch:
|
if force:
|
# force the checkout the patchset to the new named branch
|
# used when there are local changes to discard
|
print("Forcing checkout of ticket {} patchset {} on named branch '{}'".format(ticket, patchset, branch))
|
__call(['git', 'checkout', '-b', branch, 'FETCH_HEAD', '--force'], echo=True)
|
else:
|
# checkout the patchset to the new named branch
|
__call(['git', 'checkout', '-b', branch, 'FETCH_HEAD'], echo=True)
|
return
|
|
if not on_branch:
|
# switch to existing local branch
|
__call(['git', 'checkout', branch], echo=True)
|
|
#
|
# now we are on the local branch for the patchset
|
#
|
|
if force:
|
# reset HEAD to FETCH_HEAD, this drops any local changes
|
print("Forcing checkout of ticket {} patchset {} on named branch '{}'".format(ticket, patchset, branch))
|
__call(['git', 'reset', '--hard', 'FETCH_HEAD'], echo=True)
|
return
|
else:
|
# try to merge the existing ref with the FETCH_HEAD
|
merge = __call(['git', 'merge', '--ff-only', branch, 'FETCH_HEAD'], echo=True, fail=False)
|
if len(merge) is 1:
|
up_to_date = merge[0].lower().index('up-to-date') > 0
|
if up_to_date:
|
return
|
elif len(merge) is 0:
|
print('')
|
print("Your '{}' branch has diverged from patchset {} on the '{}' repository.".format(branch, patchset, remote))
|
print('')
|
print("To discard your local changes, repeat the checkout with '--force'.")
|
print('NOTE: forcing a checkout will HARD RESET your working directory!')
|
exit(errno.EINVAL)
|
return
|
|
|
def __get_pushref_params(args):
|
"""
|
__get_pushref_params(args)
|
|
Returns the push ref parameters for ticket field assignments.
|
"""
|
|
params = []
|
|
if args.milestone is not None:
|
params.append('m=' + args.milestone)
|
|
if args.topic is not None:
|
params.append('t=' + args.topic)
|
else:
|
for branch in __call(['git', 'branch']):
|
if branch[0] == '*':
|
b = branch[1:].strip()
|
if b.startswith('topic/'):
|
topic = b[len('topic/'):]
|
try:
|
# ignore ticket id topics
|
int(topic)
|
except:
|
# topic is a string
|
params.append('t=' + topic)
|
|
if args.responsible is not None:
|
params.append('r=' + args.responsible)
|
|
if args.cc is not None:
|
for cc in args.cc:
|
params.append('cc=' + cc)
|
|
if len(params) > 0:
|
return '%' + ','.join(params)
|
|
return ''
|
|
|
def __call(cmd_args, echo=False, fail=True, err=None):
|
"""
|
__call(cmd_args)
|
|
Executes the specified command as a subprocess. The output is parsed and returned as a list
|
of strings. If the process returns a non-zero exit code, the script terminates with that
|
exit code. Std err of the subprocess is passed-through to the std err of the parent process.
|
"""
|
|
p = subprocess.Popen(cmd_args, stdout=subprocess.PIPE, stderr=err, universal_newlines=True)
|
lines = []
|
for line in iter(p.stdout.readline, b''):
|
line_str = str(line).strip()
|
if len(line_str) is 0:
|
break
|
lines.append(line_str)
|
if echo:
|
print(line_str)
|
p.wait()
|
if fail and p.returncode is not 0:
|
exit(p.returncode)
|
|
return lines
|
|
#
|
# define the acceptable arguments and their usage/descriptions
|
#
|
|
# force argument
|
force_arg = argparse.ArgumentParser(add_help=False)
|
force_arg.add_argument('-f', '--force', default=False, help='force the command to complete', action='store_true')
|
|
# quiet argument
|
quiet_arg = argparse.ArgumentParser(add_help=False)
|
quiet_arg.add_argument('-q', '--quiet', default=False, help='suppress git stderr output', action='store_true')
|
|
# ticket & patchset arguments
|
ticket_args = argparse.ArgumentParser(add_help=False)
|
ticket_args.add_argument('id', help='the ticket id', type=int)
|
ticket_args.add_argument('-p', '--patchset', help='the patchset number', type=int)
|
|
# push refspec arguments
|
push_args = argparse.ArgumentParser(add_help=False)
|
push_args.add_argument('-i', '--ignore', default=False, help='ignore uncommitted changes', action='store_true')
|
push_args.add_argument('-m', '--milestone', help='set the milestone')
|
push_args.add_argument('-r', '--responsible', help='set the responsible user')
|
push_args.add_argument('-t', '--topic', help='set the topic')
|
push_args.add_argument('-cc', nargs='+', help='specify accounts to add to the watch list')
|
|
# the commands
|
parser = argparse.ArgumentParser(description='a Patchset Tool for Gitblit Tickets')
|
parser.add_argument('--version', action='version', version='%(prog)s {}'.format(__version__))
|
commands = parser.add_subparsers(dest='command', title='commands')
|
|
fetch_parser = commands.add_parser('fetch', help='fetch a patchset', parents=[ticket_args, quiet_arg])
|
fetch_parser.set_defaults(func=fetch)
|
|
checkout_parser = commands.add_parser('checkout', aliases=['co'],
|
help='fetch & checkout a patchset to a branch',
|
parents=[ticket_args, force_arg, quiet_arg])
|
checkout_parser.set_defaults(func=checkout)
|
|
pull_parser = commands.add_parser('pull',
|
help='fetch & merge a patchset into the current branch',
|
parents=[ticket_args, force_arg])
|
pull_parser.add_argument('-s', '--squash',
|
help='squash the pulled patchset into your working directory',
|
default=False,
|
action='store_true')
|
pull_parser.set_defaults(func=pull)
|
|
push_parser = commands.add_parser('push', aliases=['up'],
|
help='upload your patchset changes',
|
parents=[push_args, force_arg])
|
push_parser.add_argument('id', help='the ticket id', nargs='?', type=int)
|
push_parser.set_defaults(func=push)
|
|
propose_parser = commands.add_parser('propose', help='propose a new ticket or the first patchset', parents=[push_args])
|
propose_parser.add_argument('target', help="the ticket id, 'new', or the integration branch", nargs='?')
|
propose_parser.set_defaults(func=propose)
|
|
cleanup_parser = commands.add_parser('cleanup', aliases=['rm'],
|
help='remove local ticket branches',
|
parents=[force_arg])
|
cleanup_parser.add_argument('id', help='the ticket id', nargs='?', type=int)
|
cleanup_parser.set_defaults(func=cleanup)
|
|
start_parser = commands.add_parser('start', help='start a new branch for the topic or ticket')
|
start_parser.add_argument('topic', help="the topic or ticket id")
|
start_parser.set_defaults(func=start)
|
|
if len(sys.argv) < 2:
|
parser.parse_args(['--help'])
|
else:
|
# parse the command-line arguments
|
script_args = parser.parse_args()
|
|
# exec the specified command
|
script_args.func(script_args)
|