ARC SDK
Thread.h
1 // -*- indent-tabs-mode: nil -*-
2 
3 #ifndef __ARC_THREAD_H__
4 #define __ARC_THREAD_H__
5 
6 #include <map>
7 
8 #include <glibmm/thread.h>
9 
10 namespace Arc {
11 
15  class SimpleCondition;
16  class SimpleCounter;
17 
18  // This module provides convenient helpers for Glibmm interface for thread
19  // management. So far it takes care of automatic initialization
20  // of threading environment and creation of simple detached threads.
21  // Always use it instead of glibmm/thread.h and keep among first
22  // includes. It is safe to use it multiple times and to include it both
23  // from source files and other include files.
24 
29  const size_t thread_stacksize = (16 * 1024 * 1024);
32 
38  bool CreateThreadFunction(void (*func)(void*), void *arg, SimpleCounter* count = NULL);
39 
// Bookkeeping object for thread identifiers.
// The constructor is private, so the only way to reach an instance from
// outside the class is the static getInstance() accessor.
// NOTE(review): add()/remove()/get() are only declared here; presumably they
// register, unregister and look up the calling thread in thread_ids -
// confirm against the implementation file.
42  class ThreadId {
43  private:
// Declared next to the shared state below; presumably guards it - confirm.
44  Glib::Mutex mutex;
// Mapping between two unsigned long values; judging by the name, from a
// native thread identifier to an assigned number - TODO confirm.
45  std::map<unsigned long int, unsigned long int> thread_ids;
// Running number; presumably the last (or next) value handed out.
46  unsigned long int thread_no;
// Private: instances cannot be created by users of the class.
47  ThreadId();
48  public:
// Returns a reference to a ThreadId instance.
49  static ThreadId& getInstance();
51  void add();
53  void remove();
// Returns an unsigned long value; presumably the number of the calling thread.
55  unsigned long int get();
56  };
59  class ThreadData;
60 
62 
67  friend class ThreadData;
68  private:
69  ThreadDataItem(const ThreadDataItem& it);
70  protected:
71  virtual ~ThreadDataItem(void);
72  public:
74 
75  ThreadDataItem(void);
77 
79  ThreadDataItem(std::string& key);
81  ThreadDataItem(const std::string& key);
83 
86  void Attach(std::string& key);
88 
90  void Attach(const std::string& key);
92 
93  static ThreadDataItem* Get(const std::string& key);
95 
99  virtual void Dup(void);
100  };
101 
102  class ThreadArgument;
103 
// Base class for a simple object associated with a thread.
// The class is abstract: thread() is pure virtual and must be implemented
// by the derived class.
105  class Thread {
// ThreadArgument is granted access to the protected thread() method.
106  friend class ThreadArgument;
107  public:
109 
// Start thread. Returns a bool result; 'count' is optional and defaults
// to NULL. The implementation is not part of this header.
111  bool start(SimpleCounter* count = NULL);
112 
// Virtual destructor so objects can be destroyed through a Thread pointer.
113  virtual ~Thread(void) {};
114 
115  protected:
// Implement this method and put thread functionality into it.
117  virtual void thread(void) = 0;
118 
119  };
120 
/// Scoped helper which keeps an object locked for as long as the helper lives.
/** T must provide lock() and unlock() methods. The helper counts its own
 *  lock()/unlock() calls: the wrapped object is locked on the first lock()
 *  and unlocked when the count drops back to zero or when the helper is
 *  destroyed while the count is still positive. */
