ARC SDK
DTR Generator

C++

Generator.cpp

#include <arc/credential/Credential.h>
#include <arc/GUID.h>
#include "Generator.h"
Arc::Logger Generator::logger(Arc::Logger::getRootLogger(), "Generator");
Arc::SimpleCondition Generator::cond;
Generator::Generator() {
// Set up logging
}
Generator::~Generator() {
logger.msg(Arc::INFO, "Shutting down scheduler");
scheduler.stop();
logger.msg(Arc::INFO, "Scheduler stopped, exiting");
}
void Generator::receiveDTR(DataStaging::DTR_ptr dtr) {
// root logger is disabled in Scheduler thread so need to add it here
logger.msg(Arc::INFO, "Received DTR %s back from scheduler in state %s", dtr->get_id(), dtr->get_status().str());
counter.dec();
}
void Generator::start() {
// Starting scheduler with default configuration
logger.msg(Arc::INFO, "Generator started");
logger.msg(Arc::INFO, "Starting DTR threads");
scheduler.SetDumpLocation("/tmp/dtr.log");
scheduler.start();
}
void Generator::run(const std::string& source, const std::string& destination) {
std::string job_id = Arc::UUID();
Arc::UserConfig cfg(cred_type);
// check credentials
logger.msg(Arc::ERROR, "No valid credentials found, exiting");
return;
}
cfg.UtilsDirPath(Arc::UserConfig::ARCUSERDIRECTORY());
std::list<DataStaging::DTRLogDestination> logs;
logs.push_back(new Arc::LogStream(std::cout));
DataStaging::DTR_ptr dtr(new DataStaging::DTR(source, destination, cfg, job_id, Arc::User().get_uid(), logs, "DataStaging"));
if (!(*dtr)) {
logger.msg(Arc::ERROR, "Problem creating dtr (source %s, destination %s)", source, destination);
return;
}
// register callback with DTR
dtr->registerCallback(this,DataStaging::GENERATOR);
dtr->registerCallback(&scheduler,DataStaging::SCHEDULER);
dtr->set_tries_left(5);
counter.inc();
}

Generator.h

#ifndef GENERATOR_H_
#define GENERATOR_H_
#include <arc/Thread.h>
#include <arc/Logger.h>
#include <arc/data-staging/Scheduler.h>
// This Generator basic implementation shows how a Generator can
// be written. It has one method, run(), which creates a single DTR
// and submits it to the Scheduler.
class Generator: public DataStaging::DTRCallback {
private:
// Condition to wait on until DTR has finished
static Arc::SimpleCondition cond;
// DTR Scheduler
// Logger object
static Arc::Logger logger;
// Root LogDestinations to be used in receiveDTR
std::list<Arc::LogDestination*> root_destinations;
public:
// Counter for main to know how many DTRs are in the system
// Create a new Generator. start() must be called to start DTR threads.
Generator();
// Stop Generator and DTR threads
~Generator();
// Implementation of callback from DTRCallback - the callback method used
// when DTR processing is complete to pass the DTR back to the generator.
// It decrements counter.
virtual void receiveDTR(DataStaging::DTR_ptr dtr);
// Start Generator and DTR threads
void start();
// Submit a DTR with given source and destination. Increments counter.
void run(const std::string& source, const std::string& destination);
};
#endif /* GENERATOR_H_ */

generator-main.cpp

