Changeset 3240 for code/branches/netp6/src/core
- Timestamp:
- Jun 28, 2009, 3:04:30 PM (16 years ago)
- Location:
- code/branches/netp6/src/core
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
code/branches/netp6/src/core/CorePrereqs.h
r3231 r3240 189 189 // multithreading 190 190 class Thread; 191 class Thread Group;191 class ThreadPool; 192 192 } 193 193 -
code/branches/netp6/src/core/Thread.cc
r3231 r3240 36 36 37 37 #include "util/Sleep.h" 38 #include " Functor.h"38 #include "Executor.h" 39 39 40 40 namespace orxonox … … 44 44 45 45 Thread::Thread(): 46 functor_(0),46 executor_(0), 47 47 isWorking_(false), 48 48 stopThread_(false) 49 49 { 50 this->communicationMutex_ = new boost::mutex; 50 this->executorMutex_ = new boost::mutex; 51 this->isWorkingMutex_ = new boost::mutex; 52 this->stopThreadMutex_ = new boost::mutex; 51 53 this->workerThread_ = new boost::thread( boost::bind(&Thread::threadLoop, this) ); 52 54 } … … 54 56 Thread::~Thread() 55 57 { 58 this->stopThreadMutex_->lock(); 56 59 this->stopThread_ = true; 60 this->stopThreadMutex_->unlock(); 57 61 if( !this->workerThread_->timed_join( THREAD_WAIT_BEFORE_DETACH ) ) 58 62 assert(0); // this should not happen 59 63 delete this->workerThread_; 60 delete this->communicationMutex_; 64 delete this->executorMutex_; 65 delete this->stopThreadMutex_; 66 delete this->isWorkingMutex_; 61 67 } 62 68 63 bool Thread:: evaluateFunctor( Functor* functor)69 bool Thread::isWorking() 64 70 { 65 if( this->communicationMutex_->try_lock() ) 66 { 67 this->functor_ = functor; 68 this->communicationMutex_->unlock(); 69 return true; 70 } 71 else 72 return false; 71 this->isWorkingMutex_->lock(); 72 bool isWorking = this->isWorking_; 73 this->isWorkingMutex_->unlock(); 74 return isWorking; 75 } 76 77 bool Thread::evaluateExecutor( Executor* executor ) 78 { 79 this->isWorkingMutex_->lock(); 80 this->isWorking_=true; 81 this->isWorkingMutex_->unlock(); 82 this->executorMutex_->lock(); 83 this->executor_ = executor; 84 this->executorMutex_->unlock(); 85 return true; 73 86 } 74 87 75 88 void Thread::threadLoop() 76 89 { 77 while( !this->stopThread_ ) 90 bool stopThread = false; 91 while( !stopThread ) 78 92 { 79 this->communicationMutex_->lock(); 80 if( this->functor_ ) 93 this->executorMutex_->lock(); 94 Executor* executor = this->executor_; 95 this->executorMutex_->unlock(); 96 if( executor ) 81 97 { 82 (*this->functor_)(); 83 this->communicationMutex_->unlock(); 98 (*executor)(); 99 this->executorMutex_->lock(); 100 delete this->executor_; 101 this->executor_ = 0; 102 this->executorMutex_->unlock(); 103 this->isWorkingMutex_->lock(); 104 this->isWorking_=false; 105 this->isWorkingMutex_->unlock(); 84 106 } 85 107 else 86 108 { 87 this->communicationMutex_->unlock();88 109 this->workerThread_->yield(); 89 110 } 111 this->stopThreadMutex_->lock(); 112 stopThread = this->stopThread_; 113 this->stopThreadMutex_->unlock(); 90 114 } 91 115 } … … 96 120 while( stillWorking ) 97 121 { 98 this-> communicationMutex_->lock();122 this->isWorkingMutex_->lock(); 99 123 stillWorking = this->isWorking_; 100 this-> communicationMutex_->unlock();124 this->isWorkingMutex_->unlock(); 101 125 if( stillWorking ) 102 126 msleep( 1 ); -
code/branches/netp6/src/core/Thread.h
r3231 r3240 32 32 #include "CorePrereqs.h" 33 33 34 namespace boost{ 35 class recursive_mutex; 36 } 37 34 38 namespace orxonox 35 39 { … … 40 44 virtual ~Thread(); 41 45 42 inline bool isWorking() { return this->isWorking_; }46 bool isWorking(); 43 47 void waitUntilFinished(); 44 bool evaluate Functor( Functor* functor );48 bool evaluateExecutor( Executor* executor ); 45 49 46 50 private: 47 51 void threadLoop(); 48 52 49 Functor* functor_;53 Executor* executor_; 50 54 bool isWorking_; 51 55 bool stopThread_; 52 56 boost::thread* workerThread_; 53 boost::mutex* communicationMutex_; 57 boost::mutex* executorMutex_; 58 boost::mutex* isWorkingMutex_; 59 boost::mutex* stopThreadMutex_; 54 60 }; 55 61 -
code/branches/netp6/src/core/ThreadPool.cc
r3231 r3240 28 28 29 29 #include "ThreadPool.h" 30 #include "Thread.h" 30 31 #include <cassert> 31 32 … … 39 40 ThreadPool::~ThreadPool() 40 41 { 42 unsigned int a = this->setNrOfThreads(0); 43 assert(a == 0); 41 44 } 42 45 … … 44 47 { 45 48 for( unsigned int i=0; i<nr; i++ ) 46 this->threadPool_.push_back( Thread());49 this->threadPool_.push_back(new Thread()); 47 50 } 48 51 unsigned int ThreadPool::removeThreads( unsigned int nr ) 49 52 { 50 53 unsigned int i=0; 51 std::vector<Thread >::iterator it;52 for( it = this->threadPool_.begin(); it != threadPool_.end() && i<nr; ++it)54 std::vector<Thread*>::iterator it; 55 for( it = this->threadPool_.begin(); it != threadPool_.end() && i<nr; ) 53 56 { 54 if( ! it->isWorking() )57 if( ! (*it)->isWorking() ) 55 58 { 56 this->threadPool_.erase( it++ ); 59 Thread* temp = *it; 60 it=this->threadPool_.erase( it ); 61 delete temp; 57 62 ++i; 58 63 } 64 else 65 ++it; 59 66 } 60 67 return i; … … 74 81 } 75 82 76 bool ThreadPool::passFunction( Functor* functor, bool addThread )83 bool ThreadPool::passFunction( Executor* executor, bool addThread ) 77 84 { 78 std::vector<Thread >::iterator it;85 std::vector<Thread*>::iterator it; 79 86 for ( it=this->threadPool_.begin(); it!=this->threadPool_.end(); ++it ) 80 87 { 81 if ( ! it->isWorking() )88 if ( ! (*it)->isWorking() ) 82 89 { 83 bool b = it->evaluateFunctor( functor );90 bool b = (*it)->evaluateExecutor( executor ); 84 91 assert(b); // if b is false then there is some code error 85 92 return true; … … 89 96 { 90 97 addThreads( 1 ); 91 this->threadPool_.back().evaluateFunctor( functor ); // access the last element 98 bool b = this->threadPool_.back()->evaluateExecutor( executor ); // access the last element 99 assert(b); 92 100 return true; 93 101 } … … 98 106 void ThreadPool::synchronise() 99 107 { 100 std::vector<Thread >::iterator it;108 std::vector<Thread*>::iterator it; 101 109 for ( it=this->threadPool_.begin(); it!=this->threadPool_.end(); ++it ) 102 110 { 103 it->waitUntilFinished();111 (*it)->waitUntilFinished(); 104 112 } 105 113 } -
code/branches/netp6/src/core/ThreadPool.h
r3231 r3240 33 33 34 34 #include <vector> 35 #include "Thread.h"36 35 37 36 namespace orxonox … … 47 46 unsigned int setNrOfThreads( unsigned int nr ); 48 47 49 bool passFunction( Functor* functor, bool addThread=false );48 bool passFunction( Executor* executor, bool addThread=false ); 50 49 void synchronise(); 51 50 52 51 private: 53 std::vector<Thread > threadPool_;52 std::vector<Thread*> threadPool_; 54 53 55 54 };
Note: See TracChangeset
for help on using the changeset viewer.