00001 #ifndef SCHEDULER_H_
00002 #define SCHEDULER_H_
00003
00004 #include <arc/Thread.h>
00005 #include <arc/Logger.h>
00006 #include <arc/URL.h>
00007 #include <arc/data/URLMap.h>
00008
00009 #include "DTR.h"
00010 #include "DTRList.h"
00011 #include "Processor.h"
00012 #include "DataDelivery.h"
00013 #include "TransferShares.h"
00014
00015 namespace DataStaging {
00016
00018
00023 class Scheduler: public DTRCallback {
00024
00025 private:
00026
00028
00030 DTRList DtrList;
00031
00033
00035 std::list<std::string> cancelled_jobs;
00036
00038 std::list<DTR_ptr> events;
00039
00041 std::list<DTR_ptr> staged_queue;
00042
00044 Arc::SimpleCondition cancelled_jobs_lock;
00045
00047 TransferSharesConf transferSharesConf;
00048
00050 Arc::URLMap url_map;
00051
00053 std::string preferred_pattern;
00054
00056 Arc::SimpleCondition event_lock;
00057
00059 Arc::SimpleCondition run_signal;
00060
00062 Arc::SimpleCondition dump_signal;
00063
00065 int PreProcessorSlots;
00067 int DeliverySlots;
00069 int PostProcessorSlots;
00071 int EmergencySlots;
00073 int StagedPreparedSlots;
00074
00076 std::string dumplocation;
00077
00079 std::vector<Arc::URL> configured_delivery_services;
00080
00083 std::map<Arc::URL, std::vector<std::string> > usable_delivery_services;
00084
00086 Arc::Time delivery_last_checked;
00087
00089 unsigned long long int remote_size_limit;
00090
00092 static Arc::Logger logger;
00093
00095 std::list<Arc::LogDestination*> root_destinations;
00096
00098 ProcessState scheduler_state;
00099
00101 Processor processor;
00102
00104 DataDelivery delivery;
00105
00107 Scheduler(const Scheduler&);
00109 Scheduler& operator=(const Scheduler&);
00110
00111
00112
00114 void ProcessDTRNEW(DTR_ptr request);
00116 void ProcessDTRCACHE_WAIT(DTR_ptr request);
00118 void ProcessDTRCACHE_CHECKED(DTR_ptr request);
00120 void ProcessDTRRESOLVED(DTR_ptr request);
00122 void ProcessDTRREPLICA_QUERIED(DTR_ptr request);
00124 void ProcessDTRPRE_CLEANED(DTR_ptr request);
00126 void ProcessDTRSTAGING_PREPARING_WAIT(DTR_ptr request);
00128 void ProcessDTRSTAGED_PREPARED(DTR_ptr request);
00130 void ProcessDTRTRANSFERRED(DTR_ptr request);
00132 void ProcessDTRREQUEST_RELEASED(DTR_ptr request);
00134 void ProcessDTRREPLICA_REGISTERED(DTR_ptr request);
00136 void ProcessDTRCACHE_PROCESSED(DTR_ptr request);
00138
00139
00140 void ProcessDTRFINAL_STATE(DTR_ptr request);
00141
00145 void log_to_root_logger(Arc::LogLevel level, const std::string& message);
00146
00148 void map_state_and_process(DTR_ptr request);
00149
00151
00153 void map_cancel_state(DTR_ptr request);
00154
00157 void map_stuck_state(DTR_ptr request);
00158
00162 void choose_delivery_service(DTR_ptr request);
00163
00166 void revise_queues();
00167
00169 void add_event(DTR_ptr event);
00170
00172 void process_events(void);
00173
00175
00180 void next_replica(DTR_ptr request);
00181
00183
00188 bool handle_mapped_source(DTR_ptr request, Arc::URL& mapped_url);
00189
00191 static void dump_thread(void* arg);
00192
00194 static void main_thread(void* arg);
00196 void main_thread(void);
00197
00198 public:
00199
00201 Scheduler();
00202
00204 ~Scheduler() { stop(); };
00205
00206
00207
00209 void SetSlots(int pre_processor = 0, int post_processor = 0,
00210 int delivery = 0, int emergency = 0, int staged_prepared = 0);
00211
00213 void AddURLMapping(const Arc::URL& template_url, const Arc::URL& replacement_url,
00214 const Arc::URL& access_url = Arc::URL());
00215
00217 void SetURLMapping(const Arc::URLMap& mapping = Arc::URLMap());
00218
00220 void SetPreferredPattern(const std::string& pattern);
00221
00223 void SetTransferSharesConf(const TransferSharesConf& share_conf);
00224
00226 void SetTransferParameters(const TransferParameters& params);
00227
00229 void SetDeliveryServices(const std::vector<Arc::URL>& endpoints);
00230
00232 void SetRemoteSizeLimit(unsigned long long int limit);
00233
00235 void SetDumpLocation(const std::string& location);
00236
00238
00241 bool start(void);
00242
00244
00247 virtual void receiveDTR(DTR_ptr dtr);
00248
00250 bool cancelDTRs(const std::string& jobid);
00251
00253
00255 bool stop();
00256 };
00257 }
00258
00259 #endif