ARC SDK
DTR.h
1 // Summary page of data staging for doxygen
2 namespace DataStaging {
37 } // namespace DataStaging
38 
39 #ifndef DTR_H_
40 #define DTR_H_
41 
42 #include <arc/data/DataHandle.h>
43 #include <arc/CheckSum.h>
44 #include <arc/data/URLMap.h>
45 #include <arc/DateTime.h>
46 #include <arc/JobPerfLog.h>
47 #include <arc/Logger.h>
48 #include <arc/User.h>
49 #include <arc/UserConfig.h>
50 #include <arc/Thread.h>
51 #include "DTRStatus.h"
52 
54 namespace DataStaging {
55 
56  class DTR;
57 
59 
61 
63 
66 
68 
75  };
76 
78 
79  enum ProcessState {
84  };
85 
87 
92  public:
94 
98  unsigned long long int min_average_bandwidth;
100 
103  unsigned int max_inactivity_time;
105 
110  unsigned long long int min_current_bandwidth;
112  unsigned int averaging_time;
116  };
117 
119 
124  public:
126  std::vector<std::string> cache_dirs;
128  std::vector<std::string> drain_cache_dirs;
130  std::vector<std::string> readonly_cache_dirs;
134  DTRCacheParameters(std::vector<std::string> caches,
135  std::vector<std::string> drain_caches,
136  std::vector<std::string> readonly_caches);
137  };
138 
140 
148  public:
152  DTRCredentialInfo(const std::string& DN,
153  const Arc::Time& expirytime,
154  const std::list<std::string> vomsfqans);
156  std::string getDN() const { return DN; };
158  Arc::Time getExpiryTime() const { return expirytime; };
160  std::string extractVOMSVO() const;
162  std::string extractVOMSGroup() const;
164  std::string extractVOMSRole() const;
165  private:
166  std::string DN;
167  Arc::Time expirytime;
168  std::list<std::string> vomsfqans;
169 
170  };
171 
173 
174  enum CacheState {
182  };
183 
185 
193  class DTRCallback { // The base class from which all callback-enabled classes should be derived.
194  public:
196  virtual ~DTRCallback() {}; // Empty virtual destructor.
198 
202  virtual void receiveDTR(DTR_ptr dtr) = 0; // Callback method called when a DTR is pushed to this object; pure virtual, so derived classes must implement it.
203  // TODO: the suspend/cancel callbacks below are commented out and not part of the interface.
204  //virtual void suspendDTR(DTR& dtr) = 0;
205  //virtual void cancelDTR(DTR& dtr) = 0;
206  };
207 
209 
228  class DTR {
229 
230  private:
232  std::string DTR_ID;
233 
235  Arc::URL source_url;
236  Arc::URL destination_url;
237  Arc::UserConfig cfg;
238 
240  Arc::DataHandle source_endpoint;
242  Arc::DataHandle destination_endpoint;
243 
245  std::string source_url_str;
247  std::string destination_url_str;
248 
250  /* Kept as string so we don't need to duplicate DataHandle properties
251  * of destination. Delivery should check if this is set and if so use
252  * it as destination. */
253  std::string cache_file;
254 
256  DTRCacheParameters cache_parameters;
257 
259  CacheState cache_state;
260 
262  bool use_acix;
263 
265  Arc::User user;
266 
268  DTRCredentialInfo credentials;
269 
271  std::string parent_job_id;
272 
274  int priority;
275 
277  std::string transfershare;
278 
280 
283  std::string sub_share;
284 
286  unsigned int tries_left;
287 
289  unsigned int initial_tries;
290 
292  bool replication;
293 
295 
299  bool force_registration;
300 
302 
303  std::string mapped_source;
304 
306  DTRStatus status;
307 
309  DTRErrorStatus error_status;
310 
312  unsigned long long int bytes_transferred; // TODO and/or offset?
314  unsigned long long int transfer_time;
315 
317  Arc::Time timeout;
320  Arc::Time created;
322  Arc::Time last_modified;
324  Arc::Time next_process_time;
325 
327  bool cancel_request;
328 
330  bool bulk_start;
332  bool bulk_end;
334  bool source_supports_bulk;
335 
337  bool mandatory;
338 
340 
341  Arc::URL delivery_endpoint;
342 
344  std::vector<Arc::URL> problematic_delivery_endpoints;
345 
347  bool use_host_cert_for_remote_delivery;
348 
350  StagingProcesses current_owner;
351 
353 
355  DTRLogger logger;
356 
358 
361  std::list<DTRLogDestination> log_destinations;
362 
364 
366  //bool delete_log_destinations;
367 
369  Arc::JobPerfLog perf_log;
370 
372  Arc::JobPerfRecord perf_record;
373 
375  std::map<StagingProcesses,std::list<DTRCallback*> > proc_callback;
376 
379 
383 
386 
390  /* Methods */
392  void mark_modification () { last_modified.SetTime(time(NULL)); };
393 
395  std::list<DTRCallback*> get_callbacks(const std::map<StagingProcesses, std::list<DTRCallback*> >& proc_callback,
396  StagingProcesses owner);
397 
399  DTR& operator=(const DTR& dtr);
400  DTR(const DTR& dtr);
401  DTR();
402 
403 
404  public:
405 
407  static const Arc::URL LOCAL_DELIVERY;
408 
411 
413 
425  DTR(const std::string& source,
426  const std::string& destination,
427  const Arc::UserConfig& usercfg,
428  const std::string& jobid,
429  const uid_t& uid,
430  std::list<DTRLogDestination> const& logs,
431  const std::string& logname = std::string("DTR"));
432 
434  ~DTR() {};
435 
437  operator bool() const {
438  return (!DTR_ID.empty());
439  }
441  bool operator!() const {
442  return (DTR_ID.empty());
443  }
444 
446 
452 
454 
457  void reset();
458 
460  void set_id(const std::string& id);
462  std::string get_id() const { return DTR_ID; };
464  std::string get_short_id() const;
465 
467  Arc::DataHandle& get_source() { return source_endpoint; };
469  Arc::DataHandle& get_destination() { return destination_endpoint; };
470 
472  std::string get_source_str() const { return source_url_str; };
474  std::string get_destination_str() const { return destination_url_str; };
475 
477  const Arc::UserConfig& get_usercfg() const { return cfg; };
478 
480  void set_timeout(time_t value) { timeout.SetTime(Arc::Time().GetTime() + value); };
482  Arc::Time get_timeout() const { return timeout; };
483 
485  void set_process_time(const Arc::Period& process_time);
487  Arc::Time get_process_time() const { return next_process_time; };
488 
490  Arc::Time get_creation_time() const { return created; };
491 
493  Arc::Time get_modification_time() const { return last_modified; };
494 
496  std::string get_parent_job_id() const { return parent_job_id; };
497 
499  void set_priority(int pri);
501  int get_priority() const { return priority; };
502 
504  void set_credential_info(const DTRCredentialInfo& cred) { credentials = cred; };
506  const DTRCredentialInfo& get_credential_info() const { return credentials; };
507 
509  void set_transfer_share(const std::string& share_name);
511  std::string get_transfer_share() const { return transfershare; };
512 
514  void set_sub_share(const std::string& share) { sub_share = share; };
516  std::string get_sub_share() const { return sub_share; };
517 
519  void set_tries_left(unsigned int tries);
521  unsigned int get_tries_left() const { return tries_left; };
523  unsigned int get_initial_tries() const { return initial_tries; }
525  void decrease_tries_left();
526 
528  void set_status(DTRStatus stat);
531 
533 
539  const std::string& desc="");
541  void reset_error_status();
544 
546  void set_bytes_transferred(unsigned long long int bytes);
548  unsigned long long int get_bytes_transferred() const { return bytes_transferred; };
549 
551  void set_transfer_time(unsigned long long int t);
553  unsigned long long int get_transfer_time() const { return transfer_time; };
554 
556  void set_cancel_request();
558  bool cancel_requested() const { return cancel_request; };
559 
561  void set_delivery_endpoint(const Arc::URL& endpoint) { delivery_endpoint = endpoint; };
563  const Arc::URL& get_delivery_endpoint() const { return delivery_endpoint; };
564 
566 
570  void add_problematic_delivery_service(const Arc::URL& endpoint) { problematic_delivery_endpoints.push_back(endpoint); };
572  const std::vector<Arc::URL>& get_problematic_delivery_services() const { return problematic_delivery_endpoints; };
573 
575  void host_cert_for_remote_delivery(bool host) { use_host_cert_for_remote_delivery = host; };
577  bool host_cert_for_remote_delivery() const { return use_host_cert_for_remote_delivery; };
578 
580  void set_cache_file(const std::string& filename);
582  std::string get_cache_file() const { return cache_file; };
583 
585  void set_cache_parameters(const DTRCacheParameters& param) { cache_parameters = param; };
587  const DTRCacheParameters& get_cache_parameters() const { return cache_parameters; };
588 
590  void set_cache_state(CacheState state);
592  CacheState get_cache_state() const { return cache_state; };
593 
595  void set_use_acix(bool acix) { use_acix = acix; };
597  bool get_use_acix() const { return use_acix; };
598 
600  void set_mapped_source(const std::string& file = "") { mapped_source = file; };
602  std::string get_mapped_source() const { return mapped_source; };
603 
605  StagingProcesses get_owner() const { return current_owner; };
606 
608  Arc::User get_local_user() const { return user; };
609 
611  void set_replication(bool rep) { replication = rep; };
613  bool is_replication() const { return replication; };
615  void set_force_registration(bool force) { force_registration = force; };
617  bool is_force_registration() const { return force_registration; };
618 
620  void set_bulk_start(bool value) { bulk_start = value; };
622  bool get_bulk_start() const { return bulk_start; };
624  void set_bulk_end(bool value) { bulk_end = value; };
626  bool get_bulk_end() const { return bulk_end; };
628  bool bulk_possible();
629 
631  bool is_mandatory() const { return mandatory; };
632 
634  const DTRLogger& get_logger() const { return logger; };
635 
637  std::list<Arc::LogDestination*> get_log_destinations() const;
638 
640  static void push(DTR_ptr dtr, StagingProcesses new_owner);
641 
643  bool suspend();
644 
646  bool error() const { return (error_status != DTRErrorStatus::NONE_ERROR); }
647 
649  bool is_destined_for_pre_processor() const;
651  bool is_destined_for_post_processor() const;
653  bool is_destined_for_delivery() const;
654 
656  bool came_from_pre_processor() const;
658  bool came_from_post_processor() const;
660  bool came_from_delivery() const;
662  bool came_from_generator() const;
664  bool is_in_final_state() const;
665 
667  Arc::JobPerfLog& get_job_perf_log() { return perf_log; };
669  Arc::JobPerfRecord& get_job_perf_record() { return perf_record; };
670 
671  };
672 
// Helper method to create a DTR smart pointer (DTR_ptr); only for swig bindings.
// Takes the same argument list as the public DTR constructor; logname defaults to "DTR".
674  DTR_ptr createDTRPtr(const std::string& source,
675  const std::string& destination,
676  const Arc::UserConfig& usercfg,
677  const std::string& jobid,
678  const uid_t& uid,
679  std::list<DTRLogDestination> const& logs,
680  const std::string& logname = std::string("DTR"));
681 
684  const std::string& subdomain);
685 
686 } // namespace DataStaging
687 #endif /*DTR_H_*/
void set_mapped_source(const std::string &file="")
Set the mapped file.
Definition: DTR.h:600
DTR_ptr createDTRPtr(const std::string &source, const std::string &destination, const Arc::UserConfig &usercfg, const std::string &jobid, const uid_t &uid, std::list< DTRLogDestination > const &logs, const std::string &logname=std::string("DTR"))
Helper method to create smart pointer, only for swig bindings.
void set_replication(bool rep)
Set replication flag.
Definition: DTR.h:611
Arc::Time get_timeout() const
Get the timeout for processing this DTR.
Definition: DTR.h:482
Simple triggered condition.
Definition: Thread.h:150
No error.
Definition: DTRStatus.h:216
User configuration class
Definition: UserConfig.h:196
bool is_destined_for_pre_processor() const
Returns true if this DTR is about to go into the pre-processor.
void SetTime(time_t)
Sets the time.
The base class from which all callback-enabled classes should be derived.
Definition: DTR.h:193
bool came_from_pre_processor() const
Returns true if this DTR just came from the pre-processor.
void set_cache_state(CacheState state)
Set the cache state.
bool is_destined_for_post_processor() const
Returns true if this DTR is about to go into the post-processor.
StagingProcesses
Components of the data staging framework.
Definition: DTR.h:69
std::string extractVOMSVO() const
Get the VOMS VO.
bool came_from_generator() const
Returns true if this DTR just came from the generator.
std::string get_mapped_source() const
Get the mapped file.
Definition: DTR.h:602
Process has been instructed to stop.
Definition: DTR.h:82
void set_bytes_transferred(unsigned long long int bytes)
Set bytes transferred (should be set by whatever is controlling the transfer)
std::string get_sub_share() const
Get sub-share.
Definition: DTR.h:516
void set_bulk_start(bool value)
Set bulk start flag.
Definition: DTR.h:620
const std::vector< Arc::URL > & get_problematic_delivery_services() const
Get all problematic endpoints.
Definition: DTR.h:572
This class is a wrapper around the DataPoint class.
Definition: DataHandle.h:33
bool came_from_delivery() const
Returns true if this DTR just came from delivery.
A class for storing and manipulating times.
Definition: DateTime.h:125
bool cancel_requested() const
Returns true if cancellation has been requested.
Definition: DTR.h:558
bool bulk_possible()
Whether bulk operation is possible according to current state and src/dest.
bool suspend()
Suspend the DTR which is currently doing a transfer in the delivery process.
Class for storing credential information.
Definition: DTR.h:147
const DTRLogger & get_logger() const
Get Logger object, so that processes can log to this DTR's log.
Definition: DTR.h:634
Source is available in cache from before.
Definition: DTR.h:177
bool came_from_post_processor() const
Returns true if this DTR just came from the post-processor.
Arc::JobPerfRecord & get_job_perf_record()
Get the performance log record.
Definition: DTR.h:669
Arc::JobPerfLog & get_job_perf_log()
Get the performance log.
Definition: DTR.h:667
bool host_cert_for_remote_delivery() const
Get the flag for using host certificate for contacting remote delivery services.
Definition: DTR.h:577
void registerCallback(DTRCallback *cb, StagingProcesses owner)
Register callback objects to be used during DTR processing.
void set_process_time(const Arc::Period &process_time)
Set the next processing time to current time + given time.
bool get_use_acix() const
Get whether ACIX is a source.
Definition: DTR.h:597
virtual void receiveDTR(DTR_ptr dtr)=0
Defines the callback method called when a DTR is pushed to this object.
virtual ~DTRCallback()
Empty virtual destructor.
Definition: DTR.h:196
bool is_destined_for_delivery() const
Returns true if this DTR is about to go into delivery.
void set_force_registration(bool force)
Set force registration flag.
Definition: DTR.h:615
const Arc::UserConfig & get_usercfg() const
Get the UserConfig object associated with this DTR.
Definition: DTR.h:477
bool is_replication() const
Get replication flag.
Definition: DTR.h:613
DTRErrorStatus get_error_status()
Get the error status.
static Arc::LogLevel LOG_LEVEL
Log level for all DTR activity.
Definition: DTR.h:410
unsigned long long int min_average_bandwidth
Minimum average bandwidth in bytes/sec.
Definition: DTR.h:98
A logger class.
Definition: Logger.h:493
Data Transfer Request.
Definition: DTR.h:228
std::string extractVOMSGroup() const
Get the VOMS Group (first in the supplied list of fqans)
DTRStatus get_status()
Get the status. Protected by lock.
void set_tries_left(unsigned int tries)
Set the number of attempts remaining.
Process has stopped.
Definition: DTR.h:83
void reset_error_status()
Set the error status back to NONE_ERROR and clear other fields.
void set_cache_parameters(const DTRCacheParameters &param)
Set cache parameters.
Definition: DTR.h:585
std::string get_parent_job_id() const
Get the parent job ID.
Definition: DTR.h:496
unsigned int get_initial_tries() const
Get the initial number of attempts (set by set_tries_left())
Definition: DTR.h:523
void set_bulk_end(bool value)
Set bulk end flag.
Definition: DTR.h:624
Creator of new DTRs and receiver of completed DTRs.
Definition: DTR.h:70
Represents limits and properties of a DTR transfer. These generally apply to all DTRs.
Definition: DTR.h:91
TransferParameters()
Constructor. Initialises all values to zero.
Definition: DTR.h:114
void set_timeout(time_t value)
Set the timeout for processing this DTR.
Definition: DTR.h:480
void set_transfer_time(unsigned long long int t)
Set transfer time (should be set by whatever is controlling the transfer)
static const Arc::URL LOCAL_DELIVERY
URL that is used to denote local Delivery should be used.
Definition: DTR.h:407
void set_cache_file(const std::string &filename)
Set cache filename.
unsigned int max_inactivity_time
Maximum inactivity time in sec.
Definition: DTR.h:103
Class representing the status of a DTR.
Definition: DTRStatus.h:16
void add_problematic_delivery_service(const Arc::URL &endpoint)
Add problematic endpoint.
Definition: DTR.h:570
DTRErrorStatusType
A list of error types.
Definition: DTRStatus.h:213
std::vector< std::string > cache_dirs
List of (cache dir [link dir])
Definition: DTR.h:126
Arc::DataHandle & get_source()
Get source handle. Return by reference since DataHandle cannot be copied.
Definition: DTR.h:467
DataStaging contains all components for data transfer scheduling and execution.
Definition: DataDelivery.h:12
DTRCacheParameters(void)
Constructor with empty lists initialised.
Definition: DTR.h:132
void set_delivery_endpoint(const Arc::URL &endpoint)
Set delivery endpoint.
Definition: DTR.h:561
LogLevel
Logging levels for tagging and filtering log messages.
Definition: Logger.h:20
void decrease_tries_left()
Decrease attempt number.
bool error() const
Did an error happen?
Definition: DTR.h:646
static void push(DTR_ptr dtr, StagingProcesses new_owner)
Pass the DTR from one process to another. Protected by lock.
Source has just been downloaded and put in cache.
Definition: DTR.h:178
void reset()
Reset information held on this DTR, such as resolved replicas, error state etc.
void set_id(const std::string &id)
Set the ID of this DTR. Useful when passing DTR between processes.
Controls queues and moves DTRs between other components when necessary.
Definition: DTR.h:71
int get_priority() const
Get the priority.
Definition: DTR.h:501
Arc::Time getExpiryTime() const
Get the expiry time.
Definition: DTR.h:158
unsigned long long int get_transfer_time() const
Get transfer time.
Definition: DTR.h:553
void set_status(DTRStatus stat)
Set the status. Protected by lock.
unsigned long long int min_current_bandwidth
Minimum current bandwidth in bytes/sec.
Definition: DTR.h:110
unsigned int get_tries_left() const
Get the number of attempts remaining.
Definition: DTR.h:521
std::vector< std::string > readonly_cache_dirs
List of read-only caches.
Definition: DTR.h:130
A Period represents a length of time.
Definition: DateTime.h:48
void set_priority(int pri)
Set the priority.
Arc::User get_local_user() const
Get the local user information.
Definition: DTR.h:608
Class to represent general URLs.
Definition: URL.h:88
Performs all post-transfer operations.
Definition: DTR.h:74
Arc::DataHandle & get_destination()
Get destination handle. Return by reference since DataHandle cannot be copied.
Definition: DTR.h:469
void set_transfer_share(const std::string &share_name)
Set the transfer share. sub_share is automatically added to transfershare.
Arc::Time get_creation_time() const
Get the creation time.
Definition: DTR.h:490
void set_error_status(DTRErrorStatus::DTRErrorStatusType error_stat, DTRErrorStatus::DTRErrorLocation error_loc, const std::string &desc="")
Set the error status.
void set_cancel_request()
Set the DTR to be cancelled.
A class to represent error states reported by various components.
Definition: DTRStatus.h:208
bool get_bulk_end() const
Get bulk end flag.
Definition: DTR.h:626
Performs physical transfer.
Definition: DTR.h:73
std::string get_source_str() const
Get source as a string.
Definition: DTR.h:472
Process is ready to start.
Definition: DTR.h:80
Cache was started but was not used.
Definition: DTR.h:181
std::vector< std::string > drain_cache_dirs
List of draining caches.
Definition: DTR.h:128
void set_sub_share(const std::string &share)
Set sub-share.
Definition: DTR.h:514
std::string get_cache_file() const
Get cache filename.
Definition: DTR.h:582
std::string get_id() const
Get the ID of this DTR.
Definition: DTR.h:462
Definition: JobPerfLog.h:33
void set_credential_info(const DTRCredentialInfo &cred)
Set credential info.
Definition: DTR.h:504
Source should be cached.
Definition: DTR.h:175
std::string extractVOMSRole() const
Get the VOMS Role (first in the supplied list of fqans)
std::string get_transfer_share() const
Get the transfer share. sub_share is automatically added to transfershare.
Definition: DTR.h:511
bool is_force_registration() const
Get force registration flag.
Definition: DTR.h:617
Arc::ThreadedPointer< Arc::Logger > DTRLogger
The DTR's Logger object can be used outside the DTR object with DTRLogger.
Definition: DTR.h:64
void set_use_acix(bool acix)
Set whether ACIX is a source.
Definition: DTR.h:595
Source is cacheable but due to some problem should not be cached.
Definition: DTR.h:180
Definition: JobPerfLog.h:9
std::list< Arc::LogDestination * > get_log_destinations() const
Get log destinations assigned to this instance.
bool operator!() const
Is DTR not valid?
Definition: DTR.h:441
DTRLogger createDTRLogger(Arc::Logger &parent, const std::string &subdomain)
Helper method to create smart pointer, only for swig bindings.
bool is_in_final_state() const
Returns true if this DTR is in a final state (finished, failed or cancelled)
Cache file is locked.
Definition: DTR.h:179
DTRErrorLocation
Describes where the error occurred.
Definition: DTRStatus.h:247
void host_cert_for_remote_delivery(bool host)
Set the flag for using host certificate for contacting remote delivery services.
Definition: DTR.h:575
Process is running.
Definition: DTR.h:81
Arc::ThreadedPointer< DTR > DTR_ptr
Provides automatic memory management of DTRs and thread-safe destruction.
Definition: DTR.h:56
std::string getDN() const
Get the DN.
Definition: DTR.h:156
~DTR()
Empty destructor.
Definition: DTR.h:434
unsigned long long int get_bytes_transferred() const
Get current number of bytes transferred.
Definition: DTR.h:548
Arc::Time get_process_time() const
Get the next processing time for the DTR.
Definition: DTR.h:487
Source should not be cached.
Definition: DTR.h:176
bool is_mandatory() const
Whether DTR success is mandatory.
Definition: DTR.h:631
Arc::Time get_modification_time() const
Get the modification time.
Definition: DTR.h:493
CacheState get_cache_state() const
Get the cache state.
Definition: DTR.h:592
Performs all pre-transfer operations.
Definition: DTR.h:72
StagingProcesses get_owner() const
Find the DTR owner.
Definition: DTR.h:605
const DTRCredentialInfo & get_credential_info() const
Get credential info.
Definition: DTR.h:506
Platform independent representation of system user.
Definition: User.h:16
const DTRCacheParameters & get_cache_parameters() const
Get cache parameters.
Definition: DTR.h:587
bool get_bulk_start() const
Get bulk start flag.
Definition: DTR.h:622
ProcessState
Internal state of StagingProcesses.
Definition: DTR.h:79
std::string get_short_id() const
Get an abbreviated version of the DTR ID - useful to reduce logging verbosity.
CacheState
Represents possible cache states of this DTR.
Definition: DTR.h:174
const Arc::URL & get_delivery_endpoint() const
Returns delivery endpoint.
Definition: DTR.h:563
std::string get_destination_str() const
Get destination as a string.
Definition: DTR.h:474
unsigned int averaging_time
The time in seconds over which to average the calculation of min_current_bandwidth.
Definition: DTR.h:112
DTRCredentialInfo()
Default constructor.
Definition: DTR.h:150
The configured cache directories.
Definition: DTR.h:123