ARC SDK
DTR Generator

C++

Generator.cpp

#include <arc/GUID.h>
#include <arc/credential/Credential.h>
#include "Generator.h"
// Definition of the class-wide logger: constructed from the root logger
// with the name "Generator".
Arc::Logger Generator::logger(Arc::Logger::getRootLogger(), "Generator");
// Definition of the class-wide condition declared in Generator.h.
// NOTE(review): nothing in the visible part of this file waits on or signals it.
Arc::SimpleCondition Generator::cond;
// Create a Generator. No threads are started here; start() does that.
Generator::Generator() {
  // Set up logging
  // Remember the destinations currently attached to the root logger so that
  // receiveDTR(), which is called from the Scheduler thread, can use them.
  root_destinations = Arc::Logger::getRootLogger().getDestinations();
  // DTRs log at the same level as the root logger
  DataStaging::DTR::LOG_LEVEL = Arc::Logger::getRootLogger().getThreshold();
}
// Tear down the Generator: the scheduler is stopped here, with a log
// message written before and after the stop call.
Generator::~Generator() {
  logger.msg(Arc::INFO, "Shutting down scheduler");
  this->scheduler.stop();
  logger.msg(Arc::INFO, "Scheduler stopped, exiting");
}
// Callback from DTRCallback, invoked when a DTR is passed back to the
// generator. Logs the DTR id and state, releases the DTR's log destinations
// and decrements the counter of DTRs in the system.
void Generator::receiveDTR(DataStaging::DTR_ptr dtr) {
  // root logger is disabled in Scheduler thread so need to add it here
  Arc::Logger::getRootLogger().addDestinations(root_destinations);
  logger.msg(Arc::INFO, "Received DTR %s back from scheduler in state %s", dtr->get_id(), dtr->get_status().str());
  // Detach the destinations again now that the message has been written
  Arc::Logger::getRootLogger().removeDestinations();
  // DTR logger destinations can be destroyed when DTR has finished
  dtr->get_logger()->deleteDestinations();
  counter.dec();
}
void Generator::start() {
// Starting scheduler with default configuration
logger.msg(Arc::INFO, "Generator started");
logger.msg(Arc::INFO, "Starting DTR threads");
scheduler.SetDumpLocation("/tmp/dtr.log");
scheduler.start();
}
void Generator::run(const std::string& source, const std::string& destination) {
std::string job_id = Arc::UUID();
Arc::UserConfig cfg(cred_type);
// check credentials
logger.msg(Arc::ERROR, "No valid credentials found, exiting");
return;
}
Arc::LogDestination * dest = new Arc::LogStream(std::cout);
log->addDestination(*dest);
DataStaging::DTR_ptr dtr(new DataStaging::DTR(source, destination, cfg, job_id, Arc::User().get_uid(), log));
if (!(*dtr)) {
logger.msg(Arc::ERROR, "Problem creating dtr (source %s, destination %s)", source, destination);
return;
}
// register callback with DTR
dtr->registerCallback(this,DataStaging::GENERATOR);
dtr->registerCallback(&scheduler,DataStaging::SCHEDULER);
dtr->set_tries_left(5);
counter.inc();
}

Generator.h

#ifndef GENERATOR_H_
#define GENERATOR_H_
#include <arc/Thread.h>
#include <arc/Logger.h>
#include <arc/data-staging/Scheduler.h>
// This basic Generator implementation shows how a Generator can be
// written. Its run() method creates a single DTR and submits it to
// the Scheduler.
class Generator: public DataStaging::DTRCallback {
private:
// Condition to wait on until DTR has finished
static Arc::SimpleCondition cond;
// DTR Scheduler
// Logger object
static Arc::Logger logger;
// Root LogDestinations to be used in receiveDTR
std::list<Arc::LogDestination*> root_destinations;
public:
// Counter for main to know how many DTRs are in the system
// Create a new Generator. start() must be called to start DTR threads.
Generator();
// Stop Generator and DTR threads
~Generator();
// Implementation of callback from DTRCallback - the callback method used
// when DTR processing is complete to pass the DTR back to the generator.
// It decrements counter.
virtual void receiveDTR(DataStaging::DTR_ptr dtr);
// Start Generator and DTR threads
void start();
// Submit a DTR with given source and destination. Increments counter.
void run(const std::string& source, const std::string& destination);
};
#endif /* GENERATOR_H_ */

generator-main.cpp

