When managing multiple jobs it is a speedup and may be more convenient to use the JobSupervisor class, instead of working on single Job objects. In the JobSupervisor class jobs can be filtered so operations can be limited to a subset of jobs. Such examples are shown below:
C++
#include <iostream>
#include <arc/DateTime.h>
#include <arc/Logger.h>
#include <arc/UserConfig.h>
#include <arc/compute/Job.h>
#include <arc/compute/JobSupervisor.h>
public:
ThreeDaysOldJobSelector() {
}
bool Select(
const Arc::Job& job)
const {
return (now - job.EndTime) > three_days;
}
private:
};
int main(int argc, char** argv) {
j.JobManagementInterfaceName = "org.ogf.glue.emies.activitymanagement";
j.JobManagementURL =
Arc::URL(
"https://localhost");
j.JobStatusInterfaceName = "org.ogf.glue.emies.activitymanagement";
j.JobStatusURL =
Arc::URL(
"https://localhost");
j.JobID = "test-job-1-day-old";
js.AddJob(j);
j.JobID = "test-job-2-days-old";
js.AddJob(j);
j.JobID = "test-job-3-days-old";
js.AddJob(j);
j.JobID = "test-job-4-days-old";
js.AddJob(j);
ThreeDaysOldJobSelector selector;
js.Select(selector);
std::list<Arc::Job> selectedJobs = js.GetSelectedJobs();
for (std::list<Arc::Job>::iterator itJ = selectedJobs.begin();
itJ != selectedJobs.end(); ++itJ) {
std::cout << itJ->JobID << std::endl;
}
return 0;
}
Python
Select jobs using custom class
'''
Create a JobSelector class in order to specify a custom selection to be used
with the JobSupervisor class.
'''
from __future__ import print_function
import arc, sys
class ThreeDaysOldJobSelector(arc.compute.JobSelector):
def __init__(self):
super(ThreeDaysOldJobSelector, self).__init__()
self.now = arc.common.Time()
self.three_days = arc.common.Period(60*60*24*3)
def Select(self, job):
return (self.now - job.EndTime) > self.three_days
uc = arc.common.UserConfig()
arc.common.Logger_getRootLogger().addDestination(arc.common.LogStream(sys.stderr))
arc.common.Logger_getRootLogger().setThreshold(arc.common.VERBOSE)
j = arc.compute.Job()
j.JobManagementInterfaceName = "org.ogf.glue.emies.activitymanagement"
j.JobManagementURL = arc.common.URL("https://localhost")
j.JobStatusInterfaceName = "org.ogf.glue.emies.activitymanagement"
j.JobStatusURL = arc.common.URL("https://localhost")
js = arc.compute.JobSupervisor(uc)
j.JobID = "test-job-1-day-old"
j.EndTime = arc.common.Time()-arc.common.Period("P1D")
js.AddJob(j)
j.JobID = "test-job-2-days-old"
j.EndTime = arc.common.Time()-arc.common.Period("P2D")
js.AddJob(j)
j.JobID = "test-job-3-days-old"
j.EndTime = arc.common.Time()-arc.common.Period("P3D")
js.AddJob(j)
j.JobID = "test-job-4-days-old"
j.EndTime = arc.common.Time()-arc.common.Period("P4D")
js.AddJob(j)
selector = ThreeDaysOldJobSelector()
js.Select(selector)
for j in js.GetSelectedJobs():
print(j.JobID)
Select jobs based on job state
import arc
import sys
def example():
uc = arc.UserConfig()
job_supervisor = arc.JobSupervisor(uc)
endpoint = arc.Endpoint("https://piff.hep.lu.se:443/arex", arc.Endpoint.JOBLIST)
sys.stdout.write("Querying %s for jobs...\n" % endpoint.str())
retriever = arc.JobListRetriever(uc)
retriever.addConsumer(job_supervisor)
retriever.addEndpoint(endpoint)
retriever.wait()
sys.stdout.write("%s jobs found\n" % len(job_supervisor.GetAllJobs()))
sys.stdout.write("Getting job states...\n")
job_supervisor.Update()
sys.stdout.write(
"The jobs have the following states: %s\n"%(
", ".
join([job.State.GetGeneralState()
for job
in job_supervisor.GetAllJobs()])))
job_supervisor.SelectByStatus(["Failed"])
failed_jobs = job_supervisor.GetSelectedJobs()
sys.stdout.write("The failed jobs:\n")
for job in failed_jobs:
job.SaveToStream(arc.CPyOstream(sys.stdout), True)
import atexit
@atexit.register
def wait_exit():
arc.ThreadInitializer().waitExit()
example()