template<typename T> class AutoLock {
 public:
  /// Binds the helper to olock; acquires the lock immediately unless
  /// olocked is false.
  AutoLock(T& olock, bool olocked = true) : lock_(olock), locked_(0) {
    if(olocked) lock();
  }

  /// Releases the wrapped lock if this helper still holds it.
  ~AutoLock() {
    // Only release what was actually acquired through this helper.
    if(locked_ > 0) lock_.unlock();
    locked_ = 0;
  }

  /// Acquires the wrapped lock on the first call; later calls only
  /// increase the nesting count.
  void lock() {
    if(locked_ == 0) lock_.lock();
    ++locked_;
  }

  /// Decreases the nesting count and releases the wrapped lock when the
  /// count reaches zero. Calling it while nothing is held is a no-op.
  void unlock() {
    // Without this guard the count would become negative and the
    // destructor would unlock an object this helper never locked.
    if(locked_ <= 0) return;
    if(locked_ == 1) lock_.unlock();
    --locked_;
  }

 private:
  T& lock_;     // wrapped lockable object
  int locked_;  // nesting count of lock() calls made through this helper
};
146 
148 
151  private:
152  Glib::Cond cond_;
153  Glib::Mutex lock_;
154  unsigned int flag_;
155  unsigned int waiting_;
156  public:
157  SimpleCondition(void)
158  : flag_(0), waiting_(0) {}
159  ~SimpleCondition(void) {
160  /* race condition ? */
161  broadcast();
162  }
164  void lock(void) {
165  lock_.lock();
166  }
168  void unlock(void) {
169  lock_.unlock();
170  }
172  void signal(void) {
173  lock_.lock();
174  flag_ = 1;
175  cond_.signal();
176  lock_.unlock();
177  }
179 
180  void signal_nonblock(void) {
181  flag_ = 1;
182  cond_.signal();
183  }
185 
186  void broadcast(void) {
187  lock_.lock();
188  flag_ = waiting_?waiting_:1;
189  cond_.broadcast();
190  lock_.unlock();
191  }
193  void wait(void) {
194  lock_.lock();
195  ++waiting_;
196  while (!flag_) cond_.wait(lock_);
197  --waiting_;
198  --flag_;
199  lock_.unlock();
200  }
202 
203  void wait_nonblock(void) {
204  ++waiting_;
205  while (!flag_) cond_.wait(lock_);
206  --waiting_;
207  --flag_;
208  }
210 
211  bool wait(int t) {
212  lock_.lock();
213  Glib::TimeVal etime;
214  etime.assign_current_time();
215  etime.add_milliseconds(t);
216  bool res(true);
217  ++waiting_;
218  while (!flag_) {
219  res = cond_.timed_wait(lock_, etime);
220  if (!res) break;
221  }
222  --waiting_;
223  if(res) --flag_;
224  lock_.unlock();
225  return res;
226  }
228  void reset(void) {
229  lock_.lock();
230  flag_ = 0;
231  lock_.unlock();
232  }
234 
235  void forceReset(void) {
236  // This function is deprecated and its body removed because
237  // there is no safe way to reset locks after call to fork().
238  }
239  };
240 
242 
245  private:
246  Glib::Cond cond_;
247  Glib::Mutex lock_;
248  int count_;
249  public:
250  SimpleCounter(void) : count_(0) {}
251  virtual ~SimpleCounter(void);
253 
254  virtual int inc(void);
256 
257  virtual int dec(void);
259  virtual int get(void) const;
261 
262  virtual int set(int v);
264  virtual void wait(void) const;
266 
268  virtual bool wait(int t) const;
270 
271  virtual void forceReset(void) {
272  // This function is deprecated and its body removed because
273  // there is no safe way to reset locks after call to fork().
274  }
275  };
276 
278 
279  class TimedMutex {
280  private:
281  Glib::Cond cond_;
282  Glib::Mutex lock_;
283  bool locked_;
284  public:
285  TimedMutex(void):locked_(false) { };
286  ~TimedMutex(void) { };
288 
289  bool lock(int t = -1) {
290  lock_.lock();
291  if(t < 0) { // infinite
292  while(locked_) {
293  cond_.wait(lock_);
294  };
295  } else if(t > 0) { // timed
296  Glib::TimeVal etime;
297  etime.assign_current_time();
298  etime.add_milliseconds(t);
299  while(locked_) {
300  if(!cond_.timed_wait(lock_, etime)) break;
301  };
302  };
303  bool res = !locked_;
304  locked_=true;
305  lock_.unlock();
306  return res;
307  };
309  bool trylock(void) {
310  return lock(0);
311  };
313  bool unlock(void) {
314  lock_.lock();
315  bool res = locked_;
316  if(res) {
317  locked_ = false;
318  cond_.signal();
319  };
320  lock_.unlock();
321  return true;
322  };
324 
325  void forceReset(void) {
326  // This function is deprecated and its body removed because
327  // there is no safe way to reset locks after call to fork().
328  }
329  };
330 
332 
333  class SharedMutex {
334  private:
335  Glib::Cond cond_;
336  Glib::Mutex lock_;
337  unsigned int exclusive_;
338  Glib::Thread* thread_;
339  typedef std::map<Glib::Thread*,unsigned int> shared_list;
340  shared_list shared_;
341  void add_shared_lock(void);
342  void remove_shared_lock(void);
343  bool have_shared_lock(void);
344  inline bool have_exclusive_lock(void) {
345  if(!exclusive_) return false;
346  if(thread_ == Glib::Thread::self()) return false;
347  return true;
348  };
349  public:
350  SharedMutex(void):exclusive_(0),thread_(NULL) { };
351  ~SharedMutex(void) { };
353  void lockShared(void);
355  void unlockShared(void);
357  bool isLockShared(void) {
358  return (shared_.size() > 0); // Is it safe?
359  };
361  void lockExclusive(void);
363  void unlockExclusive(void);
365  bool isLockExclusive(void) {
366  return (exclusive_ > 0);
367  };
369 
370  void forceReset(void) {
371  // This function is deprecated and its body removed because
372  // there is no safe way to reset locks after call to fork().
373  };
374  };
375 
378  private:
379  SharedMutex& mutex_;
380  public:
381  SharedMutexSharedLock(SharedMutex& mutex):mutex_(mutex) {
382  mutex_.lockShared();
383  };
385  mutex_.unlockShared();
386  };
387  };
388 
391  private:
392  SharedMutex& mutex_;
393  public:
394  SharedMutexExclusiveLock(SharedMutex& mutex):mutex_(mutex) {
395  mutex_.lockExclusive();
396  };
398  mutex_.unlockExclusive();
399  };
400  };
401 
// Helper object holding an untyped pointer together with a counter, a
// "released" marker and a mutex/condition pair.
// Copy construction and destruction are private, so objects of this class
// cannot be copied or destroyed by outside code.
// NOTE(review): add() and rem() are only declared here; presumably they
// increase and decrease cnt_ - confirm in the implementation file.
404  class ThreadedPointerBase {
405  private:
406  Glib::Mutex lock_;
407  Glib::Cond cond_;
// Value reported by cnt().
408  unsigned int cnt_;
// Untyped pointer reported by ptr().
409  void *ptr_;
// Set to true by rel(); never read in this header.
410  bool released_;
411  ThreadedPointerBase(ThreadedPointerBase&);
412  ~ThreadedPointerBase(void);
413  public:
414  ThreadedPointerBase(void *p);
415  ThreadedPointerBase* add(void);
416  void* rem(void);
// Returns the stored pointer; lock_ is not taken.
417  void* ptr(void) const { return ptr_; };
// Marks the object as released; lock_ is not taken.
418  void rel(void) { released_ = true; };
// Returns the current counter value; lock_ is not taken.
419  unsigned int cnt(void) const { return cnt_; };
// lock()/unlock() expose lock_ directly to the caller.
420  void lock(void) { lock_.lock(); };
421  void unlock(void) { lock_.unlock(); };
// Both wait() variants hand lock_ to cond_; presumably the caller must
// already hold it through lock() - confirm against Glib::Cond requirements.
422  void wait(void) { cond_.wait(lock_); };
// Timed variant: returns the result of the timed wait on cond_.
423  bool wait(Glib::TimeVal etime) {
424  return cond_.timed_wait(lock_,etime);
425  };
426  };
429 
435  template<typename T>
437  private:
438  ThreadedPointerBase *object_;
439  public:
440  ThreadedPointer(T *p)
441  : object_(new ThreadedPointerBase(p)) {}
443  : object_(p.object_->add()) {}
444  ThreadedPointer(void)
445  : object_(new ThreadedPointerBase(NULL)) {}
446  ~ThreadedPointer(void) {
447  delete((T*)(object_->rem()));
448  }
451  if (p != object_->ptr()) {
452  delete((T*)(object_->rem()));
453  object_ = new ThreadedPointerBase(p);
454  }
455  return *this;
456  }
459  if (p.object_->ptr() != object_->ptr()) {
460  delete((T*)(object_->rem()));
461  object_ = p.object_->add();
462  }
463  return *this;
464  }
466  T& operator*(void) const {
467  return *(T*)(object_->ptr());
468  }
470  T* operator->(void) const {
471  return (T*)(object_->ptr());
472  }
474  operator bool(void) const {
475  return ((object_->ptr()) != NULL);
476  }
478  bool operator!(void) const {
479  return ((object_->ptr()) == NULL);
480  }
482  bool operator==(const ThreadedPointer& p) const {
483  return ((T*)(object_->ptr()) == (T*)(p.object_->ptr()));
484  }
486  bool operator!=(const ThreadedPointer& p) const {
487  return ((T*)(object_->ptr()) != (T*)(p.object_->ptr()));
488  }
490  bool operator<(const ThreadedPointer& p) const {
491  return ((T*)(object_->ptr()) < (T*)(p.object_->ptr()));
492  }
494  T* Ptr(void) const {
495  return (T*)(object_->ptr());
496  }
498 
500  T* Release(void) {
501  T* tmp = (T*)(object_->ptr());
502  object_->rel();
503  return tmp;
504  }
506  unsigned int Holders(void) {
507  return object_->cnt();
508  }
510  /* Returns current number of instances. */
511  unsigned int WaitOutRange(unsigned int minThr, unsigned int maxThr) {
512  unsigned int r = 0;
513  object_->lock();
514  for(;;) {
515  r = object_->cnt();
516  if(r <= minThr) break;
517  if(r >= maxThr) break;
518  object_->wait();
519  };
520  object_->unlock();
521  return r;
522  }
524 
526  unsigned int WaitOutRange(unsigned int minThr, unsigned int maxThr, int timeout) {
527  if(timeout < 0) return WaitOutRange(minThr, maxThr);
528  unsigned int r = 0;
529  object_->lock();
530  Glib::TimeVal etime;
531  etime.assign_current_time();
532  etime.add_milliseconds(timeout);
533  for(;;) {
534  r = object_->cnt();
535  if(r <= minThr) break;
536  if(r >= maxThr) break;
537  if(!object_->wait(etime)) break;
538  };
539  object_->unlock();
540  return r;
541  }
543  /* Returns current number of instances. */
544  unsigned int WaitInRange(unsigned int minThr, unsigned int maxThr) {
545  unsigned int r = 0;
546  object_->lock();
547  for(;;) {
548  r = object_->cnt();
549  if((r >= minThr) && (r <= maxThr)) break;
550  object_->wait();
551  };
552  object_->unlock();
553  return r;
554  }
556 
558  unsigned int WaitInRange(unsigned int minThr, unsigned int maxThr, int timeout) {
559  if(timeout < 0) return WaitInRange(minThr, maxThr);
560  unsigned int r = 0;
561  object_->lock();
562  Glib::TimeVal etime;
563  etime.assign_current_time();
564  etime.add_milliseconds(timeout);
565  for(;;) {
566  r = object_->cnt();
567  if((r >= minThr) && (r <= maxThr)) break;
568  if(!object_->wait(etime)) break;
569  };
570  object_->unlock();
571  return r;
572  }
573 
574  };
575 
577 
580  private:
581  int counter_;
582  bool cancel_;
583  Glib::Cond cond_;
584  Glib::Mutex lock_;
585  public:
586  ThreadRegistry(void);
587  ~ThreadRegistry(void);
589  void RegisterThread(void);
591  void UnregisterThread(void);
593 
594  bool WaitOrCancel(int timeout);
596 
598  bool WaitForExit(int timeout = -1);
600  void RequestCancel(void);
602 
603  void forceReset(void) {
604  // This function is deprecated and its body removed because
605  // there is no safe way to reset locks after call to fork().
606  }
607  };
608 
611  void GlibThreadInitialize(void);
614  class ThreadInitializer {
616  public:
619  GlibThreadInitialize();
620  }
622 
624  void forceReset(void);
626 
628  void waitExit(void);
629  };
630 
631  // This is done intentionally to make sure glibmm is
632  // properly initialized before every module starts
633  // using threads functionality. To make it work this
634  // header must be included before defining any
635  // variable/class instance using static threads-related
636  // elements. The simplest way to do that is to use
637  // this header instead of glibmm/thread.h
638  static ThreadInitializer _local_thread_initializer;
639 
642 } // namespace Arc
643 
644 #endif /* __ARC_THREAD_H__ */
bool operator!=(const ThreadedPointer &p) const
Returns true if pointers are not equal.
Definition: Thread.h:486
Arc namespace contains all core ARC classes.
Definition: ArcConfig.h:11
void lock(void)
Acquire semaphore.
Definition: Thread.h:164
void Attach(std::string &key)
Attaches object to current thread under key.
bool wait(int t)
Wait for condition no longer than t milliseconds.
Definition: Thread.h:211
Simple triggered condition.
Definition: Thread.h:150
void unlockShared(void)
Release a shared lock.
void forceReset(void)
This method is meant to be used only after fork.
Definition: Thread.h:603
bool operator!(void) const
Returns true if pointer is NULL and false otherwise.
Definition: Thread.h:478
Base class for per-thread object.
Definition: Thread.h:66
void broadcast(void)
Signal about condition to all waiting threads.
Definition: Thread.h:186
Exclusive lock for SharedMutex.
Definition: Thread.h:390
Thread-safe counter with capability to wait for zero value.
Definition: Thread.h:244
bool unlock(void)
Release mutex.
Definition: Thread.h:313
bool isLockExclusive(void)
Returns true if the exclusive lock is held.
Definition: Thread.h:365
T * Release(void)
Release referred object so that it can be passed to other container.
Definition: Thread.h:500
virtual void wait(void) const
Wait for zero condition.
void signal_nonblock(void)
Signal about condition without using semaphore.
Definition: Thread.h:180
virtual void thread(void)=0
Implement this method and put thread functionality into it.
void unlockExclusive(void)
Release exclusive lock.
void unlock(void)
Release semaphore.
Definition: Thread.h:168
void UnregisterThread(void)
Report thread as exited.
void RegisterThread(void)
Register thread as started/starting into this instance.
bool start(SimpleCounter *count=NULL)
Start thread.
void wait_nonblock(void)
Wait for condition without using semaphore.
Definition: Thread.h:203
void forceReset(void)
This method is meant to be used only after fork.
ThreadDataItem(void)
Dummy constructor which does nothing.
Mutex which allows shared and exclusive locking.
Definition: Thread.h:333
bool lock(int t=-1)
Lock mutex, but wait no longer than t milliseconds.
Definition: Thread.h:289
T * operator->(void) const
For referring to wrapped object.
Definition: Thread.h:470
unsigned int Holders(void)
Returns number of ThreadedPointer instances referring to underlying object.
Definition: Thread.h:506
bool CreateThreadFunction(void(*func)(void *), void *arg, SimpleCounter *count=NULL)
Helper function to create simple thread.
void reset(void)
Reset object to initial state.
Definition: Thread.h:228
T & operator*(void) const
For referring to wrapped object.
Definition: Thread.h:466
bool operator==(const ThreadedPointer &p) const
Returns true if pointers are equal.
Definition: Thread.h:482
bool isLockShared(void)
Returns true if at least one shared lock is held.
Definition: Thread.h:357
Base class for a simple object associated with a thread.
Definition: Thread.h:105
unsigned int WaitInRange(unsigned int minThr, unsigned int maxThr, int timeout)
Waits till number of ThreadedPointer instances >= minThr and <= maxThr.
Definition: Thread.h:558
void signal(void)
Definition: Thread.h:172
unsigned int WaitOutRange(unsigned int minThr, unsigned int maxThr, int timeout)
Waits till number of ThreadedPointer instances <= minThr or >= maxThr.
Definition: Thread.h:526
bool WaitOrCancel(int timeout)
Wait for timeout milliseconds or cancel request.
unsigned int WaitInRange(unsigned int minThr, unsigned int maxThr)
Waits till number of ThreadedPointer instances >= minThr and <= maxThr.
Definition: Thread.h:544
virtual void forceReset(void)
This method is meant to be used only after fork.
Definition: Thread.h:271
A set of conditions, mutexes, etc. conveniently exposed to monitor running child threads and to wait ...
Definition: Thread.h:579
T * Ptr(void) const
Cast to original pointer.
Definition: Thread.h:494
unsigned int WaitOutRange(unsigned int minThr, unsigned int maxThr)
Waits till number of ThreadedPointer instances <= minThr or >= maxThr.
Definition: Thread.h:511
void forceReset(void)
This method is meant to be used only after fork.
Definition: Thread.h:235
virtual int inc(void)
Increment value of counter.
ThreadedPointer< T > & operator=(const ThreadedPointer< T > &p)
Assign a new ThreadedPointer from another ThreadedPointer.
Definition: Thread.h:458
virtual int dec(void)
Decrement value of counter.
Definition: Thread.h:121
void forceReset(void)
This method is meant to be used only after fork.
Definition: Thread.h:370
bool WaitForExit(int timeout=-1)
Wait for registered threads to exit.
ThreadedPointer< T > & operator=(T *p)
Assign a new ThreadedPointer from a pointer to an object.
Definition: Thread.h:450
void wait(void)
Wait for condition.
Definition: Thread.h:193
ThreadInitializer(void)
Initialise the thread system.
Definition: Thread.h:618
void lockExclusive(void)
Acquire an exclusive lock. Blocks until all shared and exclusive locks are released.
Mutex which allows a timeout on locking.
Definition: Thread.h:279
virtual void Dup(void)
Creates copy of object.
void forceReset(void)
This method is meant to be used only after fork.
Definition: Thread.h:325
Shared lock for SharedMutex.
Definition: Thread.h:377
void RequestCancel(void)
Send cancel request to registered threads.
void waitExit(void)
Wait for all known threads to exit.
bool operator<(const ThreadedPointer &p) const
Comparison operator.
Definition: Thread.h:490
bool trylock(void)
Attempts to acquire the mutex without waiting. Returns true if the mutex was free and has been acquired, false if it was already locked.
Definition: Thread.h:309
static ThreadDataItem * Get(const std::string &key)
Retrieves object attached to thread under key.
Wrapper for pointer with automatic destruction and multiple references.
Definition: Thread.h:436
void lockShared(void)
Acquire a shared lock. Blocks until exclusive lock is released.