Changeset 3240
- Timestamp:
- Jun 28, 2009, 3:04:30 PM (15 years ago)
- Location:
- code/branches/netp6/src
- Files:
-
- 14 edited
Legend:
- Unmodified
- Added
- Removed
-
code/branches/netp6/src/core/CorePrereqs.h
r3231 r3240 189 189 // multithreading 190 190 class Thread; 191 class Thread Group;191 class ThreadPool; 192 192 } 193 193 -
code/branches/netp6/src/core/Thread.cc
r3231 r3240 36 36 37 37 #include "util/Sleep.h" 38 #include " Functor.h"38 #include "Executor.h" 39 39 40 40 namespace orxonox … … 44 44 45 45 Thread::Thread(): 46 functor_(0),46 executor_(0), 47 47 isWorking_(false), 48 48 stopThread_(false) 49 49 { 50 this->communicationMutex_ = new boost::mutex; 50 this->executorMutex_ = new boost::mutex; 51 this->isWorkingMutex_ = new boost::mutex; 52 this->stopThreadMutex_ = new boost::mutex; 51 53 this->workerThread_ = new boost::thread( boost::bind(&Thread::threadLoop, this) ); 52 54 } … … 54 56 Thread::~Thread() 55 57 { 58 this->stopThreadMutex_->lock(); 56 59 this->stopThread_ = true; 60 this->stopThreadMutex_->unlock(); 57 61 if( !this->workerThread_->timed_join( THREAD_WAIT_BEFORE_DETACH ) ) 58 62 assert(0); // this should not happen 59 63 delete this->workerThread_; 60 delete this->communicationMutex_; 64 delete this->executorMutex_; 65 delete this->stopThreadMutex_; 66 delete this->isWorkingMutex_; 61 67 } 62 68 63 bool Thread:: evaluateFunctor( Functor* functor)69 bool Thread::isWorking() 64 70 { 65 if( this->communicationMutex_->try_lock() ) 66 { 67 this->functor_ = functor; 68 this->communicationMutex_->unlock(); 69 return true; 70 } 71 else 72 return false; 71 this->isWorkingMutex_->lock(); 72 bool isWorking = this->isWorking_; 73 this->isWorkingMutex_->unlock(); 74 return isWorking; 75 } 76 77 bool Thread::evaluateExecutor( Executor* executor ) 78 { 79 this->isWorkingMutex_->lock(); 80 this->isWorking_=true; 81 this->isWorkingMutex_->unlock(); 82 this->executorMutex_->lock(); 83 this->executor_ = executor; 84 this->executorMutex_->unlock(); 85 return true; 73 86 } 74 87 75 88 void Thread::threadLoop() 76 89 { 77 while( !this->stopThread_ ) 90 bool stopThread = false; 91 while( !stopThread ) 78 92 { 79 this->communicationMutex_->lock(); 80 if( this->functor_ ) 93 this->executorMutex_->lock(); 94 Executor* executor = this->executor_; 95 this->executorMutex_->unlock(); 96 if( executor ) 81 97 { 82 (*this->functor_)(); 83 this->communicationMutex_->unlock(); 98 (*executor)(); 99 this->executorMutex_->lock(); 100 delete this->executor_; 101 this->executor_ = 0; 102 this->executorMutex_->unlock(); 103 this->isWorkingMutex_->lock(); 104 this->isWorking_=false; 105 this->isWorkingMutex_->unlock(); 84 106 } 85 107 else 86 108 { 87 this->communicationMutex_->unlock();88 109 this->workerThread_->yield(); 89 110 } 111 this->stopThreadMutex_->lock(); 112 stopThread = this->stopThread_; 113 this->stopThreadMutex_->unlock(); 90 114 } 91 115 } … … 96 120 while( stillWorking ) 97 121 { 98 this-> communicationMutex_->lock();122 this->isWorkingMutex_->lock(); 99 123 stillWorking = this->isWorking_; 100 this-> communicationMutex_->unlock();124 this->isWorkingMutex_->unlock(); 101 125 if( stillWorking ) 102 126 msleep( 1 ); -
code/branches/netp6/src/core/Thread.h
r3231 r3240 32 32 #include "CorePrereqs.h" 33 33 34 namespace boost{ 35 class recursive_mutex; 36 } 37 34 38 namespace orxonox 35 39 { … … 40 44 virtual ~Thread(); 41 45 42 inline bool isWorking() { return this->isWorking_; }46 bool isWorking(); 43 47 void waitUntilFinished(); 44 bool evaluate Functor( Functor* functor );48 bool evaluateExecutor( Executor* executor ); 45 49 46 50 private: 47 51 void threadLoop(); 48 52 49 Functor* functor_;53 Executor* executor_; 50 54 bool isWorking_; 51 55 bool stopThread_; 52 56 boost::thread* workerThread_; 53 boost::mutex* communicationMutex_; 57 boost::mutex* executorMutex_; 58 boost::mutex* isWorkingMutex_; 59 boost::mutex* stopThreadMutex_; 54 60 }; 55 61 -
code/branches/netp6/src/core/ThreadPool.cc
r3231 r3240 28 28 29 29 #include "ThreadPool.h" 30 #include "Thread.h" 30 31 #include <cassert> 31 32 … … 39 40 ThreadPool::~ThreadPool() 40 41 { 42 unsigned int a = this->setNrOfThreads(0); 43 assert(a == 0); 41 44 } 42 45 … … 44 47 { 45 48 for( unsigned int i=0; i<nr; i++ ) 46 this->threadPool_.push_back( Thread());49 this->threadPool_.push_back(new Thread()); 47 50 } 48 51 unsigned int ThreadPool::removeThreads( unsigned int nr ) 49 52 { 50 53 unsigned int i=0; 51 std::vector<Thread >::iterator it;52 for( it = this->threadPool_.begin(); it != threadPool_.end() && i<nr; ++it)54 std::vector<Thread*>::iterator it; 55 for( it = this->threadPool_.begin(); it != threadPool_.end() && i<nr; ) 53 56 { 54 if( ! it->isWorking() )57 if( ! (*it)->isWorking() ) 55 58 { 56 this->threadPool_.erase( it++ ); 59 Thread* temp = *it; 60 it=this->threadPool_.erase( it ); 61 delete temp; 57 62 ++i; 58 63 } 64 else 65 ++it; 59 66 } 60 67 return i; … … 74 81 } 75 82 76 bool ThreadPool::passFunction( Functor* functor, bool addThread )83 bool ThreadPool::passFunction( Executor* executor, bool addThread ) 77 84 { 78 std::vector<Thread >::iterator it;85 std::vector<Thread*>::iterator it; 79 86 for ( it=this->threadPool_.begin(); it!=this->threadPool_.end(); ++it ) 80 87 { 81 if ( ! it->isWorking() )88 if ( ! (*it)->isWorking() ) 82 89 { 83 bool b = it->evaluateFunctor( functor );90 bool b = (*it)->evaluateExecutor( executor ); 84 91 assert(b); // if b is false then there is some code error 85 92 return true; … … 89 96 { 90 97 addThreads( 1 ); 91 this->threadPool_.back().evaluateFunctor( functor ); // access the last element 98 bool b = this->threadPool_.back()->evaluateExecutor( executor ); // access the last element 99 assert(b); 92 100 return true; 93 101 } … … 98 106 void ThreadPool::synchronise() 99 107 { 100 std::vector<Thread >::iterator it;108 std::vector<Thread*>::iterator it; 101 109 for ( it=this->threadPool_.begin(); it!=this->threadPool_.end(); ++it ) 102 110 { 103 it->waitUntilFinished();111 (*it)->waitUntilFinished(); 104 112 } 105 113 } -
code/branches/netp6/src/core/ThreadPool.h
r3231 r3240 33 33 34 34 #include <vector> 35 #include "Thread.h"36 35 37 36 namespace orxonox … … 47 46 unsigned int setNrOfThreads( unsigned int nr ); 48 47 49 bool passFunction( Functor* functor, bool addThread=false );48 bool passFunction( Executor* executor, bool addThread=false ); 50 49 void synchronise(); 51 50 52 51 private: 53 std::vector<Thread > threadPool_;52 std::vector<Thread*> threadPool_; 54 53 55 54 }; -
code/branches/netp6/src/network/GamestateManager.cc
r3227 r3240 42 42 43 43 #include <cassert> 44 #include <queue> 45 #include <boost/thread/mutex.hpp> 44 46 45 47 #include "util/Debug.h" 48 #include "core/Executor.h" 49 #include "core/ThreadPool.h" 46 50 #include "ClientInformation.h" 47 51 #include "packet/Acknowledgement.h" … … 56 60 { 57 61 trafficControl_ = new TrafficControl(); 62 threadMutex_ = new boost::mutex(); 63 threadPool_ = new ThreadPool(); 58 64 } 59 65 … … 71 77 delete (*it2).second; 72 78 } 73 delete trafficControl_; 79 delete this->trafficControl_; 80 delete this->threadMutex_; 81 delete this->threadPool_; 74 82 } 75 83 … … 116 124 return true; 117 125 } 118 119 120 packet::Gamestate *GamestateManager::popGameState(unsigned int clientID) { 126 127 void GamestateManager::sendGamestates() 128 { 129 ClientInformation *temp = ClientInformation::getBegin(); 130 std::queue<packet::Gamestate*> clientGamestates; 131 while(temp != NULL){ 132 if( !(temp->getSynched()) ){ 133 COUT(5) << "Server: not sending gamestate" << std::endl; 134 temp=temp->next(); 135 if(!temp) 136 break; 137 continue; 138 } 139 COUT(4) << "client id: " << temp->getID() << " RTT: " << temp->getRTT() << " loss: " << temp->getPacketLoss() << std::endl; 140 COUT(5) << "Server: doing gamestate gamestate preparation" << std::endl; 141 int cid = temp->getID(); //get client id 142 143 packet::Gamestate *gs; 144 unsigned int gID = temp->getGamestateID(); 145 if(!reference) 146 return; 147 148 packet::Gamestate *client=0; 149 if(gID != GAMESTATEID_INITIAL){ 150 assert(gamestateMap_.find(cid)!=gamestateMap_.end()); 151 std::map<unsigned int, packet::Gamestate*>::iterator it = gamestateMap_[cid].find(gID); 152 if(it!=gamestateMap_[cid].end()) 153 { 154 client = it->second; 155 } 156 } 157 158 clientGamestates.push(0); 159 // finishGamestate( cid, clientGamestates.back(), client, reference ); 160 //FunctorMember<GamestateManager>* functor = 161 ExecutorMember<GamestateManager>* executor = createExecutor( createFunctor(&GamestateManager::finishGamestate) ); 162 executor->setObject(this); 163 executor->setDefaultValues( cid, &clientGamestates.back(), client, reference ); 164 // (*static_cast<Executor*>(executor))(); 165 this->threadPool_->passFunction( executor, true ); 166 // (*functor)( cid, &(clientGamestates.back()), client, reference ); 167 168 temp = temp->next(); 169 } 170 171 threadPool_->synchronise(); 172 173 while( !clientGamestates.empty() ) 174 { 175 if(clientGamestates.front()) 176 clientGamestates.front()->send(); 177 clientGamestates.pop(); 178 } 179 } 180 181 182 void GamestateManager::finishGamestate( unsigned int clientID, packet::Gamestate** destgamestate, packet::Gamestate* base, packet::Gamestate* gamestate ) { 121 183 //why are we searching the same client's gamestate id as we searched in 122 184 //Server::sendGameState? 123 packet::Gamestate *gs;124 unsigned int gID = ClientInformation::findClient(clientID)->getGamestateID();125 if(!reference)126 return 0;127 gs = reference->doSelection(clientID, 10000);128 185 // save the (undiffed) gamestate in the clients gamestate map 129 gamestateMap_[clientID][gs->getID()]=gs;130 186 //chose wheather the next gamestate is the first or not 131 packet::Gamestate *client=0; 132 if(gID != GAMESTATEID_INITIAL){ 133 assert(gamestateMap_.find(clientID)!=gamestateMap_.end()); 134 std::map<unsigned int, packet::Gamestate*>::iterator it = gamestateMap_[clientID].find(gID); 135 if(it!=gamestateMap_[clientID].end()) 136 { 137 client = it->second; 138 } 139 } 140 if(client){ 187 188 packet::Gamestate *gs = gamestate->doSelection(clientID, 20000); 189 // packet::Gamestate *gs = new packet::Gamestate(*gamestate); 190 // packet::Gamestate *gs = new packet::Gamestate(); 191 // gs->collectData( id_, 0x1 ); 192 this->threadMutex_->lock(); 193 gamestateMap_[clientID][gamestate->getID()]=gs; 194 this->threadMutex_->unlock(); 195 196 if(base) 197 { 198 141 199 // COUT(3) << "diffing" << std::endl; 142 200 // packet::Gamestate* gs1 = gs; 143 packet::Gamestate *diffed = gs->diff( client);201 packet::Gamestate *diffed = gs->diff(base); 144 202 //packet::Gamestate *gs2 = diffed->undiff(gs); 145 203 // assert(*gs == *gs2); … … 150 208 } 151 209 else{ 152 // COUT(3) << "not diffing" << std::endl;153 210 gs = new packet::Gamestate(*gs); 154 211 } 212 213 155 214 bool b = gs->compressData(); 156 215 assert(b); 157 COUT(4) << "sending gamestate with id " << gs->getID(); 158 if(gs->isDiffed()) 159 COUT(4) << " and baseid " << gs->getBaseID() << endl; 160 else 161 COUT(4) << endl; 162 return gs; 216 // COUT(4) << "sending gamestate with id " << gs->getID(); 217 // if(gamestate->isDiffed()) 218 // COUT(4) << " and baseid " << gs->getBaseID() << endl; 219 // else 220 // COUT(4) << endl; 221 gs->setClientID(clientID); 222 *destgamestate = gs; 163 223 } 164 224 -
code/branches/netp6/src/network/GamestateManager.h
r3214 r3240 45 45 #include <map> 46 46 #include "GamestateHandler.h" 47 #include "core/CorePrereqs.h" 47 48 48 49 namespace orxonox … … 73 74 bool processGamestates(); 74 75 bool update(); 75 packet::Gamestate *popGameState(unsigned int clientID); 76 void sendGamestates(); 77 // packet::Gamestate *popGameState(unsigned int clientID); 78 void finishGamestate( unsigned int clientID, packet::Gamestate** destgamestate, packet::Gamestate* base, packet::Gamestate* gamestate ); 76 79 77 80 bool getSnapshot(); … … 79 82 bool ack(unsigned int gamestateID, unsigned int clientID); 80 83 void removeClient(ClientInformation *client); 81 84 private: 82 85 bool processGamestate(packet::Gamestate *gs); 83 86 84 87 std::map<unsigned int, std::map<unsigned int, packet::Gamestate*> > gamestateMap_; 85 //std::map<int, packet::Gamestate*> gamestateMap; //map gsID to gamestate*86 //std::map<int, int> gamestateUsed; // save the number of clients, that use the specific gamestate87 88 std::map<unsigned int, packet::Gamestate*> gamestateQueue; 88 89 packet::Gamestate *reference; 89 90 TrafficControl *trafficControl_; 90 91 unsigned int id_; 92 boost::mutex* threadMutex_; 93 ThreadPool* threadPool_; 91 94 }; 92 95 -
code/branches/netp6/src/network/Host.cc
r3214 r3240 74 74 } 75 75 76 77 // bool Host::chat(std::string& message){78 // if(!instance_)79 // return false;80 // packet::Chat *c = new packet::Chat(message, getPlayerID());81 // return instance_->sendChat(c);82 // }83 84 // bool Host::receiveChat(packet::Chat *message, unsigned int clientID){85 // if(instance_)86 // return instance_->processChat(message, clientID);87 // else88 // return false;89 // }90 91 76 /** 92 77 * This function returns the ID of the player -
code/branches/netp6/src/network/Server.cc
r3214 r3240 48 48 #include "core/Clock.h" 49 49 #include "core/ObjectList.h" 50 #include "core/Executor.h" 51 #include "core/ThreadPool.h" 50 52 #include "packet/Chat.h" 51 53 #include "packet/ClassID.h" … … 68 70 */ 69 71 Server::Server() { 70 t imeSinceLastUpdate_=0;71 gamestates_ = new GamestateManager();72 this->timeSinceLastUpdate_=0; 73 this->threadPool_ = new ThreadPool(); 72 74 } 73 75 74 76 Server::Server(int port){ 75 77 this->setPort( port ); 76 t imeSinceLastUpdate_=0;77 gamestates_ = new GamestateManager();78 this->timeSinceLastUpdate_=0; 79 this->threadPool_ = new ThreadPool(); 78 80 } 79 81 … … 86 88 this->setPort( port ); 87 89 this->setBindAddress( bindAddress ); 88 t imeSinceLastUpdate_=0;89 gamestates_ = new GamestateManager();90 this->timeSinceLastUpdate_=0; 91 this->threadPool_ = new ThreadPool(); 90 92 } 91 93 … … 94 96 */ 95 97 Server::~Server(){ 96 if(gamestates_) 97 delete gamestates_; 98 delete this->threadPool_; 98 99 } 99 100 … … 138 139 */ 139 140 void Server::update(const Clock& time) { 141 // receive incoming packets 140 142 Connection::processQueue(); 141 gamestates_->processGamestates(); 143 // process incoming gamestates 144 GamestateManager::processGamestates(); 145 146 // pass sendFunctionCalls to worker thread pool 147 ExecutorStatic* functioncalls = createExecutor( createFunctor(&FunctionCallManager::sendCalls) ); 148 this->threadPool_->passFunction( functioncalls, true ); 149 150 this->threadPool_->synchronise(); 151 142 152 //this steers our network frequency 143 153 timeSinceLastUpdate_+=time.getDeltaTime(); … … 145 155 { 146 156 timeSinceLastUpdate_ -= static_cast<unsigned int>( timeSinceLastUpdate_ / NETWORK_PERIOD ) * NETWORK_PERIOD; 157 // ExecutorMember<GamestateManager>* updategamestate = createExecutor( createFunctor(&GamestateManager::updateGamestate); 158 // updategamestate->setObject( static_cast<GamestateManager*>(this) ); 159 // this->threadPool_->passFunction( updategamestate ); 147 160 updateGamestate(); 148 FunctionCallManager::sendCalls();149 161 } 150 162 sendPackets(); // flush the enet queue … … 175 187 */ 176 188 void Server::updateGamestate() { 177 //if( ClientInformation::getBegin()==NULL )189 if( ClientInformation::getBegin()==NULL ) 178 190 //no client connected 179 //return;180 gamestates_->update();191 return; 192 GamestateManager::update(); 181 193 COUT(5) << "Server: one gamestate update complete, goig to sendGameState" << std::endl; 182 194 //std::cout << "updated gamestate, sending it" << std::endl; … … 197 209 */ 198 210 bool Server::sendGameState() { 199 COUT(5) << "Server: starting function sendGameState" << std::endl; 200 ClientInformation *temp = ClientInformation::getBegin(); 201 bool added=false; 202 while(temp != NULL){ 203 if( !(temp->getSynched()) ){ 204 COUT(5) << "Server: not sending gamestate" << std::endl; 205 temp=temp->next(); 206 if(!temp) 207 break; 208 //think this works without continue 209 continue; 210 } 211 COUT(4) << "client id: " << temp->getID() << " RTT: " << temp->getRTT() << " loss: " << temp->getPacketLoss() << std::endl; 212 COUT(5) << "Server: doing gamestate gamestate preparation" << std::endl; 213 int gid = temp->getGamestateID(); //get gamestate id 214 int cid = temp->getID(); //get client id 215 COUT(5) << "Server: got acked (gamestate) ID from clientlist: " << gid << std::endl; 216 packet::Gamestate *gs = gamestates_->popGameState(cid); 217 if(gs==NULL){ 218 COUT(2) << "Server: could not generate gamestate (NULL from compress)" << std::endl; 219 temp = temp->next(); 220 continue; 221 } 222 //std::cout << "adding gamestate" << std::endl; 223 gs->setClientID(cid); 224 if ( !gs->send() ){ 225 COUT(3) << "Server: packet with client id (cid): " << cid << " not sended: " << temp->getFailures() << std::endl; 226 temp->addFailure(); 227 }else 228 temp->resetFailures(); 229 added=true; 230 temp=temp->next(); 231 // gs gets automatically deleted by enet callback 232 } 211 // COUT(5) << "Server: starting function sendGameState" << std::endl; 212 // ClientInformation *temp = ClientInformation::getBegin(); 213 // bool added=false; 214 // while(temp != NULL){ 215 // if( !(temp->getSynched()) ){ 216 // COUT(5) << "Server: not sending gamestate" << std::endl; 217 // temp=temp->next(); 218 // if(!temp) 219 // break; 220 // continue; 221 // } 222 // COUT(4) << "client id: " << temp->getID() << " RTT: " << temp->getRTT() << " loss: " << temp->getPacketLoss() << std::endl; 223 // COUT(5) << "Server: doing gamestate gamestate preparation" << std::endl; 224 // int cid = temp->getID(); //get client id 225 // packet::Gamestate *gs = GamestateManager::popGameState(cid); 226 // if(gs==NULL){ 227 // COUT(2) << "Server: could not generate gamestate (NULL from compress)" << std::endl; 228 // temp = temp->next(); 229 // continue; 230 // } 231 // //std::cout << "adding gamestate" << std::endl; 232 // gs->setClientID(cid); 233 // if ( !gs->send() ){ 234 // COUT(3) << "Server: packet with client id (cid): " << cid << " not sended: " << temp->getFailures() << std::endl; 235 // temp->addFailure(); 236 // }else 237 // temp->resetFailures(); 238 // added=true; 239 // temp=temp->next(); 240 // // gs gets automatically deleted by enet callback 241 // } 242 GamestateManager::sendGamestates(); 233 243 return true; 234 244 } … … 324 334 void Server::disconnectClient( ClientInformation *client ){ 325 335 ServerConnection::disconnectClient( client ); 326 gamestates_->removeClient(client);336 GamestateManager::removeClient(client); 327 337 // inform all the listeners 328 338 ObjectList<ClientConnectionListener>::iterator listener = ObjectList<ClientConnectionListener>::begin(); -
code/branches/netp6/src/network/Server.h
r3214 r3240 34 34 #include "core/CorePrereqs.h" 35 35 #include "Host.h" 36 #include "GamestateManager.h" 36 37 #include "ServerConnection.h" 37 38 … … 43 44 * It implements all functions necessary for a Server 44 45 */ 45 class _NetworkExport Server : public Host, public ServerConnection {46 class _NetworkExport Server : public Host, public ServerConnection, public GamestateManager{ 46 47 public: 47 48 Server(); … … 63 64 unsigned int shipID(){return 0;} 64 65 unsigned int playerID(){return 0;} 65 66 66 67 void addClient(ENetEvent *event); 67 68 bool createClient(int clientID); … … 75 76 void syncClassid(unsigned int clientID); 76 77 77 GamestateManager *gamestates_; 78 79 78 ThreadPool* threadPool_; 80 79 float timeSinceLastUpdate_; 81 80 }; -
code/branches/netp6/src/network/synchronisable/NetworkCallbackManager.cc
r3214 r3240 44 44 if (it != callbackSet_.end()) 45 45 { 46 delete (*it);47 46 callbackSet_.erase(it); 47 delete cb; 48 48 } 49 49 } -
code/branches/netp6/src/network/synchronisable/Synchronisable.cc
r3214 r3240 247 247 return 0; 248 248 uint32_t tempsize = 0; 249 #ifndef NDEBUG 249 250 if (this->classID==0) 250 251 COUT(3) << "classid 0 " << this->getIdentifier()->getName() << std::endl; 252 #endif 251 253 252 254 if (this->classID == static_cast<uint32_t>(-1)) -
code/branches/netp6/src/network/synchronisable/SynchronisableVariable.h
r3214 r3240 114 114 { 115 115 if (this->callback_ != 0) 116 { 116 117 NetworkCallbackManager::deleteCallback(this->callback_); //safe call for deletion 118 // this is neccessary because for example for a Vector3 all 3 components of the vector use the same callback 119 } 117 120 } 118 121 -
code/branches/netp6/src/orxonox/gamestates/GSDedicated.cc
r3205 r3240 103 103 #endif 104 104 //inputThread_->join(); 105 delete this->inputThread_; 105 106 106 107 GameMode::setHasServer(false);
Note: See TracChangeset
for help on using the changeset viewer.