#!/usr/bin/python2.4
"""
Layered Subversion Client V0.3
Copyright (C) 2005 Erich Schubert

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You can find the GNU General Public Licence at
http://www.gnu.org/licenses/gpl.html
or included with your favourite Linux Distribution.

Changelog:
V0.1:
    * initial public release
    * Update works
v0.1.1:
    * check out missing layers automatically
    * beautify some output
    * hide some output in first run, which will come again in second
v0.2:
    * added 'update', 'up-rep', 'explain' and 'status' commands
v0.2.1:
    * added globbing for svn:ignore emulation
v0.2.2:
    * added handling for non-writeable files
    * added 'diff' and 'revert' command for named files
      (undefined behaviour globbing or recursive)
v0.3:
    * added authentication support (caching disabled)
    * minor fixes and tweaks

Known issues:
    * checkin command not yet written
    * if a file was replaced by a dir an exception occurs
"""

import fnmatch
import getpass
import os
import shutil
import stat
import sys
from os import path
from xml.dom import pulldom

import pysvn

# This is pretty much the default subversion ignore list, with some debian
# and VIM strings added.
global_ignores = [
    "*.o", "*.lo", "*.la", "#*#", ".*.rej", "*.rej", ".*~", "*~", ".#*",
    ".DS_Store", "*.dpkg-dist", "*.dpkg-old", ".*.swp",
]

# Filled in by locate_docroot(); they stay None until it has run.
docroot = None      # directory that contains the ".laysvn" subdirectory
treeloc = None      # current working directory, relative to docroot
configdir = None    # docroot/.laysvn

# String types accepted as a newline-separated ignore list.  "unicode" only
# exists on Python 2, hence the guarded lookup.
try:
    _string_types = (str, unicode)
except NameError:
    _string_types = (str,)


class LaysvnError(Exception):
    """Raised for configuration and layout problems detected by laysvn."""


def _strip_prefix(full, prefix):
    """Return `full` with the leading `prefix` and any separator removed.

    `full` is expected to start with `prefix`; only the length of `prefix`
    is used.  Stripping the separator afterwards (instead of blindly
    skipping one extra character) gives the right result both when the
    prefix already ends in a separator and when it does not.
    """
    return full[len(prefix):].lstrip(os.sep)


def usage():
    """Print the list of supported commands."""
    print('''Supported commands:
\texplain\t\tExplain layers
\tupdate\tup\tUpdate repository and working copy
\tup-rep\t\tUpdate repository only
\tstatus\t\tShow status for full repository
\tdiff\t\tPrint diff for single files (no dir support)
\trevert\t\tRevert a file to its base (no dir support)''')


def get_login(realm, username, may_save):
    """Prompt for username and password (pysvn login callback).

    The `username` and `may_save` arguments are ignored; the user is always
    asked.  Returns the 4-tuple (True, username, password, False).
    """
    print("Authentication realm: %s" % realm)
    sys.stdout.write("Username: ")
    # Make sure the prompt is visible before we block on stdin.
    sys.stdout.flush()
    # Strip only the line terminator; at EOF there is none to strip.
    username = sys.stdin.readline().rstrip("\r\n")
    password = getpass.getpass("Password for '%s': " % username)
    return True, username, password, False


def locate_docroot():
    """Find the document root and set docroot, treeloc and configdir.

    The document root is the closest directory, starting at the current
    working directory and walking upwards, that contains a readable
    ".laysvn" subdirectory (which holds the laysvn.xml configuration).

    Raises LaysvnError when the filesystem root is reached without a match.
    """
    global docroot, treeloc, configdir
    cwd = os.getcwd()
    candidate = cwd
    while not os.access(path.join(candidate, ".laysvn"), os.R_OK):
        parent = path.dirname(candidate)
        # dirname() of the filesystem root is the root itself.
        if parent == candidate:
            raise LaysvnError("No laysvn directory found.")
        candidate = parent
    docroot = candidate
    # current location in tree
    treeloc = _strip_prefix(cwd, docroot)
    configdir = path.join(docroot, ".laysvn")


