ARC SDK
Scheduler.h
1 #ifndef SCHEDULER_H_
2 #define SCHEDULER_H_
3 
4 #include <arc/JobPerfLog.h>
5 #include <arc/Thread.h>
6 #include <arc/Logger.h>
7 #include <arc/URL.h>
8 #include <arc/data/URLMap.h>
9 
10 #include "DTR.h"
11 #include "DTRList.h"
12 #include "Processor.h"
13 #include "DataDelivery.h"
14 #include "TransferShares.h"
15 
16 namespace DataStaging {
17 
19 
/// The Scheduler is the control centre of the data staging framework.
///
/// It derives from DTRCallback (the base class from which all
/// callback-enabled classes should be derived) and receives DTRs through
/// receiveDTR(). Configuration is done with the Set*/Add* methods, which are
/// only effective when called before start().
26 class Scheduler: public DTRCallback {
27 
28  private:
29 
31 
    /// DTRs held by the Scheduler. DTRList is the global list of all active
    /// DTRs in the system.
33  DTRList DtrList;
34 
36 
    /// Presumably the ids of jobs passed to cancelDTRs() - confirm in the
    /// implementation file.
38  std::list<std::string> cancelled_jobs;
39 
    /// Presumably the DTRs queued by add_event() and consumed by
    /// process_events() - confirm in the implementation file.
41  std::list<DTR_ptr> events;
42 
    /// Lists of DTRs keyed by a string. NOTE(review): the meaning of the key
    /// is not visible in this header - confirm.
44  std::map<std::string, std::list<DTR_ptr> > staged_queue;
45 
    /// Simple triggered condition; presumably guards cancelled_jobs - confirm.
47  Arc::SimpleCondition cancelled_jobs_lock;
48 
    /// Configuration of TransferShares; see SetTransferSharesConf().
50  TransferSharesConf transferSharesConf;
51 
    /// Mapping of certain URL patterns to other URLs; see AddURLMapping() and
    /// SetURLMapping().
53  Arc::URLMap url_map;
54 
    /// Presumably the pattern given to SetPreferredPattern(), used for
    /// ordering replicas - confirm.
56  std::string preferred_pattern;
57 
    /// Simple triggered condition; presumably guards scheduler_state - confirm.
59  Arc::SimpleCondition state_lock;
60 
    /// Simple triggered condition; presumably guards the events list - confirm.
62  Arc::SimpleCondition event_lock;
63 
    /// Simple triggered condition; presumably signalled around the lifetime
    /// of the main thread - confirm.
65  Arc::SimpleCondition run_signal;
66 
    /// Simple triggered condition; presumably used with dump_thread() - confirm.
68  Arc::SimpleCondition dump_signal;
69 
    // Slot counts for the individual stages; see SetSlots(). Note that the
    // members are unsigned while SetSlots() takes plain int arguments.
71  unsigned int PreProcessorSlots;
73  unsigned int DeliverySlots;
75  unsigned int PostProcessorSlots;
77  unsigned int EmergencySlots;
79  unsigned int StagedPreparedSlots;
80 
    /// Presumably the location given to SetDumpLocation() for the periodic
    /// dump of DTR state - confirm.
82  std::string dumplocation;
83 
    /// JobPerfLog object for performance metrics logging; see SetJobPerfLog().
85  Arc::JobPerfLog job_perf_log;
86 
    /// Presumably the endpoints given to SetDeliveryServices() - confirm.
88  std::vector<Arc::URL> configured_delivery_services;
89 
    /// Delivery service URLs, each mapped to a list of strings.
    /// NOTE(review): what the strings represent is not visible here - confirm.
92  std::map<Arc::URL, std::vector<std::string> > usable_delivery_services;
93 
    /// Presumably the time the delivery services were last checked - confirm.
95  Arc::Time delivery_last_checked;
96 
    /// Presumably the limit given to SetRemoteSizeLimit() - confirm.
98  unsigned long long int remote_size_limit;
99 
    /// Integer value per host name. NOTE(review): the meaning of the value
    /// (e.g. a counter) is not visible here - confirm.
101  std::map<std::string, int> delivery_hosts;
102 
    /// Logger shared by all Scheduler objects (static member).
104  static Arc::Logger logger;
105 
    /// Raw pointers to log destinations; presumably those of the root logger,
    /// used by log_to_root_logger() - confirm. Ownership is not visible here.
107  std::list<Arc::LogDestination*> root_destinations;
108 
    /// Internal state of this staging process (ProcessState).
110  ProcessState scheduler_state;
111 
    /// The Processor performs pre- and post-transfer operations.
113  Processor processor;
114 
    /// DataDelivery transfers data between specified physical locations.
116  DataDelivery delivery;
117 
    /// Presumably the static instance returned by getInstance() - confirm.
119  static Scheduler* scheduler_instance;
120 
    /// Presumably protects scheduler_instance in getInstance() - confirm.
122  static Glib::Mutex instance_lock;
123 
    // Copy construction and copy assignment are declared private here, so
    // code outside the class cannot copy a Scheduler.
125  Scheduler(const Scheduler&); // should not happen
127  Scheduler& operator=(const Scheduler&); // should not happen
128 
129  /* Functions to process every state of the DTR during normal workflow */
130 
    // One handler per DTR state; the state name is the suffix of the
    // function name. Each takes the DTR being processed.
132  void ProcessDTRNEW(DTR_ptr request);
134  void ProcessDTRCACHE_WAIT(DTR_ptr request);
136  void ProcessDTRCACHE_CHECKED(DTR_ptr request);
138  void ProcessDTRRESOLVED(DTR_ptr request);
140  void ProcessDTRREPLICA_QUERIED(DTR_ptr request);
142  void ProcessDTRPRE_CLEANED(DTR_ptr request);
144  void ProcessDTRSTAGING_PREPARING_WAIT(DTR_ptr request);
146  void ProcessDTRSTAGED_PREPARED(DTR_ptr request);
148  void ProcessDTRTRANSFERRED(DTR_ptr request);
150  void ProcessDTRREQUEST_RELEASED(DTR_ptr request);
152  void ProcessDTRREPLICA_REGISTERED(DTR_ptr request);
154  void ProcessDTRCACHE_PROCESSED(DTR_ptr request);
156  /* This is a special function to deal with states after which
157  * the DTR is returned to the generator, i.e. DONE, ERROR, CANCELLED */
158  void ProcessDTRFINAL_STATE(DTR_ptr request);
159 
    /// Presumably logs `message` at `level` to the root logger destinations
    /// held in root_destinations - confirm.
163  void log_to_root_logger(Arc::LogLevel level, const std::string& message);
164 
    /// Presumably dispatches the DTR to the ProcessDTR* handler matching its
    /// current state - confirm.
166  void map_state_and_process(DTR_ptr request);
167 
169 
    /// Presumably chooses the next state for a DTR that is being cancelled
    /// - confirm.
171  void map_cancel_state(DTR_ptr request);
172 
    /// Presumably chooses the next state for a DTR that is stuck - confirm.
175  void map_stuck_state(DTR_ptr request);
176 
    /// Presumably selects one of the delivery services for the DTR - confirm.
180  void choose_delivery_service(DTR_ptr request);
181 
    /// NOTE(review): behaviour not visible in this header - see the
    /// implementation file.
184  void revise_queues();
185 
    /// Presumably appends the DTR to the events list - confirm.
187  void add_event(DTR_ptr event);
188 
    /// Presumably processes the DTRs in the events list - confirm.
190  void process_events(void);
191 
193 
    /// Presumably moves the DTR on to its next replica - confirm.
198  void next_replica(DTR_ptr request);
199 
201 
    /// Handles a DTR whose source has been mapped to `mapped_url`.
    /// NOTE(review): `mapped_url` is taken by non-const reference and the
    /// meaning of the bool result is not visible here - confirm both.
206  bool handle_mapped_source(DTR_ptr request, Arc::URL& mapped_url);
207 
    /// Static thread entry point taking an opaque argument; presumably
    /// performs the periodic dump of DTR state - confirm.
209  static void dump_thread(void* arg);
210 
    /// Static thread entry point taking an opaque argument; presumably
    /// forwards to the member main_thread() below - confirm.
212  static void main_thread(void* arg);
    /// Member overload of main_thread(); presumably the scheduling loop
    /// - confirm.
214  void main_thread(void);
215 
216  public:
217 
219 
    /// Get static instance of Scheduler, to use one DTR instance with
    /// multiple generators.
227  static Scheduler* getInstance();
228 
    /// Constructor, to be used when only one Generator uses this Scheduler.
230  Scheduler();
231 
    /// Destructor calls stop(), which cancels all DTRs and waits for them to
    /// complete.
233  ~Scheduler() { stop(); };
234 
235  /* The following Set/Add methods are only effective when called before start() */
236 
    /// Set number of slots for processor and delivery stages.
    /// Note the argument order: post_processor comes before delivery.
    /// All arguments default to 0.
238  void SetSlots(int pre_processor = 0, int post_processor = 0,
239  int delivery = 0, int emergency = 0, int staged_prepared = 0);
240 
    /// Add URL mapping entry. See Arc::URLMap.
    /// access_url defaults to a default-constructed Arc::URL.
242  void AddURLMapping(const Arc::URL& template_url, const Arc::URL& replacement_url,
243  const Arc::URL& access_url = Arc::URL());
244 
    /// Replace all URL mapping entries.
    /// The default argument is a default-constructed Arc::URLMap.
246  void SetURLMapping(const Arc::URLMap& mapping = Arc::URLMap());
247 
249 
    /// Set the preferred pattern for ordering replicas.
257  void SetPreferredPattern(const std::string& pattern);
258 
    /// Set TransferShares configuration.
260  void SetTransferSharesConf(const TransferSharesConf& share_conf);
261 
    /// Set transfer limits. TransferParameters represents limits and
    /// properties of a DTR transfer; these generally apply to all DTRs.
263  void SetTransferParameters(const TransferParameters& params);
264 
    /// Set the list of delivery services. DTR::LOCAL_DELIVERY means local
    /// Delivery.
266  void SetDeliveryServices(const std::vector<Arc::URL>& endpoints);
267 
    /// Set the remote transfer size limit.
269  void SetRemoteSizeLimit(unsigned long long int limit);
270 
    /// Set location for periodic dump of DTR state (only file paths
    /// currently supported).
272  void SetDumpLocation(const std::string& location);
273 
    /// Set JobPerfLog object for performance metrics logging.
275  void SetJobPerfLog(const Arc::JobPerfLog& perf_log);
276 
278 
    /// Start scheduling activity.
    /// NOTE(review): the meaning of the bool result is not visible in this
    /// header - confirm.
283  bool start(void);
284 
286 
    /// Callback method implemented from DTRCallback.
291  virtual void receiveDTR(DTR_ptr dtr);
292 
    /// Tell the Scheduler to cancel all the DTRs in the given job
    /// description; the job is identified by `jobid`.
294  bool cancelDTRs(const std::string& jobid);
295 
297 
    /// Tell the Scheduler to shut down all threads and exit.
    /// Also called from the destructor.
301  bool stop();
302  };
303 } // namespace DataStaging
304 
305 #endif /*SCHEDULER_H_*/
DataDelivery transfers data between specified physical locations.
Definition: DataDelivery.h:26
Simple triggered condition.
Definition: Thread.h:150
bool stop()
Tell the Scheduler to shut down all threads and exit.
void SetDeliveryServices(const std::vector< Arc::URL > &endpoints)
Set the list of delivery services. DTR::LOCAL_DELIVERY means local Delivery.
The base class from which all callback-enabled classes should be derived.
Definition: DTR.h:193
void SetPreferredPattern(const std::string &pattern)
Set the preferred pattern for ordering replicas.
A class for storing and manipulating times.
Definition: DateTime.h:125
The Processor performs pre- and post-transfer operations.
Definition: Processor.h:18
Scheduler()
Constructor, to be used when only one Generator uses this Scheduler.
void SetSlots(int pre_processor=0, int post_processor=0, int delivery=0, int emergency=0, int staged_prepared=0)
Set number of slots for processor and delivery stages.
TransferSharesConf describes the configuration of TransferShares.
Definition: TransferShares.h:17
Global list of all active DTRs in the system.
Definition: DTRList.h:17
A logger class.
Definition: Logger.h:493
Represents limits and properties of a DTR transfer. These generally apply to all DTRs.
Definition: DTR.h:91
void SetURLMapping(const Arc::URLMap &mapping=Arc::URLMap())
Replace all URL mapping entries.
static Scheduler * getInstance()
Get static instance of Scheduler, to use one DTR instance with multiple generators.
DataStaging contains all components for data transfer scheduling and execution.
Definition: DataDelivery.h:12
LogLevel
Logging levels for tagging and filtering log messages.
Definition: Logger.h:20
void SetTransferSharesConf(const TransferSharesConf &share_conf)
Set TransferShares configuration.
void SetDumpLocation(const std::string &location)
Set location for periodic dump of DTR state (only file paths currently supported) ...
Class to represent general URLs.
Definition: URL.h:88
URLMap allows mapping certain patterns of URLs to other URLs.
Definition: URLMap.h:22
bool cancelDTRs(const std::string &jobid)
Tell the Scheduler to cancel all the DTRs in the given job description.
virtual void receiveDTR(DTR_ptr dtr)
Callback method implemented from DTRCallback.
bool start(void)
Start scheduling activity.
void SetRemoteSizeLimit(unsigned long long int limit)
Set the remote transfer size limit.
~Scheduler()
Destructor calls stop(), which cancels all DTRs and waits for them to complete.
Definition: Scheduler.h:233
Definition: JobPerfLog.h:9
void AddURLMapping(const Arc::URL &template_url, const Arc::URL &replacement_url, const Arc::URL &access_url=Arc::URL())
Add URL mapping entry. See Arc::URLMap.
void SetTransferParameters(const TransferParameters &params)
Set transfer limits.
The Scheduler is the control centre of the data staging framework.
Definition: Scheduler.h:26
void SetJobPerfLog(const Arc::JobPerfLog &perf_log)
Set JobPerfLog object for performance metrics logging.
ProcessState
Internal state of StagingProcesses.
Definition: DTR.h:79
Wrapper for pointer with automatic destruction and multiple references.
Definition: Thread.h:436