00001 #ifndef DTR_H_
00002 #define DTR_H_
00003
00004 #ifdef HAVE_CONFIG_H
00005 #include <config.h>
00006 #endif
00007
00008 #include <arc/data/DataHandle.h>
00009 #include <arc/CheckSum.h>
00010 #include <arc/data/URLMap.h>
00011 #include <arc/DateTime.h>
00012 #include <arc/Logger.h>
00013 #include <arc/User.h>
00014 #include <arc/UserConfig.h>
00015 #include <arc/Thread.h>
00016 #include "DTRStatus.h"
00017
00019 namespace DataStaging {
00020
00021 class DTR;
00022
00024 typedef Arc::ThreadedPointer<DTR> DTR_ptr;
00025
00027 typedef Arc::ThreadedPointer<Arc::Logger> DTRLogger;
00028
00030 enum StagingProcesses {GENERATOR, SCHEDULER, PRE_PROCESSOR, DELIVERY, POST_PROCESSOR};
00031
00033 enum ProcessState {INITIATED, RUNNING, TO_STOP, STOPPED};
00034
00037 class TransferParameters {
00038 public:
00041 unsigned long long int min_average_bandwidth;
00044 unsigned int max_inactivity_time;
00048 unsigned long long int min_current_bandwidth;
00050 unsigned int averaging_time;
00052 TransferParameters() : min_average_bandwidth(0), max_inactivity_time(0),
00053 min_current_bandwidth(0), averaging_time(0) {};
00054 };
00055
00057 class DTRCacheParameters {
00058 public:
00060 std::vector<std::string> cache_dirs;
00062 std::vector<std::string> remote_cache_dirs;
00064 std::vector<std::string> drain_cache_dirs;
00066 DTRCacheParameters(void) {};
00068 DTRCacheParameters(std::vector<std::string> caches,
00069 std::vector<std::string> remote_caches,
00070 std::vector<std::string> drain_caches);
00071 };
00072
00074 enum CacheState {
00075 CACHEABLE,
00076 NON_CACHEABLE,
00077 CACHE_ALREADY_PRESENT,
00078 CACHE_DOWNLOADED,
00079 CACHE_LOCKED,
00080 CACHE_SKIP,
00081 CACHE_NOT_USED
00082 };
00083
00085
00091 class DTRCallback {
00092 public:
00094 virtual ~DTRCallback() {};
00100 virtual void receiveDTR(DTR_ptr dtr) = 0;
00101
00102
00103
00104 };
00105
00107
00166 class DTR {
00167
00168 private:
00170 std::string DTR_ID;
00171
00173 Arc::URL source_url;
00174 Arc::URL destination_url;
00175 Arc::UserConfig cfg;
00176
00178 Arc::DataHandle source_endpoint;
00180 Arc::DataHandle destination_endpoint;
00181
00183 std::string source_url_str;
00185 std::string destination_url_str;
00186
00188
00189
00190
00191 std::string cache_file;
00192
00194 DTRCacheParameters cache_parameters;
00195
00197 CacheState cache_state;
00198
00200 Arc::User user;
00201
00204 bool rfc_proxy;
00205
00207 std::string parent_job_id;
00208
00210 int priority;
00211
00213 std::string transfershare;
00214
00216
00219 std::string sub_share;
00220
00222 unsigned int tries_left;
00223
00225 unsigned int initial_tries;
00226
00228 bool replication;
00229
00231
00235 bool force_registration;
00236
00238
00239 std::string mapped_source;
00240
00242 DTRStatus status;
00243
00245 DTRErrorStatus error_status;
00246
00248 unsigned long long int bytes_transferred;
00249
00251
00252 Arc::Time timeout;
00254 Arc::Time created;
00256 Arc::Time last_modified;
00258 Arc::Time next_process_time;
00259
00261 bool cancel_request;
00262
00264 bool bulk_start;
00266 bool bulk_end;
00268 bool source_supports_bulk;
00269
00271
00272 Arc::URL delivery_endpoint;
00273
00275 std::vector<Arc::URL> problematic_delivery_endpoints;
00276
00278 bool use_host_cert_for_remote_delivery;
00279
00281 StagingProcesses current_owner;
00282
00284
00286 DTRLogger logger;
00287
00289
00292 std::list<Arc::LogDestination*> log_destinations;
00293
00295 std::map<StagingProcesses,std::list<DTRCallback*> > proc_callback;
00296
00298 Arc::SimpleCondition lock;
00299
00303
00306
00310
00312 void mark_modification () { last_modified.SetTime(time(NULL)); };
00313
00315 DTR& operator=(const DTR& dtr);
00316 DTR(const DTR& dtr);
00317
00318 public:
00319
00321 static const Arc::URL LOCAL_DELIVERY;
00322
00324 DTR();
00325
00327
00338 DTR(const std::string& source,
00339 const std::string& destination,
00340 const Arc::UserConfig& usercfg,
00341 const std::string& jobid,
00342 const uid_t& uid,
00343 DTRLogger log);
00344
00346 ~DTR() {};
00347
00349 operator bool() const {
00350 return (!DTR_ID.empty());
00351 }
00353 bool operator!() const {
00354 return (DTR_ID.empty());
00355 }
00356
00358
00361 void registerCallback(DTRCallback* cb, StagingProcesses owner);
00362
00364 std::list<DTRCallback*> get_callbacks(const std::map<StagingProcesses, std::list<DTRCallback*> >& proc_callback,
00365 StagingProcesses owner);
00366
00368
00369 void reset();
00370
00372 void set_id(const std::string& id);
00374 std::string get_id() const { return DTR_ID; };
00376 std::string get_short_id() const;
00377
00379 Arc::DataHandle& get_source() { return source_endpoint; };
00381 Arc::DataHandle& get_destination() { return destination_endpoint; };
00382
00384 std::string get_source_str() const { return source_url_str; };
00386 std::string get_destination_str() const { return destination_url_str; };
00387
00389 const Arc::UserConfig& get_usercfg() const { return cfg; };
00390
00392 void set_timeout(time_t value) { timeout.SetTime(Arc::Time().GetTime() + value); };
00394 Arc::Time get_timeout() const { return timeout; };
00395
00397 void set_process_time(const Arc::Period& process_time);
00399 Arc::Time get_process_time() const { return next_process_time; };
00400
00402 Arc::Time get_creation_time() const { return created; };
00403
00405 Arc::Time get_modification_time() const { return last_modified; };
00406
00408 std::string get_parent_job_id() const { return parent_job_id; };
00409
00411 void set_priority(int pri);
00413 int get_priority() const { return priority; };
00414
00416 void set_rfc_proxy(bool rfc) { rfc_proxy = rfc; };
00418 bool is_rfc_proxy() const { return rfc_proxy; };
00419
00421 void set_transfer_share(const std::string& share_name);
00423 std::string get_transfer_share() const { return transfershare; };
00424
00426 void set_sub_share(const std::string& share) { sub_share = share; };
00428 std::string get_sub_share() const { return sub_share; };
00429
00431 void set_tries_left(unsigned int tries);
00433 unsigned int get_tries_left() const { return tries_left; };
00435 unsigned int get_initial_tries() const { return initial_tries; }
00437 void decrease_tries_left();
00438
00440 void set_status(DTRStatus stat);
00442 DTRStatus get_status();
00443
00445
00447 void set_error_status(DTRErrorStatus::DTRErrorStatusType error_stat,
00448 DTRErrorStatus::DTRErrorLocation error_loc,
00449 const std::string& desc="");
00451 void reset_error_status();
00453 DTRErrorStatus get_error_status();
00454
00456 void set_bytes_transferred(unsigned long long int bytes);
00458 unsigned long long int get_bytes_transferred() const { return bytes_transferred; };
00459
00461 void set_cancel_request();
00463 bool cancel_requested() const { return cancel_request; };
00464
00466 void set_delivery_endpoint(const Arc::URL& endpoint) { delivery_endpoint = endpoint; };
00468 const Arc::URL& get_delivery_endpoint() const { return delivery_endpoint; };
00469
00472 void add_problematic_delivery_service(const Arc::URL& endpoint) { problematic_delivery_endpoints.push_back(endpoint); };
00474 const std::vector<Arc::URL>& get_problematic_delivery_services() const { return problematic_delivery_endpoints; };
00475
00477 void host_cert_for_remote_delivery(bool host) { use_host_cert_for_remote_delivery = host; };
00479 bool host_cert_for_remote_delivery() const { return use_host_cert_for_remote_delivery; };
00480
00482 void set_cache_file(const std::string& filename);
00484 std::string get_cache_file() const { return cache_file; };
00485
00487 void set_cache_parameters(const DTRCacheParameters& param) { cache_parameters = param; };
00489 const DTRCacheParameters& get_cache_parameters() const { return cache_parameters; };
00490
00492 void set_cache_state(CacheState state);
00494 CacheState get_cache_state() const { return cache_state; };
00495
00497 void set_mapped_source(const std::string& file = "") { mapped_source = file; };
00499 std::string get_mapped_source() const { return mapped_source; };
00500
00502 StagingProcesses get_owner() const { return current_owner; };
00503
00505 Arc::User get_local_user() const { return user; };
00506
00508 void set_replication(bool rep) { replication = rep; };
00510 bool is_replication() const { return replication; };
00512 void set_force_registration(bool force) { force_registration = force; };
00514 bool is_force_registration() const { return force_registration; };
00515
00517 void set_bulk_start(bool value) { bulk_start = value; };
00519 bool get_bulk_start() const { return bulk_start; };
00521 void set_bulk_end(bool value) { bulk_end = value; };
00523 bool get_bulk_end() const { return bulk_end; };
00525 bool bulk_possible();
00526
00528 const DTRLogger& get_logger() const { return logger; };
00529
00531 void connect_logger() { if (logger) logger->addDestinations(log_destinations); };
00533 void disconnect_logger() { if (logger) logger->removeDestinations(); };
00534
00536 static void push(DTR_ptr dtr, StagingProcesses new_owner);
00537
00539 bool suspend();
00540
00542 bool error() const { return (error_status != DTRErrorStatus::NONE_ERROR); }
00543
00545 bool is_destined_for_pre_processor() const;
00547 bool is_destined_for_post_processor() const;
00549 bool is_destined_for_delivery() const;
00550
00552 bool came_from_pre_processor() const;
00554 bool came_from_post_processor() const;
00556 bool came_from_delivery() const;
00558 bool came_from_generator() const;
00560 bool is_in_final_state() const;
00561 };
00562
00563 }
00564 #endif