class configFile:
    """The parsed laysvn.xml configuration plus the operations on its layers.

    Attributes:
        title    value of the "title" attribute of the <config> element
        storage  value of the "path" attribute of the <storage> element
        layers   layer objects in configuration order
        rlayers  the same layers in reverse order
        client   shared pysvn client, created lazily by getClient()
    """
    title = None
    storage = None
    client = None

    def __init__(self):
        """Load, validate and prepare the configuration.

        Requires locate_docroot() to have run.  Raises LaysvnError when the
        document root is unknown, the configuration file is not readable,
        or the configuration fails check_config().
        """
        if not docroot or not configdir:
            raise LaysvnError("Document root has not been found")
        config_path = path.join(configdir, "laysvn.xml")
        if not os.access(config_path, os.R_OK):
            raise LaysvnError("No configuration file readable in dir " + configdir)
        # Per-instance lists: parse_config() appends to them.
        self.layers = []
        self.rlayers = []
        self.parse_config(pulldom.parse(config_path))
        # Validate before preparing the layers: prepare() joins the storage
        # path, which must therefore be known to be present.
        self.check_config()
        self.process_layers()

    def parse_config(self, reader):
        """Read title, storage path and layers from a pulldom event stream."""
        # NOTE: a "default-layer" element was planned at some point but is
        # not implemented; such elements are ignored.
        for event, node in reader:
            if node is None:
                break
            if event != pulldom.START_ELEMENT:
                continue
            name = node.nodeName
            if name == "config":
                self.title = node.getAttribute("title")
            elif name == "storage":
                self.storage = node.getAttribute("path")
            elif name == "layer":
                layer_id = node.getAttribute("id")
                source = node.getAttribute("source")
                self.layers.append(layer(self, layer_id, source))

    def process_layers(self):
        """Prepare every layer and build the reversed layer list."""
        # prepare() makes each layer fetch the shared client via getClient()
        for entry in self.layers:
            entry.prepare()
        # clone and reverse the layers
        self.rlayers = list(self.layers)
        self.rlayers.reverse()

    def check_config(self):
        """Do some basic completeness tests on the configuration.

        Raises LaysvnError when the title, the layers or the storage root
        are missing, or when the storage directory is not readable and
        writeable.
        """
        if self.title is None:
            raise LaysvnError("Configuration doesn't give a title for this repository")
        if not self.layers:
            raise LaysvnError("No layers were configured")
        if self.storage is None:
            raise LaysvnError("No storage root was given")
        if not os.access(self.storage, os.R_OK | os.W_OK):
            raise LaysvnError("Storage directory does not exist or is not writeable")

    def explain_layers(self):
        """Print which layers are configured."""
        print("Configuration '%s' has the following layers:" % self.title)
        for entry in self.layers:
            entry.explain()

    def update_missing_layers(self):
        """Check out every layer whose storage has no ".svn" directory yet."""
        for entry in self.layers:
            if not os.access(path.join(entry.storage, ".svn"), os.F_OK):
                entry.update_or_checkout()

    def update_layers(self):
        """Checkout or update all layers."""
        for entry in self.layers:
            entry.update_or_checkout()

    def getClient(self):
        """Return the shared pysvn client, creating it on first use."""
        if not self.client:
            self.client = pysvn.Client()
            self.client.callback_get_login = get_login
        return self.client

    def find_layer_for_file(self, file, rl="all"):
        """Find the layer responsible for `file`.

        `rl` is the list of layers to search in order; the default "all"
        means self.rlayers.  The search stops at the first layer that
        either ignores the file or has it in a found state.

        Returns (layer or None, ignored flag).
        """
        if rl == "all":
            rl = self.rlayers
        for entry in rl:
            try:
                state = entry.find(file)
            except pysvn.ClientError:
                # if the parent directory doesn't exist in this layer we
                # get an exception; try the next layer
                continue
            if state.ignored():
                return (entry, True)
            if state.found():
                return (entry, False)
        return (None, False)

    def copy_to_layers(self, file, visfile, rl):
        """Copy a working copy file to the matching layer out of `rl`.

        Ignored files are left alone; a file matching no layer is reported
        as unknown ("?").
        """
        (in_layer, ignored) = self.find_layer_for_file(file, rl)
        if in_layer is None:
            # Report the file as unknown
            print("? %-6s %s" % ("-", visfile))
        elif not ignored:
            in_layer.copy_to_layer(file)

    def copy_from_layers(self, file, visfile):
        """Copy a file from its layer into the working copy and report it."""
        (in_layer, ignored) = self.find_layer_for_file(file)
        if in_layer is None:
            # this file was not claimed by any layer, but exists in some
            # layers?  most likely an unresolved conflict...
            holder = "-"
            for entry in self.rlayers:
                # no break: the last layer in rlayers order holding the
                # file is the one reported
                if os.access(path.join(entry.storage, file), os.F_OK):
                    holder = entry.id
            print("? %-6s %s" % (holder, visfile))
        elif ignored:
            # only print "I" status if the file exists in this layer
            # since we will have the case that a file is "removed" by
            # setting the file on ignore in a higher layer
            if os.access(path.join(in_layer.storage, file), os.F_OK):
                print("I %-6s %s" % (in_layer.id, visfile))
        else:
            newstate = in_layer.copy_from_layer(file)
            if newstate.modified():
                print("M %-6s %s" % (in_layer.id, visfile))
            if newstate.added():
                print("A %-6s %s" % (in_layer.id, visfile))

    def diff_file(self, file, visfile):
        """Print the diff of a single file against its layer."""
        (in_layer, ignored) = self.find_layer_for_file(file)
        if in_layer is None:
            print("No layer specified for file '%s'" % visfile)
        elif ignored:
            print("File '%s' was ignored on layer %s" % (visfile, in_layer.id))
        else:
            in_layer.diff(file, visfile)

    def diff(self, files):
        """Diff one file name or a list of file names (relative to the cwd).

        Raises TypeError for any other argument type.
        """
        if isinstance(files, (list, tuple)):
            for name in files:
                self.diff_file(path.join(treeloc, name), name)
        elif isinstance(files, _string_types):
            self.diff_file(path.join(treeloc, files), files)
        else:
            raise TypeError("Invalid parameter for diff")

    def revert_file(self, file, visfile):
        """Revert a single file in its layer and copy it back."""
        (in_layer, ignored) = self.find_layer_for_file(file)
        if in_layer is None:
            print("No layer specified for file '%s'" % visfile)
        elif ignored:
            print("File '%s' was ignored on layer %s" % (visfile, in_layer.id))
        else:
            in_layer.revert(file)
            in_layer.copy_from_layer(file)
            print("%s reverted to layer %s" % (visfile, in_layer.id))

    def revert(self, files):
        """Revert one file name or a list of file names (relative to the cwd).

        Raises TypeError for any other argument type.
        """
        if isinstance(files, (list, tuple)):
            for name in files:
                self.revert_file(path.join(treeloc, name), name)
        elif isinstance(files, _string_types):
            self.revert_file(path.join(treeloc, files), files)
        else:
            raise TypeError("Invalid parameter for revert")

    def update(self):
        """Update run: push local files, update the layers, merge back."""
        self.update_to_layers()
        self.update_layers()
        self.update_from_layers()

    def status(self):
        """Status run: like update(), but without contacting the repository."""
        self.update_to_layers()
        self.update_from_layers()

    def update_to_layers(self):
        """Copy the files below the current directory to their layers."""
        root = path.join(docroot, treeloc)
        for p, dirs, files in os.walk(root):
            # Skip .laysvn directory
            if ".laysvn" in dirs:
                dirs.remove(".laysvn")
            p1 = _strip_prefix(p, docroot)   # relative to the document root
            p2 = _strip_prefix(p, root)      # relative to the cwd (display)
            if p1 == "":
                candidates = list(self.rlayers)
            else:
                # only layers that have this directory can take its files
                candidates = [entry for entry in self.rlayers
                              if path.isdir(path.join(entry.storage, p1))]
            if not candidates:
                # directory unknown to every layer: say nothing about it
                continue
            for name in files:
                self.copy_to_layers(path.join(p1, name), path.join(p2, name),
                                    candidates)

    def update_from_layers(self):
        """Merge the layers into the working copy."""
        pending = set()
        for entry in self.layers:
            for p, dirs, files in os.walk(path.join(entry.storage, treeloc)):
                # Skip .svn directory
                if ".svn" in dirs:
                    dirs.remove(".svn")
                rel_dir = _strip_prefix(p, entry.storage)
                for name in files:
                    pending.add(path.join(rel_dir, name))
        # sorted only to make the report order deterministic
        for rel in sorted(pending):
            self.copy_from_layers(rel, _strip_prefix(rel, treeloc))


