ARC SDK
Job Filtering

Table of Contents

When managing multiple jobs, it is faster and often more convenient to use the JobSupervisor class instead of working on individual Job objects. The JobSupervisor class lets you filter jobs so that operations are applied only to a subset of them. Examples are shown below:

C++

#include <iostream>
#include <arc/DateTime.h>
#include <arc/Logger.h>
#include <arc/UserConfig.h>
#include <arc/compute/Job.h>
#include <arc/compute/JobSupervisor.h>
/*
 * A custom Arc::JobSelector. An instance can be passed to
 * Arc::JobSupervisor::Select() to restrict later JobSupervisor operations
 * to the jobs it accepts.
 */
// Derive from Arc::JobSelector and implement its Select method.
class ThreeDaysOldJobSelector : public Arc::JobSelector {
public:
  // The reference time is captured once, at construction, so every job given
  // to Select() is compared against the same "now".
  ThreeDaysOldJobSelector()
    : now(Arc::Time()),
      three_days(Arc::Period(3 * 24 * 60 * 60)) {
    // Equivalent ways of expressing the same threshold:
    //   three_days = Arc::Period("P3D");             // ISO 8601 duration
    //   three_days = Arc::Period(3*Arc::Time::DAY);  // NOTE(review): confirm Arc::Time::DAY is available
  }

  // Select receives an Arc::Job and returns true to select the job or false
  // to reject it. Any attribute of the Arc::Job object may be inspected here;
  // this selector accepts jobs whose EndTime is more than three days before
  // the time the selector was constructed.
  bool Select(const Arc::Job& job) const {
    const Arc::Period age = now - job.EndTime;
    return age > three_days;
  }

private:
  Arc::Time now;          // reference time, fixed at construction
  Arc::Period three_days; // minimum age for a job to be selected
};
int main(int argc, char** argv) {
Arc::LogStream logcerr(std::cerr);
j.JobManagementInterfaceName = "org.ogf.glue.emies.activitymanagement";
j.JobManagementURL = Arc::URL("https://localhost");
j.JobStatusInterfaceName = "org.ogf.glue.emies.activitymanagement";
j.JobStatusURL = Arc::URL("https://localhost");
j.JobID = "test-job-1-day-old";
j.EndTime = Arc::Time()-Arc::Period("P1D");
js.AddJob(j);
j.JobID = "test-job-2-days-old";
j.EndTime = Arc::Time()-Arc::Period("P2D");
js.AddJob(j);
j.JobID = "test-job-3-days-old";
j.EndTime = Arc::Time()-Arc::Period("P3D");
js.AddJob(j);
j.JobID = "test-job-4-days-old";
j.EndTime = Arc::Time()-Arc::Period("P4D");
js.AddJob(j);
ThreeDaysOldJobSelector selector;
js.Select(selector);
std::list<Arc::Job> selectedJobs = js.GetSelectedJobs();
for (std::list<Arc::Job>::iterator itJ = selectedJobs.begin();
itJ != selectedJobs.end(); ++itJ) {
std::cout << itJ->JobID << std::endl;
}
// Make operation on selected jobs. E.g.:
//js.Clean()
return 0;
}

Python

Select jobs using a custom class

#!/usr/bin/env python
'''
Create a JobSelector class in order to specify a custom selection to be used
with the JobSupervisor class.
'''
from __future__ import print_function
import arc, sys
# Subclass arc.compute.JobSelector and override its Select method.
class ThreeDaysOldJobSelector(arc.compute.JobSelector):
    """Select jobs whose EndTime is more than three days before construction time.

    An instance can be passed to ``JobSupervisor.Select`` to restrict later
    JobSupervisor operations to the jobs it accepts.
    """

    def __init__(self):
        super(ThreeDaysOldJobSelector, self).__init__()
        # Reference time, fixed once so every job is compared against the same "now".
        self.now = arc.common.Time()
        seconds_per_day = 24 * 60 * 60
        self.three_days = arc.common.Period(3 * seconds_per_day)
        # Equivalent alternatives:
        #self.three_days = arc.common.Period("P3D")  # ISO 8601 duration
        #self.three_days = arc.common.Period(3*arc.common.Time.DAY)

    def Select(self, job):
        """Return True to select ``job`` or False to reject it.

        Receives an arc.compute.Job instance; any of its attributes may be
        used here. This selector compares the job's EndTime with ``self.now``.
        """
        age = self.now - job.EndTime
        return age > self.three_days
