ARC SDK
Submitter.h
1 // -*- indent-tabs-mode: nil -*-
2 
3 #ifndef __ARC_SUBMITTER_H__
4 #define __ARC_SUBMITTER_H__
5 
6 #include <arc/UserConfig.h>
7 
8 #include <arc/compute/Endpoint.h>
9 #include <arc/compute/EndpointQueryingStatus.h>
10 #include <arc/compute/JobDescription.h>
11 #include <arc/compute/Job.h>
12 #include <arc/compute/SubmitterPlugin.h>
13 #include <arc/compute/ExecutionTarget.h>
14 #include <arc/compute/EntityRetriever.h>
15 
16 
17 namespace Arc {
18 
29  public:
32  UNKNOWN,
33  NOPLUGIN,
34  SUCCESSFUL
35  };
36 
38  static std::string str(EndpointSubmissionStatusType status);
39 
41  EndpointSubmissionStatus(EndpointSubmissionStatusType status = UNKNOWN, const std::string& description = "") : status(status), description(description) {};
42 
44  bool operator==(EndpointSubmissionStatusType s) const { return status == s; };
48  bool operator==(const EndpointSubmissionStatus& s) const { return status == s.status; };
50  bool operator!=(EndpointSubmissionStatusType s) const { return status != s; };
52  bool operator!=(const EndpointSubmissionStatus& s) const { return status != s.status; };
54  bool operator!() const { return status != SUCCESSFUL; };
56  operator bool() const { return status == SUCCESSFUL; };
57 
65  EndpointSubmissionStatus& operator=(const EndpointSubmissionStatus& s) { status = s.status; description = s.description; return *this; };
66 
68  EndpointSubmissionStatusType getStatus() const { return status; };
70  const std::string& getDescription() const { return description; };
74  std::string str() const { return str(status); };
75 
76  private:
78  std::string description;
79  };
80 
81  class SubmissionStatus;
82 
130  class Submitter {
131  public:
132  Submitter(const UserConfig& uc) : uc(uc) {}
133  ~Submitter() {}
134 
135  // === Using the consumer concept as in the EntityRetriever ===
136  void addConsumer(EntityConsumer<Job>& jc) { consumers.push_back(&jc); }
137  void removeConsumer(EntityConsumer<Job>& jc);
138  // ===
139 
140  // === No brokering ===
141 
163  SubmissionStatus Submit(const Endpoint& endpoint, const JobDescription& desc) { return Submit(endpoint, std::list<JobDescription>(1, desc)); }
175  SubmissionStatus Submit(const Endpoint& endpoint, const JobDescription& desc, Job& job);
184  SubmissionStatus Submit(const Endpoint& endpoint, const std::list<JobDescription>& descs);
194  SubmissionStatus Submit(const Endpoint& endpoint, const std::list<JobDescription>& descs, std::list<Job>& jobs);
212  SubmissionStatus Submit(const std::list<Endpoint>& endpoint, const std::list<JobDescription>& descs);
221  SubmissionStatus Submit(const std::list<Endpoint>& endpoint, const std::list<JobDescription>& descs, std::list<Job>& jobs);
222 
223  // ==== Submission to single configuration (adaption of job description) ====
224  // ===== Single job =====
225  SubmissionStatus Submit(const ExecutionTarget& et, const JobDescription& desc) { return Submit(et, std::list<JobDescription>(1, desc)); }
226  SubmissionStatus Submit(const ExecutionTarget& et, const JobDescription& desc, Job& job);
227  // =====
228  // ===== Multiple jobs =====
229  SubmissionStatus Submit(const ExecutionTarget& et, const std::list<JobDescription>& descs);
230  SubmissionStatus Submit(const ExecutionTarget& et, const std::list<JobDescription>& descs, std::list<Job>& jobs);
231  // =====
232  // ====
233  // ===
234 
235  // === Brokering with service discovery (multiple endpoints) ===
236  // ==== Using provided JobDescription objects for brokering ====
237  SubmissionStatus BrokeredSubmit(const std::list<std::string>& endpoints, const std::list<JobDescription>& descs, const std::list<std::string>& requestedSubmissionInterfaces = std::list<std::string>());
238  SubmissionStatus BrokeredSubmit(const std::list<std::string>& endpoints, const std::list<JobDescription>& descs, std::list<Job>&, const std::list<std::string>& requestedSubmissionInterfaces = std::list<std::string>());
239  SubmissionStatus BrokeredSubmit(const std::list<Endpoint>& endpoints, const std::list<JobDescription>& descs, const std::list<std::string>& requestedSubmissionInterfaces = std::list<std::string>());
240  SubmissionStatus BrokeredSubmit(const std::list<Endpoint>& endpoints, const std::list<JobDescription>& descs, std::list<Job>& jobs, const std::list<std::string>& requestedSubmissionInterfaces = std::list<std::string>());
241  // ====
242  // ===
243 
244  // === Methods for handling errors ===
245  const std::list<const JobDescription*>& GetDescriptionsNotSubmitted() const { return notsubmitted; }
246  void ClearNotSubmittedDescriptions() { notsubmitted.clear(); }
247 
248  const EndpointStatusMap& GetEndpointQueryingStatuses() const { return queryingStatusMap; }
249  void ClearEndpointQueryingStatuses() { queryingStatusMap.clear(); }
250 
251  const std::map<Endpoint, EndpointSubmissionStatus>& GetEndpointSubmissionStatuses() const { return submissionStatusMap; }
252  void ClearEndpointSubmissionStatuses() { submissionStatusMap.clear(); }
253 
254  void ClearAllStatuses() { queryingStatusMap.clear(); submissionStatusMap.clear(); }
255  void ClearAll() { notsubmitted.clear(); queryingStatusMap.clear(); submissionStatusMap.clear(); }
256  // ===
257 
258  private:
259  class ConsumerWrapper : public EntityConsumer<Job> {
260  public:
261  ConsumerWrapper(Submitter& s) : s(s) {}
262  void addEntity(const Job& j) {
263  for (std::list<EntityConsumer<Job>*>::iterator it = s.consumers.begin(); it != s.consumers.end(); ++it) {
264  (*it)->addEntity(j);
265  }
266  }
267  private:
268  Submitter& s;
269  };
270 
271  SubmissionStatus SubmitNoClear(const Endpoint& endpoint, const std::list<JobDescription>& descs);
272 
273  const UserConfig& uc;
274 
275  EndpointStatusMap queryingStatusMap;
276  std::map<Endpoint, EndpointSubmissionStatus> submissionStatusMap;
277 
278  std::list<const JobDescription*> notsubmitted;
279 
280  std::list<EntityConsumer<Job>*> consumers;
281 
282  static SubmitterPluginLoader& getLoader();
283 
284  static Logger logger;
285  };
286 }
287 
288 #endif // __ARC_SUBMITTER_H__