/* orxonox - the future of 3D-vertical-scrollers Copyright (C) 2004 orx This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2, or (at your option) any later version. ### File Specific: main-programmer: Christoph Renner co-programmer: ... */ //#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_ #include "message_manager.h" #include "network_stream.h" #include "shared_network_data.h" #include "converter.h" #include MessageManager* MessageManager::singletonRef = NULL; /** * standard constructor */ MessageManager::MessageManager () { this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" ); newNumber = 1; setSynchronized( true ); } /** * standard deconstructor */ MessageManager::~MessageManager () { for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ ) { for ( std::list::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ ) { if ( it2->data ) { delete [] it2->data; it2->data = NULL; } } it->second.messages.clear(); it->second.toAck.clear(); } messageQueue.clear(); this->messageHandlerMap.clear(); MessageManager::singletonRef = NULL; } /** * get the diff to last acked state of userId * * this class does not use the normal SynchronizeableVars for zynchronisation. instead * it defines its own protocol * * @param userId user to create diff for * @param data buffer to copy diff in * @param maxLength max bytes to copy into data * @param stateId id of current state * @param fromStateId the reference state for the delta state * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH * @return n bytes copied into data */ int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH ) { int i = 0; int n; n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength ); i += n; assert( n == INTSIZE ); for ( std::list::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++) { n = Converter::intToByteArray( *it, data + i, maxLength ); i += n; assert( n == INTSIZE ); } messageQueue[userId].toAck.clear(); n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength ); i += n; assert( n == INTSIZE ); for ( std::list::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ ) { n = Converter::intToByteArray( it->length, data + i, maxLength ); i += n; assert( n == INTSIZE ); n = Converter::intToByteArray( it->number, data + i, maxLength ); i += n; assert( n == INTSIZE ); n = Converter::intToByteArray( it->messageId, data + i, maxLength ); i += n; assert( n == INTSIZE ); assert( i + it->length <= maxLength ); memcpy( data + i, it->data, it->length ); i += it->length; } return i; } /** * sets a new state out of a diff created on another host * @param userId hostId of user who send me that diff * @param data pointer to diff * @param length length of diff * @param stateId id of current state * @param fromStateId id of the base state id * @return number bytes read * @todo check for permissions */ int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId ) { int i = 0; int n; int nAcks; assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &nAcks ); assert( n == INTSIZE ); i += n; std::list acks; int number; for ( int j = 0; j < nAcks; j++ ) { assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &number ); assert( n == INTSIZE ); i += n; acks.push_back( number ); } int nMessages; assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &nMessages ); assert( n == INTSIZE ); i += n; int messageLength, messageId; for ( int j = 0; j < nMessages; j++ ) { assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &messageLength ); assert( n == INTSIZE ); i += n; assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &number ); assert( n == INTSIZE ); i += n; assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &messageId ); assert( n == INTSIZE ); i += n; if ( number > 0 ) messageQueue[userId].toAck.push_back( number ); assert( i + messageLength <= length ); assert( messageHandlerMap.find( (MessageId)messageId ) != messageHandlerMap.end() ); if ( std::find( messageQueue[userId].recievedMessages.begin(), messageQueue[userId].recievedMessages.end(), number )== messageQueue[userId].recievedMessages.end() ) { if ( !(*(messageHandlerMap[(MessageId)messageId].cb))( (MessageId)messageId, data + i, messageLength, messageHandlerMap[(MessageId)messageId].someData, userId ) ) { NetworkMessage msg; msg.data = new byte[messageLength]; memcpy( msg.data, data + i, messageLength ); msg.length = messageLength; msg.messageId = (MessageId)messageId; msg.number = userId; incomingMessageBuffer.push_back( msg ); } messageQueue[userId].recievedMessages.push_back( number ); } i += messageLength; } //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected for ( std::list::iterator it = incomingMessageBuffer.begin(); it != incomingMessageBuffer.end(); ) { if ( (*(messageHandlerMap[it->messageId].cb))( it->messageId, it->data, it->length, messageHandlerMap[it->messageId].someData, it->number ) ) { std::list::iterator delIt = it; if ( it->data ) delete it->data; it++; incomingMessageBuffer.erase( delIt ); continue; } it++; } //walk throu message queue and remove acked messages for ( std::list::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); ) { if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() ) { std::list::iterator delIt = it; it++; messageQueue[userId].messages.erase( delIt ); continue; } it++; } //TODO find bether way. maybe with timestamp if ( messageQueue[userId].recievedMessages.size() > 1000 ) { for ( int j = 0; j < messageQueue[userId].recievedMessages.size() - 1000; j++ ) messageQueue[userId].recievedMessages.erase( messageQueue[userId].recievedMessages.begin() ); } return i; } /** * clean up memory reserved for user * @param userId userid */ void MessageManager::cleanUpUser( int userId ) { if ( messageQueue.find( userId ) == messageQueue.end() ) return; for ( std::list::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ ) { if ( it->data ) delete it->data; it->data = NULL; } messageQueue[userId].toAck.clear(); messageQueue.erase( userId ); } /** * registers function to handle messages with id messageId. someData is passed to callbackfuntion * @param messageId message id to handle * @param cb function pointer to callback function * @param someData this pointer is passed to callback function without modification * @return true on success */ bool MessageManager::registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData ) { MessageHandler messageHandler; messageHandler.cb = cb; messageHandler.messageId = messageId; messageHandler.someData = someData; messageHandlerMap[messageId] = messageHandler; return true; } /** * initializes buffers for user * @param userId userId */ void MessageManager::initUser( int userId ) { // just do something so map creates a new entry messageQueue[userId].toAck.clear(); //assert( messageQueue[userId].messages.size() == 0 ); } /** * send a message to one or more clients * recieverType: * RT_ALL send to all users. reciever is ignored * RT_USER send only to reciever * RT_NOT_USER send to all but reciever * * @param messageId message id * @param data pointer to data * @param dataLength length of data * @param recieverType * @param reciever */ void MessageManager::sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority ) { for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ ) { if ( recieverType == RT_ALL_ME || recieverType == RT_ALL_BUT_ME || recieverType == RT_USER && it->first == reciever || recieverType == RT_NOT_USER && it->first != reciever || recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) || recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first ) ) { NetworkMessage msg; msg.data = new byte[dataLength]; memcpy( msg.data, data, dataLength ); msg.length = dataLength; msg.messageId = messageId; msg.number = newNumber++; msg.priority = messagePriority; it->second.messages.push_back( msg ); } } if ( recieverType == RT_ALL_ME ) { NetworkMessage msg; msg.data = new byte[dataLength]; memcpy( msg.data, data, dataLength ); msg.length = dataLength; msg.messageId = messageId; msg.number = SharedNetworkData::getInstance()->getHostID(); msg.priority = messagePriority; incomingMessageBuffer.push_back( msg ); } }