uc = arc.common.UserConfig()

# Send ARC log output to stderr at VERBOSE level. The stream is kept in a
# named module-level variable so the Python object stays referenced.
stderr_stream = arc.common.LogStream(sys.stderr)
root_logger = arc.common.Logger_getRootLogger()
root_logger.addDestination(stderr_stream)
root_logger.setThreshold(arc.common.VERBOSE)

# A single Job object serves as a template: the endpoint fields are set once,
# only JobID and EndTime change for each dummy job added below.
interface_name = "org.ogf.glue.emies.activitymanagement"
service_url = "https://localhost"
job = arc.compute.Job()
job.JobManagementInterfaceName = interface_name
job.JobManagementURL = arc.common.URL(service_url)
job.JobStatusInterfaceName = interface_name
job.JobStatusURL = arc.common.URL(service_url)

js = arc.compute.JobSupervisor(uc)

# Register dummy jobs that ended 1, 2, 3 and 4 days ago, with IDs
# "test-job-1-day-old", "test-job-2-days-old", ...
for days in (1, 2, 3, 4):
    plural = "" if days == 1 else "s"
    job.JobID = "test-job-%d-day%s-old" % (days, plural)
    job.EndTime = arc.common.Time() - arc.common.Period("P%dD" % days)
    js.AddJob(job)

# Restrict the supervisor to the jobs accepted by the custom selector,
# then print their IDs.
selector = ThreeDaysOldJobSelector()
js.Select(selector)
for selected in js.GetSelectedJobs():
    print(selected.JobID)

# Apply an operation to the selected jobs, e.g.:
#js.Clean()

Select jobs based on job state

#! /usr/bin/env python
import arc
import sys
def example(url="https://piff.hep.lu.se:443/arex"):
    """Query a computing element for its jobs and print the failed ones.

    Retrieves the job list from ``url``, updates the state of every job,
    prints each job's general state, then selects the jobs with status
    "Failed" and writes their details to stdout.

    :param url: job-list endpoint of the computing element to query. The
        default is the host used by the original example, so ``example()``
        with no arguments behaves exactly as before.
    """
    uc = arc.UserConfig()
    # Create a JobSupervisor to handle all the jobs
    job_supervisor = arc.JobSupervisor(uc)
    # Retrieve all the jobs from this computing element; the supervisor is
    # registered as the consumer of the retrieved jobs.
    endpoint = arc.Endpoint(url, arc.Endpoint.JOBLIST)
    sys.stdout.write("Querying %s for jobs...\n" % endpoint.str())
    retriever = arc.JobListRetriever(uc)
    retriever.addConsumer(job_supervisor)
    retriever.addEndpoint(endpoint)
    # Block until the retrieval has finished before counting the jobs.
    retriever.wait()
    sys.stdout.write("%s jobs found\n" % len(job_supervisor.GetAllJobs()))
    sys.stdout.write("Getting job states...\n")
    # Update the states of the jobs
    job_supervisor.Update()
    # Print state of updated jobs
    states = [job.State.GetGeneralState() for job in job_supervisor.GetAllJobs()]
    sys.stdout.write("The jobs have the following states: %s\n" % (", ".join(states)))
    # Select failed jobs
    job_supervisor.SelectByStatus(["Failed"])
    failed_jobs = job_supervisor.GetSelectedJobs()
    sys.stdout.write("The failed jobs:\n")
    for job in failed_jobs:
        job.SaveToStream(arc.CPyOstream(sys.stdout), True)
# Wait for all ARC background threads to finish before the interpreter
# destroys objects those threads may still use. The hook is registered with
# atexit, so it runs at interpreter shutdown.
import atexit


def wait_exit():
    """Wait for ARC's background threads to finish."""
    initializer = arc.ThreadInitializer()
    initializer.waitExit()


atexit.register(wait_exit)
# Uncomment the next two lines to send ARC log messages at DEBUG level to stderr:
# arc.Logger.getRootLogger().addDestination(arc.LogStream(sys.stderr))
# arc.Logger.getRootLogger().setThreshold(arc.DEBUG)
# Run the example against its default computing element.
example()