/*
// To compile this example requires that nordugrid-arc-devel be installed. It
// also requires including headers of external libraries used by ARC core code:
//
// g++ -o generator `pkg-config --cflags glibmm-2.4` -I/usr/include/libxml2 \
// -larcdatastaging Generator.cpp Generator.h generator-main.cpp
//
// If ARC is installed in a non-standard location, the options
// -L ARC_LOCATION/lib and -I ARC_LOCATION/include should also be used
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <signal.h>
#include <arc/StringConv.h>
#include "Generator.h"
static Arc::SimpleCounter counter;
static bool run = true;
static void do_shutdown(int) {
run = false;
}
static void usage() {
std::cout << "Usage: generator [num mock transfers]" << std::endl;
std::cout << " generator source destination" << std::endl;
std::cout << "To use mock transfers ARC must be built with configure --enable-mock-dmc" << std::endl;
std::cout << "The default number of mock transfers is 10" << std::endl;
}
int main(int argc, char** argv) {
signal(SIGTTOU,SIG_IGN);
signal(SIGTTIN,SIG_IGN);
signal(SIGINT, do_shutdown);
// Log to stderr
Arc::LogStream logcerr(std::cerr);
Generator generator;
int num = 10;
if (argc == 1 || argc == 2) { // run mock a number of times
if (argc == 2 && (std::string(argv[1]) == "-h" || !Arc::stringto(argv[1], num))) {
usage();
return 1;
}
generator.start();
for (int i = 0; i < num; ++i) {
std::string source = "mock://mocksrc/mock." + Arc::tostring(i);
std::string destination = "mock://mockdest/mock." + Arc::tostring(i);
generator.run(source, destination);
}
}
else if (argc == 3) { // run with given source and destination
generator.start();
generator.run(argv[1], argv[2]);
}
else {
usage();
return 1;
}
while (generator.counter.get() > 0 && run) {
sleep(1);
}
return 0;
}

Python

#!/usr/bin/env python
# The nordugrid-arc-python package is required. As stated in the comments, the
# main missing piece in Python when compared to C++ is the ability to get
# callbacks to the Generator when the DTR has finished. To run:
#
# python dtr_generator.py /bin/ls /tmp/dtrtest
#
# If nordugrid-arc-python is installed to a non-standard location, PYTHONPATH
# may need to be set.
import os
import sys
import time
import arc
class DTRGenerator(arc.DTRCallback):
def __init__(self):
super(DTRGenerator, self).__init__()
# Set up logging
self.root_logger = arc.Logger_getRootLogger()
self.stream = arc.LogStream(sys.stdout)
self.root_logger.addDestination(self.stream)
self.root_logger.setThreshold(arc.DEBUG)
self.cfg = arc.UserConfig('', '')
self.id = '1'
arc.DTR.LOG_LEVEL = self.root_logger.getThreshold()
# Start the Scheduler
self.scheduler = arc.Scheduler()
self.scheduler.start()
def __del__(self):
# Stop Scheduler when Generator is finished
self.scheduler.stop()
def add(self, source, dest):
# Logger object, wrapped in smart pointer. The Logger object can only be accessed
# by explicitly deferencing the smart pointer.
dtrlog = arc.createDTRLogger(self.root_logger, "DTR")
dtrlog.__deref__().addDestination(self.stream)
dtrlog.__deref__().setThreshold(arc.DEBUG)
# Create DTR (also wrapped in smart pointer)
dtrptr = arc.createDTRPtr(source, dest, self.cfg, self.id, os.getuid(), dtrlog)
# The ability to register 'this' as a callback object is not available yet
#dtrptr.registerCallback(self, arc.GENERATOR)
# Register the scheduler callback so we can push the DTR to it
dtrptr.registerCallback(self.scheduler, arc.SCHEDULER)
# Send the DTR to the Scheduler
arc.DTR.push(dtrptr, arc.SCHEDULER)
# Since the callback is not available, wait until the transfer reaches a final state
while dtrptr.get_status() != arc.DTRStatus.ERROR and dtrptr.get_status() != arc.DTRStatus.DONE:
time.sleep(1)
sys.stdout.write("%s\n"%dtrptr.get_status().str())
# This is never called in the current version
def receiveDTR(self, dtr):
sys.stdout.write('Received back DTR %s\n'%str(dtr.get_id()))
def main(args):
if len(args) != 3:
sys.stdout.write("Usage: python dtr_generator.py source destination\n")
sys.exit(1)
generator = DTRGenerator()
generator.add(args[1], args[2])
if __name__ == '__main__':
main(sys.argv[0:])