class layer:
    """One subversion layer: a working copy of `source` below the storage root.

    Attributes:
        id        layer identifier, also the directory name below the storage
        source    URL passed to the subversion checkout
        config    owning configFile
        revision  result of the last update/checkout, None before that
        client    shared pysvn client (set by prepare())
        storage   directory of this layer's working copy (set by prepare())
    """
    id = None
    source = None
    config = None
    revision = None
    client = None
    storage = None

    def __init__(self, config, id, source):
        self.config = config
        self.id = id
        self.source = source

    def prepare(self):
        """Fetch the shared client and compute the storage directory."""
        self.client = self.config.getClient()
        self.storage = path.join(self.config.storage, self.id)

    def explain(self):
        """Print the layer id and its source."""
        print("Layer ID: '%s' is '%s'" % (self.id, self.source))

    def update_or_checkout(self):
        """Update the layer's working copy, or check it out if missing.

        Raises LaysvnError when the storage directory exists but is not an
        accessible working copy.
        """
        if os.access(path.join(self.storage, ".svn"), os.R_OK | os.W_OK):
            revision = self.client.update(self.storage)
            # NOTE(review): newer pysvn releases appear to return a list of
            # revisions here instead of a single one - accept both shapes.
            if isinstance(revision, (list, tuple)):
                revision = revision[0]
            self.revision = revision
            print("Layer '%s' updated to revision %d" % (self.id, self.revision.number))
        else:
            if os.access(self.storage, os.F_OK):
                raise LaysvnError("The directory '%s' already exists. Please remove prior to checkout." % self.storage)
            os.mkdir(self.storage)
            self.revision = self.client.checkout(self.source, self.storage)
            print("Layer '%s' checked out" % self.id)

    def _state_of(self, target):
        """Return the svnState of `target`, a path inside the storage."""
        status = self.client.status(target)
        # anything but exactly one status entry is treated as "no state"
        if len(status) != 1:
            return svnState(None)
        return svnState(status[0].text_status)

    def find(self, file):
        """Return the svnState of `file` (relative to docroot) in this layer.

        When the client reports no single status entry, the svn:ignore
        property of the parent directory and the global ignore list are
        consulted to emulate the "ignored" state.
        """
        status = self.client.status(path.join(self.storage, file))
        # note that the API is really really weird here. If the file is
        # unversioned, we don't get a result if the file does not exist...
        # EXCEPT if it would be ignored.
        if len(status) == 1:
            return svnState(status[0].text_status)
        # is this maybe ignored?
        dirname = path.join(self.storage, path.dirname(file))
        prop_list = self.client.propget("svn:ignore", dirname)
        if len(prop_list) == 1:
            patterns = list(prop_list.values())[0]
        else:
            patterns = []
        # test_ignore() always checks the global ignore list as well
        if test_ignore(path.basename(file), patterns):
            return svnState(pysvn.wc_status_kind.ignored)
        return svnState(None)

    def _copy_entry(self, sname, dname):
        """Copy the file or symlink `sname` over `dname`."""
        if path.islink(sname):
            linkto = os.readlink(sname)
            if path.islink(dname) and os.readlink(dname) == linkto:
                # identical link already in place
                return
            # lexists() also sees a dangling symlink, which would make
            # os.symlink() fail
            if path.lexists(dname):
                os.remove(dname)
            os.symlink(linkto, dname)
            s = os.lstat(sname)
            os.lchown(dname, s.st_uid, s.st_gid)
            return
        if path.islink(dname):
            # replace the link itself instead of writing through it
            os.unlink(dname)
        elif os.access(dname, os.F_OK) and not os.access(dname, os.W_OK):
            # file is not writeable but exists: make it removable, drop it
            os.chmod(dname, stat.S_IRUSR | stat.S_IWUSR)
            os.unlink(dname)
        shutil.copy2(sname, dname)

    def copy_to_layer(self, file):
        """Copy `file` from the working copy into this layer's storage.

        Returns the svnState of the copy in the storage.
        """
        sname = path.join(docroot, file)
        dname = path.join(self.storage, file)
        self._copy_entry(sname, dname)
        # get a new status
        return self._state_of(dname)

    def makedirs(self, sname, dname):
        """Create `dname` and its missing parents.

        Permissions and times of every created directory are copied from
        the matching reference directory below `sname`.
        """
        if os.access(dname, os.F_OK):
            return
        parent = path.dirname(dname)
        if parent != dname:
            self.makedirs(path.dirname(sname), parent)
        os.mkdir(dname, stat.S_IRWXU)
        shutil.copystat(sname, dname)

    def copy_from_layer(self, file):
        """Copy `file` from this layer's storage into the working copy.

        Returns the svnState of the file in the storage.
        """
        sname = path.join(self.storage, file)
        dname = path.join(docroot, file)
        # ensure the parent path names do exist...
        self.makedirs(path.dirname(sname), path.dirname(dname))
        self._copy_entry(sname, dname)
        # get a new status
        return self._state_of(sname)

    def diff(self, file, visfile):
        """Print the subversion diff of `file` in this layer.

        `visfile` is accepted for symmetry with the caller and not used.
        """
        diff_text = self.client.diff('./tmp-file-prefix-', path.join(self.storage, file))
        print(diff_text)

    def revert(self, file):
        """Revert `file` in this layer's working copy."""
        self.client.revert(path.join(self.storage, file))


