ARC SDK
EntityRetriever.h
1 #ifndef __ARC_ENTITYRETRIEVER_H__
2 #define __ARC_ENTITYRETRIEVER_H__
3 
4 #include <list>
5 #include <map>
6 #include <string>
7 
8 #include <arc/ArcConfig.h>
9 #include <arc/compute/Endpoint.h>
10 #include <arc/UserConfig.h>
11 #include <arc/Utils.h>
12 #include <arc/compute/EndpointQueryingStatus.h>
13 #include <arc/compute/ExecutionTarget.h>
14 #include <arc/compute/EntityRetrieverPlugin.h>
15 
16 namespace Arc {
17 
18 class Logger;
19 class SharedMutex;
20 class SimpleCondition;
21 class SimpleCounter;
22 class ThreadedPointer<class Endpoint>;
23 class UserConfig;
24 
26 
/// Interface for anything that accepts entities of type T.
/// Concrete consumers implement addEntity(); the destructor is virtual so a
/// consumer can be destroyed through a pointer to this base.
/// NOTE(review): the `class EntityConsumer {` line that should follow the
/// template header is absent from this listing (the embedded numbering jumps
/// from 33 to 35) - presumably dropped during extraction; confirm against the
/// original header.
33 template<typename T>
35 public:
36  virtual ~EntityConsumer() {}
38 
/// Receives one entity. Pure virtual: every concrete consumer must provide it.
41  virtual void addEntity(const T&) = 0;
42 };
43 
45 
/// An EntityConsumer that is also a std::list<T>: every entity handed to
/// addEntity() is appended to the list, so the collected entities can be read
/// back with the ordinary list interface.
/// NOTE(review): std::list has no virtual destructor, so an EntityContainer
/// must not be deleted through a std::list<T>*.
57 template<typename T>
58 class EntityContainer : public EntityConsumer<T>, public std::list<T> {
59 public:
60  virtual ~EntityContainer() {}
62 
/// Appends a copy of t to the end of the underlying list.
65  virtual void addEntity(const T& t) { this->push_back(t); }
66 };
67 
69 
165 template<typename T>
166 class EntityRetriever : public EntityConsumer<T> {
167 public:
169 
/// Calls deactivate() on `common` when the retriever is destroyed.
/// NOTE(review): `common` is not declared in the visible part of this class;
/// presumably it is a ThreadedPointer<Common> member whose declaration was
/// dropped from this listing - confirm against the original header.
175  ~EntityRetriever() { common->deactivate(); }
176 
/// Waits for the queries to finish by calling Result::wait() with its
/// default timeout argument (-1). The value returned by Result::wait() is
/// discarded.
178  void wait() const { result.wait(); };
179 
180  //void waitForAll() const; // TODO: Make it possible to be nice and wait for all threads to finish.
181 
/// Returns the outcome of Result::wait() called with a timeout argument of 0.
/// NOTE(review): presumably a non-blocking "have the queries finished?" check;
/// the exact meaning of a 0 timeout is defined by the counter's wait(), which
/// is not visible here.
185  bool isDone() const { return result.wait(0); };
186 
/// Registers a consumer: its address is appended to `consumers` while
/// consumerLock is held. Only the pointer is stored, so the caller keeps
/// ownership of the consumer object.
190  void addConsumer(EntityConsumer<T>& consumer) { consumerLock.lock(); consumers.push_back(&consumer); consumerLock.unlock(); };
191 
/// Declaration only; the definition is not part of this listing.
/// Presumably unregisters a consumer previously passed to addConsumer() -
/// confirm against the implementation.
195  void removeConsumer(const EntityConsumer<T>& consumer);
196 
198 
/// Returns, by value, an EndpointQueryingStatus for the given endpoint.
/// Declaration only; presumably looks the endpoint up in `statuses` - the
/// behaviour for an unknown endpoint is defined by the implementation, which
/// is not visible here.
202  EndpointQueryingStatus getStatusOfEndpoint(const Endpoint& endpoint) const;
203 
205 
/// Returns a copy of the whole `statuses` map. The copy is taken while
/// statusLock is held and the lock is released before returning.
211  EndpointStatusMap getAllStatuses() const { statusLock.lock(); EndpointStatusMap s = statuses; statusLock.unlock(); return s; }
212 
214 
/// Records `status` for `endpoint`; `overwrite` defaults to true.
/// Declaration only. NOTE(review): presumably `overwrite` decides whether an
/// existing entry may be replaced and the return value reports whether the
/// status was stored - confirm against the implementation.
224  bool setStatusOfEndpoint(const Endpoint& endpoint, const EndpointQueryingStatus& status, bool overwrite = true);
225 
227 
/// Reports services through the `result` output set, selected by `status`.
/// Declaration only; presumably the set receives an identifier for each
/// endpoint whose status matches - confirm against the implementation.
/// NOTE(review): std::set is used here but <set> is not among the standard
/// headers this file includes directly; it presumably arrives transitively
/// through one of the arc headers.
231  void getServicesWithStatus(const EndpointQueryingStatus& status, std::set<std::string>& result);
232 
234 
/// Removes every entry from `statuses` while holding statusLock.
239  void clearEndpointStatuses() { statusLock.lock(); statuses.clear(); statusLock.unlock(); return; }
240 
242 
/// Erases the status entry stored for endpoint `e`, if there is one.
/// Returns true when an entry was found and erased, false when `e` was not
/// present. statusLock is held for the lookup and the erase and is released
/// on both return paths.
249  bool removeEndpoint(const Endpoint& e) { statusLock.lock(); EndpointStatusMap::iterator it = statuses.find(e); if (it != statuses.end()) { statuses.erase(it); statusLock.unlock(); return true; }; statusLock.unlock(); return false; }
250 
/// Implementation of the EntityConsumer<T> interface for the retriever itself.
/// Declaration only; presumably passes the entity on to the registered
/// consumers - confirm against the implementation.
265  virtual void addEntity(const T& entity);
266 
268 
/// Hands an endpoint to the retriever. Declaration only; presumably this
/// starts querying the endpoint - confirm against the implementation.
273  virtual void addEndpoint(const Endpoint& endpoint);
274 
276 
/// Sets the need_all_results flag; calling it without an argument sets the
/// flag to true. How the flag influences querying is decided by code that is
/// not visible in this listing.
282  void needAllResults(bool all_results = true) { need_all_results = all_results; }
283 
284 protected:
// Static function taking an untyped argument pointer. Declaration only.
// NOTE(review): presumably the thread entry point, with arg_ pointing to a
// ThreadArg - confirm against the implementation.
285  static void queryEndpoint(void *arg_);
286 
// Declaration only; the behaviour is defined by the implementation, which is
// not part of this listing.
287  void checkSuspendedAndStart(const Endpoint& e);
288 
289  // Common configuration part
// NOTE(review): the class head and the first line of the constructor are
// absent from this listing (the embedded numbering skips 290 and 292). The
// members below belong to a nested class that is presumably named Common
// (ThreadArg takes a ThreadedPointer<Common>) - confirm against the original
// header.
291  public:
// Tail of the constructor's initializer list: default-constructs the mutex,
// stores the back-pointer t and copy-initialises the UserConfig member from u.
293  mutex(), t(t), uc(u) {};
// Sets the back-pointer to NULL under the exclusive lock; afterwards the
// lock*IfValid() functions below return false.
294  void deactivate(void) {
295  mutex.lockExclusive();
296  t = NULL;
297  mutex.unlockExclusive();
298  }
// Takes the exclusive lock. Returns true, with the lock still held, if the
// back-pointer is non-null; otherwise releases the lock and returns false.
299  bool lockExclusiveIfValid(void) {
300  mutex.lockExclusive();
301  if(t) return true;
302  mutex.unlockExclusive();
303  return false;
304  }
// Releases the exclusive lock taken by a successful lockExclusiveIfValid().
305  void unlockExclusive(void) { mutex.unlockExclusive(); }
// Shared-lock counterpart of lockExclusiveIfValid(): returns true with the
// shared lock held if the back-pointer is non-null, otherwise unlocks and
// returns false.
306  bool lockSharedIfValid(void) {
307  mutex.lockShared();
308  if(t) return true;
309  mutex.unlockShared();
310  return false;
311  }
// Releases the shared lock taken by a successful lockSharedIfValid().
312  void unlockShared(void) { mutex.unlockShared(); }
313 
// Implicit conversion to the UserConfig copy held by this object.
314  operator const UserConfig&(void) const { return uc; }
// Accessors for the list of plugin names; neither takes the mutex.
315  const std::list<std::string>& getAvailablePlugins(void) const { return availablePlugins; }
316  void setAvailablePlugins(const std::list<std::string>& newAvailablePlugins) { availablePlugins = newAvailablePlugins; }
// Both operators return the raw back-pointer (NULL after deactivate()).
// Note that operator* yields a pointer here, not a reference.
317  EntityRetriever* operator->(void) { return t; }
318  EntityRetriever* operator*(void) { return t; }
319  private:
320  SharedMutex mutex;
321  EntityRetriever* t;
322  const UserConfig uc;
323  std::list<std::string> availablePlugins;
324  };
326 
327  // Represents completeness of queries run in threads.
328  // Different implementations are meant for waiting for either one or all threads.
329  // TODO: counter is duplicated in this implementation. It may be simplified
330  // either by using counter of ThreadedPointer or implementing part of
331  // ThreadedPointer directly.
// Handle on a SimpleCounter held through a privately inherited
// ThreadedPointer. Copying a Result increments the counter and destroying one
// decrements it (or resets it to 0, see the destructor).
// NOTE(review): the base-class initialisers of both constructors are absent
// from this listing (the embedded numbering skips 336 and 340) - confirm
// against the original header.
332  class Result: private ThreadedPointer<SimpleCounter> {
333  public:
334  // Creates initial instance
335  Result(bool one_success = false):
337  success(false),need_one_success(one_success) { };
338  // Creates new reference representing query - increments counter
// The copy always starts with success == false; only need_one_success is
// taken over from r.
339  Result(const Result& r):
341  success(false),need_one_success(r.need_one_success) {
342  Ptr()->inc();
343  };
344  // Query finished - decrement or reset counter (if one result is enough)
// The counter is set to 0 only when one success is enough AND this instance
// was marked successful; in every other case it is decremented by one.
345  ~Result(void) {
346  if(need_one_success && success) {
347  Ptr()->set(0);
348  } else {
349  Ptr()->dec();
350  };
351  };
352  // Mark this result as successful (failure by default)
353  void setSuccess(void) { success = true; };
354  // Wait for queries to finish
// Forwards the timeout t (default -1) to the counter's wait() and returns
// its result.
355  bool wait(int t = -1) const { return Ptr()->wait(t); };
356  private:
357  bool success;
358  bool need_one_success;
359  };
// The retriever's own Result instance, used by wait() and isDone().
360  Result result;
361 
// Bundle of per-query parameters (endpoint, plugin name, query options)
// together with the shared Common object and a Result handle.
// NOTE(review): presumably this is what queryEndpoint() receives through its
// void* argument - confirm against the implementation.
362  class ThreadArg {
363  public:
// Builds the argument set for querying `endpoint`. pluginName is left empty.
// Copying `result` into the member increments the shared counter (see the
// Result copy constructor).
364  ThreadArg(const ThreadedPointer<Common>& common, Result& result, const Endpoint& endpoint, const EndpointQueryOptions<T>& options) : common(common), result(result), endpoint(endpoint), options(options) {};
// Copies everything from v except the Result handle, which is taken from the
// `result` argument instead.
// The options member is initialised from v.options. This constructor has no
// parameter named `options`, so the previous `options(options)` initialised
// the member from its own not-yet-constructed self.
365  ThreadArg(const ThreadArg& v, Result& result) : common(v.common), result(result), endpoint(v.endpoint), pluginName(v.pluginName), options(v.options) {};
366  // Objects for communication with caller
// NOTE(review): both constructors initialise a member named `common`, but its
// declaration is absent from this listing (the embedded numbering skips 367);
// presumably `ThreadedPointer<Common> common;` - confirm against the original
// header.
368  Result result;
369  // Per-thread parameters
370  Endpoint endpoint;
371  std::string pluginName;
372  EndpointQueryOptions<T> options;
373  };
374 
// Status recorded per endpoint. getAllStatuses(), clearEndpointStatuses() and
// removeEndpoint() access it while holding statusLock.
375  EndpointStatusMap statuses;
376 
377  static Logger logger;
// Held by reference, not copied: the referenced UserConfig has to stay alive
// for as long as this member is used.
378  const UserConfig& uc;
// Registered consumers, stored as plain pointers; addConsumer() appends to
// the list while holding consumerLock.
379  std::list< EntityConsumer<T>* > consumers;
380  const EndpointQueryOptions<T> options;
381 
// Both locks are mutable so that const member functions such as
// getAllStatuses() can lock and unlock them.
382  mutable SimpleCondition consumerLock;
383  mutable SimpleCondition statusLock;
// NOTE(review): judging by the name, maps an interface name to the plugin
// that handles it - confirm against the implementation.
384  std::map<std::string, std::string> interfacePluginMap;
// Flag written by needAllResults().
385  bool need_all_results;
386 };
387 
389 
396 
398 
405 
407 
414 
415 } // namespace Arc
416 
417 #endif // __ARC_ENTITYRETRIEVER_H__
418