ARC SDK
EntityRetriever.h
1 #ifndef __ARC_ENTITYRETRIEVER_H__
2 #define __ARC_ENTITYRETRIEVER_H__
3 
4 #include <list>
5 #include <map>
6 #include <string>
7 
8 #include <arc/ArcConfig.h>
9 #include <arc/compute/Endpoint.h>
10 #include <arc/UserConfig.h>
11 #include <arc/Utils.h>
12 #include <arc/compute/EndpointQueryingStatus.h>
13 #include <arc/compute/ExecutionTarget.h>
14 #include <arc/compute/EntityRetrieverPlugin.h>
15 
16 namespace Arc {
17 
18 class Logger;
19 class SharedMutex;
20 class SimpleCondition;
21 class SimpleCounter;
22 template<> class ThreadedPointer<class Endpoint>;
23 class UserConfig;
24 
26 
33 template<typename T>
35 public:
36  EntityConsumer() {}
37  virtual ~EntityConsumer() {}
39 
42  virtual void addEntity(const T&) = 0;
43 };
44 
46 
58 template<typename T>
59 class EntityContainer : public std::list<T>, public EntityConsumer<T> { // The order of inheritance is important for Swig.
60 public:
61  EntityContainer() {}
62  virtual ~EntityContainer() {}
64 
67  virtual void addEntity(const T& t) { this->push_back(t); }
68 };
69 
71 
167 template<typename T>
168 class EntityRetriever : public EntityConsumer<T> {
169 public:
171 
177  ~EntityRetriever() { common->deactivate(); }
178 
180  void wait() const { result.wait(); };
181 
182  //void waitForAll() const; // TODO: Make it possible to be nice and wait for all threads to finish.
183 
187  bool isDone() const { return result.wait(0); };
188 
193  void addConsumer(EntityConsumer<T>& addConsumer_consumer /* The name 'addConsumer_consumer' is important for Swig when matching methods */) { consumerLock.lock(); consumers.push_back(&addConsumer_consumer); consumerLock.unlock(); };
194 
198  void removeConsumer(const EntityConsumer<T>& removeConsumer_consumer /* The name 'removeConsumer_consumer' is important for Swig when matching methods */);
199 
201 
205  EndpointQueryingStatus getStatusOfEndpoint(const Endpoint& endpoint) const;
206 
208 
214  EndpointStatusMap getAllStatuses() const { statusLock.lock(); EndpointStatusMap s = statuses; statusLock.unlock(); return s; }
215 
217 
227  bool setStatusOfEndpoint(const Endpoint& endpoint, const EndpointQueryingStatus& status, bool overwrite = true);
228 
230 
234  void getServicesWithStatus(const EndpointQueryingStatus& status, std::set<std::string>& result);
235 
237 
242  void clearEndpointStatuses() { statusLock.lock(); statuses.clear(); statusLock.unlock(); return; }
243 
245 
252  bool removeEndpoint(const Endpoint& e) { statusLock.lock(); EndpointStatusMap::iterator it = statuses.find(e); if (it != statuses.end()) { statuses.erase(it); statusLock.unlock(); return true; }; statusLock.unlock(); return false; }
253 
268  virtual void addEntity(const T& entity);
269 
271 
276  virtual void addEndpoint(const Endpoint& endpoint);
277 
279 
285  void needAllResults(bool all_results = true) { need_all_results = all_results; }
286 
287 protected:
288  static void queryEndpoint(void *arg_);
289 
290  void checkSuspendedAndStart(const Endpoint& e);
291 
292  // Common configuration part
294  public:
296  mutex(), t(t), uc(u) {};
297  void deactivate(void) {
298  mutex.lockExclusive();
299  t = NULL;
300  mutex.unlockExclusive();
301  }
302  bool lockExclusiveIfValid(void) {
303  mutex.lockExclusive();
304  if(t) return true;
305  mutex.unlockExclusive();
306  return false;
307  }
308  void unlockExclusive(void) { mutex.unlockExclusive(); }
309  bool lockSharedIfValid(void) {
310  mutex.lockShared();
311  if(t) return true;
312  mutex.unlockShared();
313  return false;
314  }
315  void unlockShared(void) { mutex.unlockShared(); }
316 
317  operator const UserConfig&(void) const { return uc; }
318  const std::list<std::string>& getAvailablePlugins(void) const { return availablePlugins; }
319  void setAvailablePlugins(const std::list<std::string>& newAvailablePlugins) { availablePlugins = newAvailablePlugins; }
320  EntityRetriever* operator->(void) { return t; }
321  EntityRetriever* operator*(void) { return t; }
322  private:
323  SharedMutex mutex;
324  EntityRetriever* t;
325  const UserConfig uc;
326  std::list<std::string> availablePlugins;
327  };
329 
330  // Represents completeness of queriies run in threads.
331  // Different implementations are meant for waiting for either one or all threads.
332  // TODO: counter is duplicate in this implimentation. It may be simplified
333  // either by using counter of ThreadedPointer or implementing part of
334  // ThreadedPointer directly.
335  class Result: private ThreadedPointer<SimpleCounter> {
336  public:
337  // Creates initial instance
338  Result(bool one_success = false):
340  success(false),need_one_success(one_success) { };
341  // Creates new reference representing query - increments counter
342  Result(const Result& r):
344  success(false),need_one_success(r.need_one_success) {
345  Ptr()->inc();
346  };
347  // Query finished - decrement or reset counter (if one result is enough)
348  ~Result(void) {
349  if(need_one_success && success) {
350  Ptr()->set(0);
351  } else {
352  Ptr()->dec();
353  };
354  };
355  // Mark this result as successful (failure by default)
356  void setSuccess(void) { success = true; };
357  // Wait for queries to finish
358  bool wait(int t = -1) const { return Ptr()->wait(t); };
359  private:
360  bool success;
361  bool need_one_success;
362  };
363  Result result;
364 
365  class ThreadArg {
366  public:
367  ThreadArg(const ThreadedPointer<Common>& common, Result& result, const Endpoint& endpoint, const EndpointQueryOptions<T>& options) : common(common), result(result), endpoint(endpoint), options(options) {};
368  ThreadArg(const ThreadArg& v, Result& result) : common(v.common), result(result), endpoint(v.endpoint), pluginName(v.pluginName), options(options) {};
369  // Objects for communication with caller
371  Result result;
372  // Per-thread parameters
373  Endpoint endpoint;
374  std::string pluginName;
375  EndpointQueryOptions<T> options;
376  };
377 
378  EndpointStatusMap statuses;
379 
380  static Logger logger;
381  const UserConfig& uc;
382  std::list< EntityConsumer<T>* > consumers;
383  const EndpointQueryOptions<T> options;
384 
385  mutable SimpleCondition consumerLock;
386  mutable SimpleCondition statusLock;
387  std::map<std::string, std::string> interfacePluginMap;
388  bool need_all_results;
389 };
390 
392 
399 
401 
408 
410 
417 
418 } // namespace Arc
419 
420 #endif // __ARC_ENTITYRETRIEVER_H__
421