/*
// To compile this example requires that nordugrid-arc-devel be installed. It
// also requires including headers of external libraries used by ARC core code:
//
// g++ -o generator `pkg-config --cflags glibmm-2.4` -I/usr/include/libxml2 \
// -larcdatastaging Generator.cpp Generator.h generator-main.cpp
//
// If ARC is installed in a non-standard location, the options
// -L ARC_LOCATION/lib and -I ARC_LOCATION/include should also be used
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <signal.h>
#include <arc/StringConv.h>
#include "Generator.h"
// File-local counter object.
// NOTE(review): nothing in the visible code reads or updates it; main()
// polls generator.counter instead - confirm whether it is still needed.
static Arc::SimpleCounter counter;
// Keep-running flag: non-zero until a shutdown has been requested.
// It is written from a signal handler, so it is a volatile sig_atomic_t,
// the object type that may safely be assigned from a handler.
static volatile sig_atomic_t run = 1;
// Signal handler: request shutdown by clearing the keep-running flag.
// The signal number is not used.
static void do_shutdown(int) {
  run = 0;
}
// Print the command-line help text to standard output, one line at a time.
static void usage() {
  // Help text, one entry per output line
  static const char* const help_lines[] = {
    "Usage: generator [num mock transfers]",
    " generator source destination",
    "To use mock transfers ARC must be built with configure --enable-mock-dmc",
    "The default number of mock transfers is 10"
  };
  const unsigned int line_count = sizeof(help_lines) / sizeof(help_lines[0]);
  for (unsigned int idx = 0; idx < line_count; ++idx) {
    std::cout << help_lines[idx] << std::endl;
  }
}
int main(int argc, char** argv) {
signal(SIGTTOU,SIG_IGN);
signal(SIGTTIN,SIG_IGN);
signal(SIGINT, do_shutdown);
// Log to stderr
Arc::LogStream logcerr(std::cerr);
Generator generator;
int num = 10;
if (argc == 1 || argc == 2) { // run mock a number of times
if (argc == 2 && (std::string(argv[1]) == "-h" || !Arc::stringto(argv[1], num))) {
usage();
return 1;
}
generator.start();
for (int i = 0; i < num; ++i) {
std::string source = "mock://mocksrc/mock." + Arc::tostring(i);
std::string destination = "mock://mockdest/mock." + Arc::tostring(i);
generator.run(source, destination);
}
}
else if (argc == 3) { // run with given source and destination
generator.start();
generator.run(argv[1], argv[2]);
}
else {
usage();
return 1;
}
while (generator.counter.get() > 0 && run) {
sleep(1);
}
return 0;
}

Python

#!/usr/bin/python

# The nordugrid-arc-python package is required. As stated in the comments, the
# main missing piece in Python when compared to C++ is the ability to get
# callbacks to the Generator when the DTR has finished. To run:
#
# python dtr_generator.py /bin/ls /tmp/dtrtest
#
# If nordugrid-arc-python is installed to a non-standard location, PYTHONPATH
# may need to be set.

import os
import sys
import time
import arc

class DTRGenerator(arc.DTRCallback):
    """Example DTR Generator: starts a Scheduler and submits DTRs to it."""

    def __init__(self):
        # Set up logging
        self.root_logger = arc.Logger_getRootLogger()
        self.stream = arc.LogStream(sys.stdout)
        self.root_logger.addDestination(self.stream)
        self.root_logger.setThreshold(arc.DEBUG)
        self.cfg = arc.UserConfig('', '')
        self.id = '1'
        arc.DTR.LOG_LEVEL = self.root_logger.getThreshold()

        # Start the Scheduler
        self.scheduler = arc.Scheduler()
        self.scheduler.start()

    def __del__(self):
        # Stop Scheduler when Generator is finished
        self.scheduler.stop()

    def add(self, source, dest):
        """Create a DTR from source to dest, push it to the Scheduler and
        poll once per second until its status is ERROR or DONE."""
        # Logger object, wrapped in smart pointer. The Logger object can only be accessed
        # by explicitly dereferencing the smart pointer.
        dtrlog = arc.createDTRLogger(self.root_logger, "DTR")
        dtrlog.__deref__().addDestination(self.stream)
        dtrlog.__deref__().setThreshold(arc.DEBUG)

        # Create DTR (also wrapped in smart pointer)
        dtrptr = arc.createDTRPtr(source, dest, self.cfg, self.id, os.getuid(), dtrlog)

        # The ability to register 'this' as a callback object is not available yet
        #dtrptr.registerCallback(self, arc.GENERATOR)
        # Register the scheduler callback so we can push the DTR to it
        dtrptr.registerCallback(self.scheduler, arc.SCHEDULER)
        # Send the DTR to the Scheduler
        arc.DTR.push(dtrptr, arc.SCHEDULER)
        # Since the callback is not available, wait until the transfer reaches a final state
        while dtrptr.get_status() != arc.DTRStatus.ERROR and dtrptr.get_status() != arc.DTRStatus.DONE:
            time.sleep(1)
            # Single-argument print() behaves the same under Python 2 and Python 3
            print(dtrptr.get_status().str())

    # This is never called in the current version
    def receiveDTR(self, dtr):
        print('Received back DTR %s' % dtr.get_id())

def main(args):
    """Run one transfer; args is the full argument vector including the script name."""
    if len(args) != 3:
        print("Usage: python dtr_generator.py source destination")
        sys.exit(1)
    generator = DTRGenerator()
    generator.add(args[1], args[2])

if __name__ == '__main__':
    main(sys.argv[0:])

Java

//
// The nordugrid-arc-java package is required. To compile and run this example:
//
// export CLASSPATH=/usr/lib64/java/arc.jar:.
// export LD_LIBRARY_PATH=/usr/lib64/java
// javac DTRGenerator.java
// java DTRGenerator /bin/ls /tmp/dtrtest
//
// The PATHs above may vary depending on ARC install location and system
// architecture.
import nordugrid.arc.*; // For the sake of brevity in this example import everything from arc
// Implementation of DTR Generator.
// Cannot inherit from DTRCallback as it is a pure virtual class and swig does not
// create a default constructor, so extend Scheduler which inherits from DTRCallback.
class DTRGenerator extends Scheduler {

    private Logger logger;
    private LogDestination logdest;
    private SimpleCondition cond;

    /** Builds the generator and directs root-logger output to stdout at DEBUG level. */
    public DTRGenerator() {
        final Logger rootLogger = Logger.getRootLogger();
        this.logger = new Logger(rootLogger, "Generator");
        this.logdest = new LogStream_ostream(arc.getStdout());
        rootLogger.addDestination(this.logdest);
        rootLogger.setThreshold(LogLevel.DEBUG);
        this.cond = new SimpleCondition();
    }

    /** DTRCallback hook: logs the returned DTR and signals the condition run() waits on. */
    public void receiveDTR(DTRPointer dtr) {
        // The root logger is disabled in the Scheduler thread, so attach the
        // destination for the duration of this message only
        final Logger rootLogger = Logger.getRootLogger();
        rootLogger.addDestination(this.logdest);
        final String report = "Received DTR " + dtr.get_id() + " in state " + dtr.get_status().str();
        this.logger.msg(LogLevel.INFO, report);
        rootLogger.removeDestinations();
        this.cond.signal();
    }

    /** Submits one transfer from source to dest and blocks until the callback has fired. */
    private void run(final String source, final String dest) {
        // The DTR log level has to be set before the Scheduler is started
        DTR.setLOG_LEVEL(LogLevel.DEBUG);

        // Scheduler thread that will process the transfer
        Scheduler dtrScheduler = new Scheduler();
        dtrScheduler.start();

        // UserConfig carries information such as the location of credentials
        UserConfig userConfig = new UserConfig();
        // DTRs sharing an ID can be grouped together
        String groupId = "1234";

        // Logger handed to the DTR
        DTRLogger dtrLogger = arc.createDTRLogger(Logger.getRootLogger(), "DTR");
        dtrLogger.addDestination(this.logdest);

        // The transfer runs under the current user's uid
        User currentUser = new User();

        DTRPointer transfer = arc.createDTRPtr(source, dest, userConfig, groupId, currentUser.get_uid(), dtrLogger);
        this.logger.msg(LogLevel.INFO, "Created DTR "+ transfer.get_id());

        // Completed DTRs come back to this object ...
        transfer.registerCallback(this, StagingProcesses.GENERATOR);
        // ... and the Scheduler must be registered too, or the DTR cannot be passed to it
        transfer.registerCallback(dtrScheduler, StagingProcesses.SCHEDULER);

        // Hand the DTR to the Scheduler
        DTR.push(transfer, StagingProcesses.SCHEDULER);

        // Block until receiveDTR() signals.
        // SimpleCondition.wait() is exposed as _wait() because wait() is a java.lang.Object method
        this.cond._wait();

        // The DTR has finished, so the Scheduler can be stopped
        dtrScheduler.stop();
    }

    public static void main(String[] args) {
        if (args.length == 2) {
            DTRGenerator generator = new DTRGenerator();
            generator.run(args[0], args[1]);
        } else {
            System.out.println("Usage: java DTRGenerator source destination");
        }
    }
}