ARC SDK
EntityRetriever.h
1 #ifndef __ARC_ENTITYRETRIEVER_H__
2 #define __ARC_ENTITYRETRIEVER_H__
3 
4 #include <list>
5 #include <map>
6 #include <string>
7 
8 #include <arc/ArcConfig.h>
9 #include <arc/compute/Endpoint.h>
10 #include <arc/UserConfig.h>
11 #include <arc/Utils.h>
12 #include <arc/compute/EndpointQueryingStatus.h>
13 #include <arc/compute/ExecutionTarget.h>
14 #include <arc/compute/EntityRetrieverPlugin.h>
15 
16 namespace Arc {
17 
// Forward declarations of Arc types referred to in this header.
class Logger;
class SharedMutex;
class SimpleCondition;
class SimpleCounter;
// Forward-declare the primary template. Declaring only an explicit
// specialization (template<> ... <Endpoint>) is ill-formed without a prior
// primary declaration and would otherwise leave ThreadedPointer<Endpoint>
// an incomplete type, while this header needs ThreadedPointer<Common> and
// ThreadedPointer<SimpleCounter>.
template<typename T> class ThreadedPointer;
class UserConfig;
24 
26 
/// A general concept of an object which can consume entities; it is used by
/// the retrievers to return results.
///
/// Implementations override addEntity() to receive each entity of type T.
template<typename T>
class EntityConsumer {
public:
  EntityConsumer() {}
  /// Virtual so that consumers can be destroyed through a base pointer.
  virtual ~EntityConsumer() {}

  /// Send an entity to this consumer.
  virtual void addEntity(const T&) = 0;
};
44 
46 
58 template<typename T>
59 class EntityContainer : public std::list<T>, public EntityConsumer<T> { // The order of inheritance is important for Swig.
60 public:
61  EntityContainer() {}
62  virtual ~EntityContainer() {}
64 
67  virtual void addEntity(const T& t) { this->push_back(t); }
68 };
69 
71 
// Queries Endpoint objects (using plugins in parallel) and sends the found
// entities to consumers. It is itself an EntityConsumer<T>.
// NOTE(review): the numeric prefix on every line and the gaps in that
// numbering indicate that this is an extracted listing with some original
// lines missing (for example the constructor, the opening line of the nested
// Common class and the declarations of the `common` members used below) -
// confirm against the original header before relying on this text.
166 template<typename T>
167 class EntityRetriever : public EntityConsumer<T> {
168 public:
170 
 // Detaches the shared Common object from this retriever: deactivate()
 // resets its back-pointer to NULL while holding the exclusive lock.
176  ~EntityRetriever() { common->deactivate(); }
177 
 // Waits on the result counter via Result::wait() with its default argument.
179  void wait() const { result.wait(); };
180 
181  //void waitForAll() const; // TODO: Make it possible to be nice and wait for all threads to finish.
182 
 // Returns the outcome of result.wait(0).
 // NOTE(review): presumably a non-blocking "queries finished" check -
 // confirm against SimpleCounter::wait().
186  bool isDone() const { return result.wait(0); };
187 
 // Registers a consumer: its address is appended to `consumers` while
 // consumerLock is held. Only the pointer is stored, not a copy.
192  void addConsumer(EntityConsumer<T>& addConsumer_consumer /* The name 'addConsumer_consumer' is important for Swig when matching methods */) { consumerLock.lock(); consumers.push_back(&addConsumer_consumer); consumerLock.unlock(); };
193 
 // Counterpart of addConsumer(); declared here, defined elsewhere.
197  void removeConsumer(const EntityConsumer<T>& removeConsumer_consumer /* The name 'removeConsumer_consumer' is important for Swig when matching methods */);
198 
200 
 // Get the status of the query process of a given Endpoint.
204  EndpointQueryingStatus getStatusOfEndpoint(const Endpoint& endpoint) const;
205 
207 
 // Get status of all the queried Endpoint objects. The map is copied while
 // statusLock is held and the copy is returned.
213  EndpointStatusMap getAllStatuses() const { statusLock.lock(); EndpointStatusMap s = statuses; statusLock.unlock(); return s; }
214 
216 
 // Set the status of the query process of a given Endpoint.
 // NOTE(review): the meaning of `overwrite` and of the return value is not
 // visible in this listing - confirm against the implementation.
226  bool setStatusOfEndpoint(const Endpoint& endpoint, const EndpointQueryingStatus& status, bool overwrite = true);
227 
229 
 // Insert into `result` the endpoint.ServiceName() of each endpoint with the
 // given status.
233  void getServicesWithStatus(const EndpointQueryingStatus& status, std::set<std::string>& result);
234 
236 
 // Clear statuses of registered endpoints (under statusLock).
241  void clearEndpointStatuses() { statusLock.lock(); statuses.clear(); statusLock.unlock(); return; }
242 
244 
 // Remove a particular registered endpoint from the status map.
 // Returns true if an entry for `e` was found and erased, false otherwise.
 // statusLock is released on both paths.
251  bool removeEndpoint(const Endpoint& e) { statusLock.lock(); EndpointStatusMap::iterator it = statuses.find(e); if (it != statuses.end()) { statuses.erase(it); statusLock.unlock(); return true; }; statusLock.unlock(); return false; }
252 
 // EntityConsumer<T> interface; declared here, defined elsewhere.
267  virtual void addEntity(const T& entity);
268 
270 
 // Starts querying an Endpoint.
275  virtual void addEndpoint(const Endpoint& endpoint);
276 
278 
 // Stores the flag in need_all_results.
 // NOTE(review): presumably selects whether waiting requires all queries to
 // finish - confirm where need_all_results is read.
284  void needAllResults(bool all_results = true) { need_all_results = all_results; }
285 
286 protected:
 // NOTE(review): takes an untyped argument; presumably the entry point run
 // for each query with a ThreadArg - confirm against the implementation.
287  static void queryEndpoint(void *arg_);
288 
289  void checkSuspendedAndStart(const Endpoint& e);
290 
291  // Common configuration part
 // NOTE(review): the line opening this nested class (referred to as Common by
 // ThreadArg below) and the first line of its constructor are absent from
 // this listing.
293  public:
295  mutex(), t(t), uc(u) {};
 // Clears the back-pointer under the exclusive lock; afterwards the
 // lock*IfValid() methods return false.
296  void deactivate(void) {
297  mutex.lockExclusive();
298  t = NULL;
299  mutex.unlockExclusive();
300  }
 // Takes the exclusive lock. Returns true with the lock still held when the
 // back-pointer is set (caller must then call unlockExclusive()); otherwise
 // releases the lock and returns false.
301  bool lockExclusiveIfValid(void) {
302  mutex.lockExclusive();
303  if(t) return true;
304  mutex.unlockExclusive();
305  return false;
306  }
307  void unlockExclusive(void) { mutex.unlockExclusive(); }
 // Shared-lock variant of lockExclusiveIfValid(): on true the shared lock is
 // still held and must be released with unlockShared().
308  bool lockSharedIfValid(void) {
309  mutex.lockShared();
310  if(t) return true;
311  mutex.unlockShared();
312  return false;
313  }
314  void unlockShared(void) { mutex.unlockShared(); }
315 
 // Gives read access to the UserConfig copy held by this object.
316  operator const UserConfig&(void) const { return uc; }
317  const std::list<std::string>& getAvailablePlugins(void) const { return availablePlugins; }
318  void setAvailablePlugins(const std::list<std::string>& newAvailablePlugins) { availablePlugins = newAvailablePlugins; }
 // Both operators return the raw back-pointer, which is NULL after
 // deactivate(); neither takes a lock itself.
319  EntityRetriever* operator->(void) { return t; }
320  EntityRetriever* operator*(void) { return t; }
321  private:
322  SharedMutex mutex;
 // Back-pointer to the owning retriever; reset to NULL by deactivate().
323  EntityRetriever* t;
 // Held by value (a copy), unlike the reference member of EntityRetriever.
324  const UserConfig uc;
325  std::list<std::string> availablePlugins;
326  };
328 
329  // Represents completeness of queries run in threads.
330  // Different implementations are meant for waiting for either one or all threads.
331  // TODO: counter is duplicate in this implementation. It may be simplified
332  // either by using counter of ThreadedPointer or implementing part of
333  // ThreadedPointer directly.
334  class Result: private ThreadedPointer<SimpleCounter> {
335  public:
336  // Creates initial instance
 // NOTE(review): the base-class initializer line (listing line 338) is
 // missing from this listing.
337  Result(bool one_success = false):
339  success(false),need_one_success(one_success) { };
340  // Creates new reference representing query - increments counter
 // The copy starts with success == false and inherits need_one_success.
 // NOTE(review): the base-class initializer line (listing line 342) is
 // missing from this listing.
341  Result(const Result& r):
343  success(false),need_one_success(r.need_one_success) {
344  Ptr()->inc();
345  };
346  // Query finished - decrement or reset counter (if one result is enough)
347  ~Result(void) {
348  if(need_one_success && success) {
349  Ptr()->set(0);
350  } else {
351  Ptr()->dec();
352  };
353  };
354  // Mark this result as successful (failure by default)
355  void setSuccess(void) { success = true; };
356  // Wait for queries to finish
 // Forwards `t` to the counter's wait(); the default is -1.
357  bool wait(int t = -1) const { return Ptr()->wait(t); };
358  private:
359  bool success;
360  bool need_one_success;
361  };
362  Result result;
363 
 // Bundle of data handed to a query. Its Result member is copy-constructed
 // from the caller's Result, which increments the counter (see Result above).
364  class ThreadArg {
365  public:
366  ThreadArg(const ThreadedPointer<Common>& common, Result& result, const Endpoint& endpoint, const EndpointQueryOptions<T>& options) : common(common), result(result), endpoint(endpoint), options(options) {};
 // Copies everything from `v` (including pluginName) but attaches the given
 // Result instead of v.result.
367  ThreadArg(const ThreadArg& v, Result& result) : common(v.common), result(result), endpoint(v.endpoint), pluginName(v.pluginName), options(v.options) {};
368  // Objects for communication with caller
 // NOTE(review): the declaration of the `common` member initialized by both
 // constructors (listing line 369) is missing from this listing.
370  Result result;
371  // Per-thread parameters
372  Endpoint endpoint;
373  std::string pluginName;
374  EndpointQueryOptions<T> options;
375  };
376 
 // Per-endpoint query status; accessed under statusLock in the inline methods above.
377  EndpointStatusMap statuses;
378 
379  static Logger logger;
 // Held by reference here, whereas Common keeps its own copy.
380  const UserConfig& uc;
 // Registered consumers (non-owning pointers); appended to under consumerLock.
381  std::list< EntityConsumer<T>* > consumers;
382  const EndpointQueryOptions<T> options;
383 
 // Used as plain locks via lock()/unlock(); mutable so const methods can lock.
384  mutable SimpleCondition consumerLock;
385  mutable SimpleCondition statusLock;
386  std::map<std::string, std::string> interfacePluginMap;
 // Set by needAllResults().
387  bool need_all_results;
388 };
389 
391 
398 
400 
407 
409 
416 
417 } // namespace Arc
418 
419 #endif // __ARC_ENTITYRETRIEVER_H__
420 
Arc namespace contains all core ARC classes.
Definition: ArcConfig.h:11
void lock(void)
Acquire semaphore.
Definition: Thread.h:164
void getServicesWithStatus(const EndpointQueryingStatus &status, std::set< std::string > &result)
Insert into results the endpoint.ServiceName() of each endpoint with the given status.
Simple triggered condition.
Definition: Thread.h:150
void addConsumer(EntityConsumer< T > &addConsumer_consumer)
Definition: EntityRetriever.h:192
virtual void addEndpoint(const Endpoint &endpoint)
Starts querying an Endpoint.
void unlockShared(void)
Release a shared lock.
User configuration class
Definition: UserConfig.h:196
Definition: EntityRetriever.h:292
Represents the status in the EntityRetriever of the query process of an Endpoint (service registry...
Definition: EndpointQueryingStatus.h:19
virtual void addEntity(const T &t)
All the consumed entities are pushed to the list.
Definition: EntityRetriever.h:67
bool setStatusOfEndpoint(const Endpoint &endpoint, const EndpointQueryingStatus &status, bool overwrite=true)
Set the status of the query process of a given Endpoint.
Definition: EntityRetriever.h:334
Thread-safe counter with capability to wait for zero value.
Definition: Thread.h:244
bool isDone() const
Definition: EntityRetriever.h:186
virtual void wait(void) const
Wait for zero condition.
void unlockExclusive(void)
Release exclusive lock.
void unlock(void)
Release semaphore.
Definition: Thread.h:168
Definition: EntityRetrieverPlugin.h:96
A logger class.
Definition: Logger.h:493
void removeConsumer(const EntityConsumer< T > &removeConsumer_consumer)
EntityRetriever(const UserConfig &uc, const EndpointQueryOptions< T > &options=EndpointQueryOptions< T >())
Needs the credentials of the user and can have some options.
Mutex which allows shared and exclusive locking.
Definition: Thread.h:333
Represents an endpoint of a service with a given interface type and capabilities. ...
Definition: Endpoint.h:68
Queries Endpoint objects (using plugins in parallel) and sends the found entities to consumers...
Definition: EntityRetriever.h:167
EndpointStatusMap getAllStatuses() const
Get status of all the queried Endpoint objects.
Definition: EntityRetriever.h:213
Options controlling the query process.
Definition: EntityRetrieverPlugin.h:24
void wait() const
Definition: EntityRetriever.h:179
EntityRetriever< ComputingServiceType > TargetInformationRetriever
The TargetInformationRetriever is an EntityRetriever retrieving ComputingServiceType objects...
Definition: EntityRetriever.h:406
EntityRetriever< Endpoint > ServiceEndpointRetriever
The ServiceEndpointRetriever is an EntityRetriever retrieving Endpoint objects.
Definition: EntityRetriever.h:397
SimpleCounter * Ptr(void) const
Cast to original pointer.
Definition: Thread.h:494
virtual void addEntity(const T &entity)
An entity consumer class storing all the consumed entities in a list.
Definition: EntityRetriever.h:59
virtual int inc(void)
Increment value of counter.
virtual int dec(void)
Decrement value of counter.
EndpointQueryingStatus getStatusOfEndpoint(const Endpoint &endpoint) const
Get the status of the query process of a given Endpoint.
virtual void addEntity(const T &)=0
Send an entity to this consumer.
void needAllResults(bool all_results=true)
Sets whether to wait for all queries.
Definition: EntityRetriever.h:284
virtual int set(int v)
Set value of counter.
void lockExclusive(void)
Acquire an exclusive lock. Blocks until all shared and exclusive locks are released.
Status map for Endpoint objects.
Definition: Endpoint.h:38
EntityRetriever< Job > JobListRetriever
The JobListRetriever is an EntityRetriever retrieving Job objects.
Definition: EntityRetriever.h:415
Definition: EntityRetriever.h:364
void clearEndpointStatuses()
Clear statuses of registered endpoints.
Definition: EntityRetriever.h:241
A general concept of an object which can consume entities, used by the retrievers to return results...
Definition: EntityRetriever.h:34
Wrapper for pointer with automatic destruction and multiple references.
Definition: Thread.h:436
void lockShared(void)
Acquire a shared lock. Blocks until exclusive lock is released.
bool removeEndpoint(const Endpoint &e)
Remove a particular registered endpoint.
Definition: EntityRetriever.h:251