class svnState:
    """Thin wrapper around a pysvn text status (or None for "no status")."""
    textstatus = None

    def __init__(self, textstatus):
        self.textstatus = textstatus

    def ignored(self):
        """True when the status is "ignored"."""
        return self.textstatus == pysvn.wc_status_kind.ignored

    def found(self):
        """True when there is a status and it is not "unversioned"."""
        if self.textstatus is None:
            return False
        return self.textstatus != pysvn.wc_status_kind.unversioned

    def added(self):
        """True when the status is "added"."""
        return self.textstatus == pysvn.wc_status_kind.added

    def modified(self):
        """True when the status is "modified"."""
        return self.textstatus == pysvn.wc_status_kind.modified


def test_ignore(filename, ignores):
    """Ignore emulation: tell whether `filename` matches an ignore pattern.

    `ignores` is either a list of glob patterns or a string holding one
    pattern per line (the form of an svn:ignore property value).  The
    global ignore list is always checked as well.

    Raises TypeError for any other type of `ignores`.
    """
    if isinstance(ignores, (list, tuple)):
        patterns = list(ignores)
    elif isinstance(ignores, _string_types):
        patterns = ignores.splitlines()
    else:
        raise TypeError("Unknown object for ignore list: %s" % ignores.__class__)
    for pattern in global_ignores + patterns:
        if fnmatch.fnmatch(filename, pattern):
            return True
    return False


locate_docroot()
config = configFile()

if len(sys.argv) <= 1:
    usage()
    sys.exit(1)

command = sys.argv[1]
if command in ("update", "up"):
    config.update_missing_layers()
    config.update()
elif command == "explain":
    config.explain_layers()
elif command == "up-rep":
    config.update_layers()
elif command == "status":
    config.status()
elif command == "diff":
    config.diff(sys.argv[2:])
elif command == "revert":
    config.revert(sys.argv[2:])
else:
    usage()
sys.exit(0)