ARC SDK
Scheduler.h
1 #ifndef SCHEDULER_H_
2 #define SCHEDULER_H_
3 
4 #include <arc/Thread.h>
5 #include <arc/Logger.h>
6 #include <arc/URL.h>
7 #include <arc/data/URLMap.h>
8 
9 #include "DTR.h"
10 #include "DTRList.h"
11 #include "Processor.h"
12 #include "DataDelivery.h"
13 #include "TransferShares.h"
14 
15 namespace DataStaging {
16 
18 
/// Scheduler: publicly derives from DTRCallback and declares the virtual
/// receiveDTR(DTR_ptr) entry point through which DTRs are handed to it.
///
/// What this declaration shows:
///  - state: a DTRList, lists/maps of DTR_ptr, per-stage slot limits,
///    a Processor and a DataDelivery member, plus several
///    Arc::SimpleCondition objects;
///  - configuration: Set*/Add* methods which, per the comment in the public
///    section, are only effective when called before start();
///  - lifecycle: start(), stop(), and a destructor that calls stop();
///  - a static instance pointer with getInstance(), and a copy constructor
///    and copy assignment declared private ("should not happen").
///
/// NOTE(review): this text is a numbered source listing whose original doc
/// comments are not present. Remarks below that go beyond the declared names
/// and types are marked as assumptions to confirm against the implementation.
25 class Scheduler: public DTRCallback {
26 
27  private:
28 
30 
    /// DTR container held by the scheduler.
    /// Presumably every DTR the scheduler currently tracks - TODO confirm.
32  DTRList DtrList;
33 
35 
    /// List of strings; by name and by cancelDTRs(jobid), presumably the job
    /// IDs whose DTRs were asked to be cancelled - TODO confirm.
37  std::list<std::string> cancelled_jobs;
38 
    /// List of DTRs treated as events; add_event()/process_events() below
    /// presumably fill and drain it - TODO confirm.
40  std::list<DTR_ptr> events;
41 
    /// Lists of DTRs keyed by a string.
    /// NOTE(review): the meaning of the key is not visible in this header.
43  std::map<std::string, std::list<DTR_ptr> > staged_queue;
44 
    /// Condition object; by name, the lock for cancelled_jobs - confirm usage.
46  Arc::SimpleCondition cancelled_jobs_lock;
47 
    /// Transfer shares configuration; presumably replaced by
    /// SetTransferSharesConf() - TODO confirm.
49  TransferSharesConf transferSharesConf;
50 
    /// URL map; presumably what AddURLMapping()/SetURLMapping() modify and
    /// what handle_mapped_source() consults - TODO confirm.
52  Arc::URLMap url_map;
53 
    /// Pattern string; presumably stored by SetPreferredPattern().
    /// NOTE(review): the pattern syntax is not visible here.
55  std::string preferred_pattern;
56 
    /// Condition object; by name, guards scheduler_state - confirm usage.
58  Arc::SimpleCondition state_lock;
59 
    /// Condition object; by name, guards the events list - confirm usage.
61  Arc::SimpleCondition event_lock;
62 
    /// Condition object; by name, related to the main thread running or
    /// exiting - NOTE(review): who signals and who waits is not visible here.
64  Arc::SimpleCondition run_signal;
65 
    /// Condition object; by name, the counterpart of run_signal for
    /// dump_thread() - confirm usage.
67  Arc::SimpleCondition dump_signal;
68 
    /// Slot limits, one per stage. SetSlots() takes one int per limit
    /// (default 0 each) but in a different order: pre_processor,
    /// post_processor, delivery, emergency, staged_prepared.
    /// NOTE(review): these members are unsigned while SetSlots() takes signed
    /// int; how 0 and negative arguments are treated must be checked in the
    /// implementation.
70  unsigned int PreProcessorSlots;
72  unsigned int DeliverySlots;
74  unsigned int PostProcessorSlots;
76  unsigned int EmergencySlots;
78  unsigned int StagedPreparedSlots;
79 
    /// String presumably stored by SetDumpLocation(); by name, where
    /// dump_thread() writes - TODO confirm.
81  std::string dumplocation;
82 
    /// URLs of delivery services; presumably the endpoints passed to
    /// SetDeliveryServices() - TODO confirm.
84  std::vector<Arc::URL> configured_delivery_services;
85 
    /// Map from a URL to a vector of strings.
    /// NOTE(review): what the strings denote is not visible in this header;
    /// the name suggests per-service information gathered at run time.
88  std::map<Arc::URL, std::vector<std::string> > usable_delivery_services;
89 
    /// Timestamp; by name, when delivery services were last checked - confirm.
91  Arc::Time delivery_last_checked;
92 
    /// Size limit; presumably stored by SetRemoteSizeLimit().
    /// NOTE(review): units and the meaning of 0 are not visible here.
94  unsigned long long int remote_size_limit;
95 
    /// Logger shared by all instances (static member).
97  static Arc::Logger logger;
98 
    /// Non-owning-looking raw pointers to log destinations; by name, those of
    /// the root logger - NOTE(review): ownership is not visible here.
100  std::list<Arc::LogDestination*> root_destinations;
101 
    /// Current state of the scheduler, of type ProcessState.
103  ProcessState scheduler_state;
104 
    /// Processor held by value.
    /// NOTE(review): members are constructed in declaration order, so
    /// processor is constructed before delivery; keep that order unless the
    /// implementation is checked.
106  Processor processor;
107 
    /// DataDelivery held by value (constructed after processor, see above).
109  DataDelivery delivery;
110 
    /// Static pointer to a Scheduler; presumably the object returned by
    /// getInstance() - TODO confirm.
112  static Scheduler* scheduler_instance;
113 
    /// Static mutex; by name, protects scheduler_instance - confirm usage.
115  static Glib::Mutex instance_lock;
116 
    /// Copy construction and copy assignment are declared private here, so
    /// code outside the class cannot copy a Scheduler.
118  Scheduler(const Scheduler&); // should not happen
120  Scheduler& operator=(const Scheduler&); // should not happen
121 
122  /* Functions to process every state of the DTR during normal workflow */
123 
    /// Each handler below takes the DTR as a DTR_ptr by value and returns
    /// nothing; the suffix of the name is the DTR state it is meant for.
    /// NOTE(review): dispatch to these handlers is presumably done by
    /// map_state_and_process() - TODO confirm.
125  void ProcessDTRNEW(DTR_ptr request);
127  void ProcessDTRCACHE_WAIT(DTR_ptr request);
129  void ProcessDTRCACHE_CHECKED(DTR_ptr request);
131  void ProcessDTRRESOLVED(DTR_ptr request);
133  void ProcessDTRREPLICA_QUERIED(DTR_ptr request);
135  void ProcessDTRPRE_CLEANED(DTR_ptr request);
137  void ProcessDTRSTAGING_PREPARING_WAIT(DTR_ptr request);
139  void ProcessDTRSTAGED_PREPARED(DTR_ptr request);
141  void ProcessDTRTRANSFERRED(DTR_ptr request);
143  void ProcessDTRREQUEST_RELEASED(DTR_ptr request);
145  void ProcessDTRREPLICA_REGISTERED(DTR_ptr request);
147  void ProcessDTRCACHE_PROCESSED(DTR_ptr request);
149  /* This is a special function to deal with states after which
150  * the DTR is returned to the generator, i.e. DONE, ERROR, CANCELLED */
151  void ProcessDTRFINAL_STATE(DTR_ptr request);
152 
    /// Logs `message` at `level`; by name, to the root logger (see
    /// root_destinations) rather than to a per-DTR logger - TODO confirm.
156  void log_to_root_logger(Arc::LogLevel level, const std::string& message);
157 
    /// By name, selects the ProcessDTR* handler matching the DTR's state and
    /// calls it - TODO confirm.
159  void map_state_and_process(DTR_ptr request);
160 
162 
    /// By name, decides the state a DTR moves to when it is cancelled.
    /// NOTE(review): the mapping itself is only in the implementation.
164  void map_cancel_state(DTR_ptr request);
165 
    /// By name, decides the state a DTR moves to when it is found stuck.
    /// NOTE(review): what counts as "stuck" is not visible here.
168  void map_stuck_state(DTR_ptr request);
169 
    /// By name, picks a delivery service for the DTR; presumably uses
    /// configured_delivery_services / usable_delivery_services - TODO confirm.
173  void choose_delivery_service(DTR_ptr request);
174 
    /// By name, re-examines the queues of DTRs (slot limits are the likely
    /// input) - NOTE(review): policy is only in the implementation.
177  void revise_queues();
178 
    /// Adds a DTR to the pending events; presumably appends to `events` under
    /// event_lock - TODO confirm.
180  void add_event(DTR_ptr event);
181 
    /// Processes pending events; presumably drains `events` - TODO confirm.
183  void process_events(void);
184 
186 
    /// By name, moves the DTR on to its next replica.
    /// NOTE(review): behaviour when no replica is left is not visible here.
191  void next_replica(DTR_ptr request);
192 
194 
    /// Takes the DTR and a mutable reference to a mapped URL and returns bool.
    /// NOTE(review): the meaning of the return value and whether mapped_url
    /// is modified must be checked in the implementation.
199  bool handle_mapped_source(DTR_ptr request, Arc::URL& mapped_url);
200 
    /// Static function with a void* argument: the signature of a thread entry
    /// point. By name, runs the periodic dump (see dumplocation, dump_signal).
    /// `arg` is presumably the Scheduler instance - TODO confirm.
202  static void dump_thread(void* arg);
203 
    /// Static overload taking void*: thread entry point; presumably casts
    /// `arg` to Scheduler* and calls the member main_thread() - TODO confirm.
205  static void main_thread(void* arg);
    /// Member overload: by name, the scheduler's main loop.
207  void main_thread(void);
208 
209  public:
210 
212 
    /// Returns a pointer to a Scheduler; presumably scheduler_instance,
    /// created under instance_lock on first use - TODO confirm.
    /// NOTE(review): the default constructor below is also public, so this
    /// header does not prevent creating further instances.
220  static Scheduler* getInstance();
221 
    /// Default constructor (public).
223  Scheduler();
224 
    /// Destructor: calls stop().
    /// NOTE(review): because stop() is called unconditionally, it presumably
    /// has to tolerate a scheduler that was never started or was already
    /// stopped - confirm. The ';' after the body is an empty declaration.
226  ~Scheduler() { stop(); };
227 
228  /* The following Set/Add methods are only effective when called before start() */
229 
    /// Sets the slot limits. Every parameter defaults to 0; note the order
    /// (pre_processor, post_processor, delivery, emergency, staged_prepared)
    /// differs from the order of the *Slots members.
    /// NOTE(review): arguments are signed int while the members are unsigned;
    /// treatment of 0 and negative values is only in the implementation.
231  void SetSlots(int pre_processor = 0, int post_processor = 0,
232  int delivery = 0, int emergency = 0, int staged_prepared = 0);
233 
    /// Adds one URL mapping from template_url to replacement_url; access_url
    /// is optional and defaults to a default-constructed Arc::URL.
    /// Presumably stored in url_map - TODO confirm.
235  void AddURLMapping(const Arc::URL& template_url, const Arc::URL& replacement_url,
236  const Arc::URL& access_url = Arc::URL());
237 
    /// Sets the whole URL mapping; with no argument a default-constructed
    /// Arc::URLMap is passed.
239  void SetURLMapping(const Arc::URLMap& mapping = Arc::URLMap());
240 
242 
    /// Sets the preferred pattern; presumably stored in preferred_pattern.
    /// NOTE(review): the pattern syntax is not visible here.
250  void SetPreferredPattern(const std::string& pattern);
251 
    /// Sets the transfer shares configuration; presumably stored in
    /// transferSharesConf - TODO confirm.
253  void SetTransferSharesConf(const TransferSharesConf& share_conf);
254 
    /// Sets transfer parameters.
    /// NOTE(review): no TransferParameters member is declared in this class,
    /// so the value is presumably forwarded elsewhere (e.g. to `delivery`) -
    /// confirm.
256  void SetTransferParameters(const TransferParameters& params);
257 
    /// Sets the delivery service endpoints; presumably stored in
    /// configured_delivery_services - TODO confirm.
259  void SetDeliveryServices(const std::vector<Arc::URL>& endpoints);
260 
    /// Sets the remote size limit; presumably stored in remote_size_limit.
    /// NOTE(review): units are not visible here.
262  void SetRemoteSizeLimit(unsigned long long int limit);
263 
    /// Sets the dump location; presumably stored in dumplocation.
265  void SetDumpLocation(const std::string& location);
266 
268 
    /// Starts the scheduler and returns bool.
    /// NOTE(review): what false means (e.g. already running) is only in the
    /// implementation. The Set/Add methods above must be called before this.
273  bool start(void);
274 
276 
    /// Virtual entry point receiving a DTR; presumably the override of
    /// DTRCallback's callback method - TODO confirm against DTR.h.
281  virtual void receiveDTR(DTR_ptr dtr);
282 
    /// Requests cancellation of the DTRs belonging to `jobid`; returns bool.
    /// Presumably records the ID in cancelled_jobs - TODO confirm.
284  bool cancelDTRs(const std::string& jobid);
285 
287 
    /// Stops the scheduler and returns bool; also called by the destructor.
    /// NOTE(review): the meaning of the return value is only in the
    /// implementation.
291  bool stop();
292  };
293 } // namespace DataStaging
294 
295 #endif /*SCHEDULER_H_*/