#!/usr/bin/python2.6 -u # # Timothy Brooks 2012 # # PandaQuery - interface to the ATLAS pbook tool # import sys from re import search from os import environ, path from optparse import OptionParser from datetime import timedelta if __name__=='__main__': parser = OptionParser(version="%prog 0.3") parser.add_option("-p", "--period", default="5 minutes", help="polling period for sync " \ "default is \"5 minutes\"") parser.add_option("-r", "--retry_failed", default=False, action="store_true", help="retry all failed jobs " ) parser.add_option("-t", "--timeout", default="0 hours", help="timeout before retrying job elsewhere " \ "default is \"0 hours\" (0 means never)") parser.add_option("-l", "--length", default="1 days", help="get jobs upto LENGTH time ago " \ "default is \"1 days\"") parser.add_option("-s", "--string", default="Job[%(groupID)6s][%(site)24s] %(dbStatus)7s (%(statuses)s )", help="format for printing jobs " \ "default is \"Job[%(groupID)6s][%(site)24s] %(dbStatus)7s (%(statuses)s )\"") parser.add_option("-e", "--expression", action="append", help="show job if EXP evaluates true " \ "e.g: %prog -e\"job.site=ANALY_RHUL\"", metavar="EXP") parser.add_option("-c", "--condition", default=[], action="append", help="end if EXP evaluates true for all jobs " \ "e.g: %prog -c\"job.dbStatus='frozen'\"", metavar="EXP") parser.add_option("-a", "--action", default=[], action="append", help="execute EXP before ending if all conditions are True " \ "e.g: %prog -a\"call('dq2-get user.*/')\"", metavar="EXP") parser.add_option("-d", "--debug", default=False, action="store_true", help="print out debugging information during execution") (options, args) = parser.parse_args() period = str(options.period).split(' ') if len(period) == 1: period.append("minutes") options.period = eval("timedelta(%s = %d)" %(period[1], int(period[0]) ) ) timeout = str(options.timeout).split(' ') if len(timeout) == 1: timeout.append("hours") options.timeout = eval("timedelta(%s = %d)" %(timeout[1], int(timeout[0]) ) ) length = str(options.length).split(' ') if len(length) == 1: length.append("days") options.length = eval("timedelta(%s = %d)" %(length[1], int(length[0]) ) ) # import pbook from cvmfs: try: tmp, __name__ = __name__, 'pbook' execfile(path.join(environ["ATLAS_LOCAL_ROOT_BASE"], "x86_64/PandaClient/current/bin/pbook") ) __name__ = tmp except ImportError: print("Failed to load PandaClient, please set up locally") sys.exit(1) from time import sleep from subprocess import call as _call def call(*args, **kwargs): if not kwargs.has_key('shell'): kwargs['shell'] = True _call(*args, **kwargs) possible_statuses = ["failed", "running", "starting", "tobedone", "activated", "picked", "defined", "holding", "finished", "cancelled"] def all(iterable): """True if all elements of iterable are True - backport from python2.6""" for element in iterable: if not element: return False return True def total_seconds(td): """number of seconds in a timedelta""" return td.seconds + td.microseconds / 1E6 + td.days * 86400 def main(options): n_days = 1 enforceEnter = False verbose = False restoreDB = False pbookCore = PBookCore(enforceEnter, verbose, restoreDB) while True: pbookCore.sync() start_time = (datetime.datetime.utcnow() - options.length ).strftime('%Y-%m-%d %H:%M:%S') # status, jobIDs = Client.getJobIDsInTimeRange(start_time) status, jobIDs, jediDict = Client.getJobIDsJediTasksInTimeRange(start_time) if options.debug: # print(Client.getJobIDsJediTasksInTimeRange(start_time)) print(status, jobIDs) print(jediDict) if jobIDs == None: sleep(total_seconds(options.period) ) continue jobIDs.sort(reverse = True) complete = [] print("\nJobs since %s" % start_time) jobs = [pbookCore.getJobInfo(jobID) for jobID in jobIDs] jobs.reverse() for job in jobs: if not job: continue status = job.jobStatus.split(',') age = datetime.datetime.utcnow() - job.creationTime if job and (not options.expression or all([eval(c) for c in options.expression]) ): if options.condition: complete.append(all([eval(c) for c in options.condition]) ) if options.retry_failed and "failed" in status: pbookCore.retry(job.groupID) statuses = "" for stat in possible_statuses: if status.count(stat): statuses += "%9s: %4d" %(stat, status.count(stat) ) if options.timeout and age > options.timeout and status.count('activated') and status.count('defined'): print("Retrying Job[%6s] - %s" %(job.groupID, job.inDS) ) pbookCore.kill(job.groupID) pbookCore.retry(job.groupID, newSite = True, retryBuild = True) attrs = [attr for attr in dir(job) if attr[0] != "_"] pats = ['\d{6}', 'period[^.]+'] matches = [search(pat, job.jobName) for pat in pats] runNumber = [s.group() for s in matches if s] runNumber = runNumber[0] if runNumber else None # match = search('\d{6}', job.jobName) # runNumber = match.group() if match else None # match = search('period[^.]+', job.jobName) # if match: # runNumber = match.group() age = str(age).split('.')[0] jobdict = {'statuses': statuses, 'age': age, 'runNumber': runNumber} for attr in attrs: # print("%s: %s" %(attr, getattr(job, attr))) jobdict[attr] = getattr(job, attr) print(options.string %(jobdict) ) if options.period == timedelta(0): break if complete and all(complete): for action in options.action: eval(action) break print("") sleep(total_seconds(options.period) ) if __name__=='__main__': try: main(options) except KeyboardInterrupt: # python2.4+ safe print("Interrupting execution") _onExit(tmpDir,historyFile) print("Done") sys.exit(0) # except Exception, exception: # Historic exception value passing for python2.4 # print("Exception %s" % exception) # print(exception.args) # _onExit(tmpDir,historyFile) # raise exception # Re-raise exception for user to see # finally: # python2.4 doesn't support finally clause print("Execution finished") _onExit(tmpDir,historyFile) print("Done") sys.exit(0) # vim: sw=2 sts=2