Changeset 1257
- Timestamp:
- May 12, 2008, 6:33:03 PM (17 years ago)
- Location:
- code/branches/console/src/core
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
code/branches/console/src/core/TclThreadManager.cc
r1255 r1257 43 43 #include "util/Convert.h" 44 44 45 #define TCLTHREADMANAGER_MAX_QUEUE_LENGTH 10 2445 #define TCLTHREADMANAGER_MAX_QUEUE_LENGTH 100 46 46 #define TCLTHREADMANAGER_MAX_CPU_USAGE 0.50 47 47 … … 65 65 this->orxonoxInterpreterBundle_.id_ = 0; 66 66 this->orxonoxInterpreterBundle_.interpreter_ = TclBind::getInstance().getTclInterpreter(); 67 this->threadID_ = boost::this_thread::get_id(); 67 68 } 68 69 … … 75 76 unsigned int TclThreadManager::create() 76 77 { 77 boost::mutex::scoped_lock lock(TclThreadManager::getInstance().bundlesMutex_);78 boost::mutex::scoped_lock bundles_lock(TclThreadManager::getInstance().bundlesMutex_); 78 79 TclThreadManager::getInstance().threadCounter_++; 79 80 std::string name = getConvertedValue<unsigned int, std::string>(TclThreadManager::getInstance().threadCounter_); … … 84 85 bundle->interpreterName_ = name; 85 86 bundle->running_ = true; 87 bundle->finished_ = true; 86 88 87 89 TclThreadManager::getInstance().interpreterBundles_[TclThreadManager::getInstance().threadCounter_] = bundle; … … 96 98 { 97 99 { 98 boost::mutex::scoped_lock lock(bundle->runningMutex_);100 boost::mutex::scoped_lock running_lock(bundle->runningMutex_); 99 101 bundle->running_ = false; 100 102 } 101 { 102 boost::mutex::scoped_lock lock(bundle->finishedMutex_); 103 while (!bundle->finished_) 104 bundle->finishedCondition_.wait(lock); 105 } 106 { 107 boost::mutex::scoped_lock lock(bundle->interpreterMutex_); 108 delete bundle->interpreter_; 109 } 110 delete bundle; 111 { 112 boost::mutex::scoped_lock lock(TclThreadManager::getInstance().bundlesMutex_); 113 TclThreadManager::getInstance().interpreterBundles_.erase(threadID); 114 } 103 while (true) 104 { 105 { 106 boost::mutex::scoped_lock finished_lock(bundle->finishedMutex_); 107 if (bundle->finished_) 108 { 109 boost::mutex::scoped_lock bundles_lock(TclThreadManager::getInstance().bundlesMutex_); 110 boost::mutex::scoped_lock interpreter_lock(bundle->interpreterMutex_, boost::defer_lock_t()); 111 try 112 { 113 while (!interpreter_lock.try_lock()) 114 { 115 TclThreadManager::getInstance().orxonoxEvalCondition_.notify_one(); 116 boost::this_thread::yield(); 117 } 118 } catch (...) {} 119 delete bundle->interpreter_; 120 delete bundle; 121 TclThreadManager::getInstance().interpreterBundles_.erase(threadID); 122 break; 123 } 124 } 125 126 TclThreadManager::getInstance().orxonoxEvalCondition_.notify_one(); 127 boost::this_thread::yield(); 128 } 129 130 COUT(0) << "Destroyed Tcl-interpreter with ID " << threadID << std::endl; 115 131 } 116 132 } … … 158 174 if (bundle) 159 175 { 160 boost::mutex::scoped_lock lock(bundle->runningMutex_);176 boost::mutex::scoped_lock running_lock(bundle->runningMutex_); 161 177 return bundle->running_; 162 178 } … … 201 217 TclInterpreterBundle* TclThreadManager::getInterpreterBundle(unsigned int threadID) 202 218 { 203 if (threadID == 0) 204 return &this->orxonoxInterpreterBundle_; 205 206 boost::mutex::scoped_lock lock(this->bundlesMutex_); 219 boost::mutex::scoped_lock bundles_lock(this->bundlesMutex_); 207 220 std::map<unsigned int, TclInterpreterBundle*>::iterator it = this->interpreterBundles_.find(threadID); 208 221 if (it != this->interpreterBundles_.end()) … … 212 225 else 213 226 { 214 this-> forceCommandToFrontOfQueue("errorError: No Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(threadID) + " existing.");227 this->error("Error: No Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(threadID) + " existing."); 215 228 return 0; 216 229 } … … 230 243 } 231 244 245 void TclThreadManager::error(const std::string& error) 246 { 247 if (boost::this_thread::get_id() != this->threadID_) 248 { 249 boost::mutex::scoped_lock queue_lock(this->orxonoxInterpreterBundle_.queueMutex_); 250 if (this->orxonoxInterpreterBundle_.queue_.size() >= TCLTHREADMANAGER_MAX_QUEUE_LENGTH) 251 return; 252 } 253 254 this->forceCommandToFrontOfQueue("error " + error); 255 } 256 232 257 void TclThreadManager::pushCommandToQueue(const std::string& command) 233 258 { 234 boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);259 boost::mutex::scoped_lock queue_lock(this->orxonoxInterpreterBundle_.queueMutex_); 235 260 while (this->orxonoxInterpreterBundle_.queue_.size() >= TCLTHREADMANAGER_MAX_QUEUE_LENGTH) 236 this->fullQueueCondition_.wait( lock);261 this->fullQueueCondition_.wait(queue_lock); 237 262 238 263 this->orxonoxInterpreterBundle_.queue_.push_back(command); … … 241 266 void TclThreadManager::forceCommandToFrontOfQueue(const std::string& command) 242 267 { 243 boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);268 boost::mutex::scoped_lock queue_lock(this->orxonoxInterpreterBundle_.queueMutex_); 244 269 this->orxonoxInterpreterBundle_.queue_.push_front(command); 245 270 } … … 247 272 std::string TclThreadManager::popCommandFromQueue() 248 273 { 249 boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);274 boost::mutex::scoped_lock queue_lock(this->orxonoxInterpreterBundle_.queueMutex_); 250 275 std::string temp = this->orxonoxInterpreterBundle_.queue_.front(); 251 276 this->orxonoxInterpreterBundle_.queue_.pop_front(); … … 256 281 bool TclThreadManager::queueIsEmpty() 257 282 { 258 boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);283 boost::mutex::scoped_lock queue_lock(this->orxonoxInterpreterBundle_.queueMutex_); 259 284 return this->orxonoxInterpreterBundle_.queue_.empty(); 260 285 } … … 265 290 if (bundle) 266 291 { 267 boost::mutex::scoped_lock lock(bundle->queueMutex_);292 boost::mutex::scoped_lock queue_lock(bundle->queueMutex_); 268 293 if (bundle->queue_.size() >= TCLTHREADMANAGER_MAX_QUEUE_LENGTH) 269 294 { 270 this-> forceCommandToFrontOfQueue("errorError: Queue of Tcl-interpreter " + getConvertedValue<unsigned int, std::string>(threadID) + " is full, couldn't add command.");295 this->error("Error: Queue of Tcl-interpreter " + getConvertedValue<unsigned int, std::string>(threadID) + " is full, couldn't add command."); 271 296 return; 272 297 } … … 281 306 if (bundle) 282 307 { 283 boost::mutex::scoped_lock lock(bundle->queueMutex_);308 boost::mutex::scoped_lock queue_lock(bundle->queueMutex_); 284 309 std::string temp = bundle->queue_.front(); 285 310 bundle->queue_.pop_front(); … … 294 319 if (bundle) 295 320 { 296 boost::mutex::scoped_lock lock(bundle->queueMutex_);321 boost::mutex::scoped_lock queue_lock(bundle->queueMutex_); 297 322 return bundle->queue_.empty(); 298 323 } … … 305 330 return false; 306 331 307 boost::mutex::scoped_lock lock(target->queriersMutex_);308 309 { 310 boost::mutex::scoped_lock lock(querier->queriersMutex_);332 boost::mutex::scoped_lock queriers_lock(target->queriersMutex_); 333 334 { 335 boost::mutex::scoped_lock queriers_lock(querier->queriersMutex_); 311 336 target->queriers_.insert(target->queriers_.end(), querier->queriers_.begin(), querier->queriers_.end()); 312 337 } … … 316 341 if (std::find(target->queriers_.begin(), target->queriers_.end(), target->id_) != target->queriers_.end()) 317 342 { 318 this-> forceCommandToFrontOfQueue("errorError: Circular query (" + this->dumpList(target->queriers_) + " -> " + getConvertedValue<unsigned int, std::string>(target->id_) + "), couldn't query Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(target->id_) + " from other interpreter with ID " + getConvertedValue<unsigned int, std::string>(querier->id_) + ".");343 this->error("Error: Circular query (" + this->dumpList(target->queriers_) + " -> " + getConvertedValue<unsigned int, std::string>(target->id_) + "), couldn't query Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(target->id_) + " from other interpreter with ID " + getConvertedValue<unsigned int, std::string>(querier->id_) + "."); 319 344 return false; 320 345 } … … 331 356 if (this->updateQueriersList(querier, &this->orxonoxInterpreterBundle_)) 332 357 { 333 boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.interpreterMutex_);334 this->orxonoxEvalCondition_.wait( lock);358 boost::mutex::scoped_lock interpreter_lock(this->orxonoxInterpreterBundle_.interpreterMutex_); 359 this->orxonoxEvalCondition_.wait(interpreter_lock); 335 360 336 361 if (!CommandExecutor::execute(command, false)) 337 this-> forceCommandToFrontOfQueue("errorError: Can't execute command \"" + command + "\"!");362 this->error("Error: Can't execute command \"" + command + "\"!"); 338 363 339 364 if (CommandExecutor::getLastEvaluation().hasReturnvalue()) … … 341 366 } 342 367 343 boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queriersMutex_);368 boost::mutex::scoped_lock queriers_lock(this->orxonoxInterpreterBundle_.queriersMutex_); 344 369 this->orxonoxInterpreterBundle_.queriers_.clear(); 345 370 } … … 349 374 std::string TclThreadManager::evalQuery(unsigned int querierID, unsigned int threadID, const std::string& command) 350 375 { 351 TclInterpreterBundle* target = this->getInterpreterBundle(threadID); 376 TclInterpreterBundle* target = 0; 377 if (threadID) 378 target = this->getInterpreterBundle(threadID); 379 else 380 target = &this->orxonoxInterpreterBundle_; 381 352 382 std::string output = ""; 353 383 if (target) … … 363 393 if (this->updateQueriersList(querier, target)) 364 394 { 365 boost::mutex::scoped_lock lock(target->interpreterMutex_, boost::defer_lock_t());395 boost::mutex::scoped_lock interpreter_lock(target->interpreterMutex_, boost::defer_lock_t()); 366 396 bool successfullyLocked = false; 367 397 try 368 398 { 369 399 if (querierID == 0 || std::find(querier->queriers_.begin(), querier->queriers_.end(), (unsigned int)0) != querier->queriers_.end()) 370 successfullyLocked = lock.try_lock();400 successfullyLocked = interpreter_lock.try_lock(); 371 401 else 372 402 { 373 while (! lock.try_lock())403 while (!interpreter_lock.try_lock()) 374 404 boost::this_thread::yield(); 375 405 … … 383 413 { output = (std::string)target->interpreter_->eval(command); } 384 414 catch (Tcl::tcl_error const &e) 385 { this-> forceCommandToFrontOfQueue("errorTcl error: " + (std::string)e.what()); }415 { this->error("Tcl error: " + (std::string)e.what()); } 386 416 catch (std::exception const &e) 387 { this-> forceCommandToFrontOfQueue("errorError while executing Tcl: " + (std::string)e.what()); }417 { this->error("Error while executing Tcl: " + (std::string)e.what()); } 388 418 } 389 419 else 390 420 { 391 this-> forceCommandToFrontOfQueue("errorError: Couldn't query Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(threadID) + ", interpreter is busy right now.");421 this->error("Error: Couldn't query Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(threadID) + ", interpreter is busy right now."); 392 422 } 393 423 } 394 424 395 boost::mutex::scoped_lock lock(target->queriersMutex_);425 boost::mutex::scoped_lock queriers_lock(target->queriersMutex_); 396 426 target->queriers_.clear(); 397 427 } … … 408 438 409 439 { 410 boost::mutex::scoped_lock lock(this->bundlesMutex_);440 boost::mutex::scoped_lock bundles_lock(this->bundlesMutex_); 411 441 for (std::map<unsigned int, TclInterpreterBundle*>::iterator it = this->interpreterBundles_.begin(); it != this->interpreterBundles_.end(); ++it) 412 442 { 413 boost::mutex::scoped_lock lock((*it).second->queueMutex_);443 boost::mutex::scoped_lock queue_lock((*it).second->queueMutex_); 414 444 if (!(*it).second->queue_.empty()) 415 445 { 416 446 std::string command = (*it).second->queue_.front(); 417 447 (*it).second->queue_.pop_front(); 418 419 448 { 420 boost::mutex::scoped_lock lock((*it).second->finishedMutex_);449 boost::mutex::scoped_lock finished_lock((*it).second->finishedMutex_); 421 450 (*it).second->finished_ = false; 422 451 } 423 424 452 boost::thread(boost::bind(&tclThread, (*it).second, command)); 425 453 } … … 428 456 429 457 { 430 boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.interpreterMutex_);458 boost::mutex::scoped_lock interpreter_lock(this->orxonoxInterpreterBundle_.interpreterMutex_); 431 459 unsigned long maxtime = (unsigned long)(dt * 1000000 * TCLTHREADMANAGER_MAX_CPU_USAGE); 432 460 Ogre::Timer timer; … … 442 470 void tclThread(TclInterpreterBundle* interpreterBundle, std::string command) 443 471 { 472 boost::mutex::scoped_lock interpreter_lock(interpreterBundle->interpreterMutex_); 444 473 try 445 474 { 446 boost::mutex::scoped_lock lock(interpreterBundle->interpreterMutex_);447 475 interpreterBundle->interpreter_->eval(command); 448 476 } 449 477 catch (Tcl::tcl_error const &e) 450 478 { 451 TclThreadManager::getInstance(). forceCommandToFrontOfQueue("errorTcl (ID " + getConvertedValue<unsigned int, std::string>(interpreterBundle->id_) + ") error: " + e.what());479 TclThreadManager::getInstance().error("Tcl (ID " + getConvertedValue<unsigned int, std::string>(interpreterBundle->id_) + ") error: " + e.what()); 452 480 } 453 481 catch (std::exception const &e) 454 482 { 455 TclThreadManager::getInstance(). forceCommandToFrontOfQueue("errorError while executing Tcl (ID " + getConvertedValue<unsigned int, std::string>(interpreterBundle->id_) + "): " + e.what());456 } 457 458 boost::mutex::scoped_lock lock(interpreterBundle->finishedMutex_);483 TclThreadManager::getInstance().error("Error while executing Tcl (ID " + getConvertedValue<unsigned int, std::string>(interpreterBundle->id_) + "): " + e.what()); 484 } 485 486 boost::mutex::scoped_lock finished_lock(interpreterBundle->finishedMutex_); 459 487 interpreterBundle->finished_ = true; 460 488 interpreterBundle->finishedCondition_.notify_all(); -
code/branches/console/src/core/TclThreadManager.h
r1255 r1257 85 85 TclInterpreterBundle* getInterpreterBundle(unsigned int threadID); 86 86 std::string dumpList(const std::list<unsigned int>& list); 87 void error(const std::string& error); 87 88 88 89 void pushCommandToQueue(const std::string& command); … … 113 114 boost::condition fullQueueCondition_; 114 115 boost::condition orxonoxEvalCondition_; 116 boost::thread::id threadID_; 115 117 }; 116 118
Note: See TracChangeset
for help on using the changeset viewer.