Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

Changeset 7671 in orxonox.OLD for branches


Ignore:
Timestamp:
May 18, 2006, 10:48:17 AM (19 years ago)
Author:
rennerc
Message:

started implementing MessageManager

Location:
branches/network/src/lib/network
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • branches/network/src/lib/network/message_manager.cc

    r7631 r7671  
    2020using namespace std;
    2121
     22MessageManager* MessageManager::singletonRef = NULL;
     23
    2224
    2325/**
     
    3537MessageManager::~MessageManager ()
    3638{
     39  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
     40  {
     41    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
     42    {
     43      if ( it2->data )
     44      {
     45        delete it2->data;
     46        it2->data = NULL;
     47      }
     48    }
     49   
     50    it->second.messages.clear();
     51    it->second.toAck.clear();
     52  }
     53 
     54  messageQueue.clear();
    3755}
    3856
     
    4058 * get the diff to last acked state of userId
    4159 *
    42  * each synchrinizeable defines its own stack of states received and sent over the network. The stack contains
    43  * a per user entry for the last sent/received state This function returns a delta compressed state of the
    44  * synchronizeable. This state will be transmitted over the network to the other participants
     60 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
     61 * it defines its own protocol
    4562 *
    4663 * @param userId user to create diff for
     
    5471int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
    5572{
     73  int i = 0;
     74  int n;
     75 
     76  n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength );
     77  i += n;
     78  assert( n == INTSIZE );
     79 
     80  for ( std::list<int>::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++)
     81  {
     82    n = Converter::intToByteArray( *it, data + i, maxLength );
     83    i += n;
     84    assert( n == INTSIZE );
     85  }
     86 
     87  messageQueue[userId].toAck.clear();
     88 
     89  n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength );
     90  i += n;
     91  assert( n == INTSIZE );
     92 
     93  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
     94  {
     95    n = Converter::intToByteArray( it->length, data + i, maxLength );
     96    i += n;
     97    assert( n == INTSIZE );
     98   
     99    n = Converter::intToByteArray( it->number, data + i, maxLength );
     100    i += n;
     101    assert( n == INTSIZE );
     102   
     103    n = Converter::intToByteArray( it->messageId, data + i, maxLength );
     104    i += n;
     105    assert( n == INTSIZE );
     106   
     107    assert( i + it->length <= maxLength );
     108    memcpy( data + i, it->data, it->length );
     109    i += it->length;
     110  }
     111 
     112  return i;
    56113}
    57114
     
    68125int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
    69126{
     127  int i = 0;
     128  int n;
     129 
     130  int nAcks;
     131 
     132  n = Converter::byteArrayToInt( data + i, &nAcks );
     133  assert( n == INTSIZE );
     134  i += n;
     135 
     136  std::list<int> acks;
     137 
     138  int number;
     139 
     140  for ( int j = 0; j < nAcks; j++ )
     141  {
     142    n = Converter::byteArrayToInt( data + i, &number );
     143    assert( n == INTSIZE );
     144    i += n;
     145   
     146    acks.push_back( number );
     147  }
     148 
     149  int nMessages;
     150 
     151  n = Converter::byteArrayToInt( data + i, &nMessages );
     152  assert( n == INTSIZE );
     153  i += n;
     154 
     155  for ( int j = 0; j < nMessages; j++ )
     156  {
     157  }
    70158}
    71159
     
    77165{
    78166}
     167
     168/**
     169 * registers function to handle messages with id messageId. someData is passed to callbackfuntion
     170 * @param messageId message id to handle
     171 * @param cb function pointer to callback function
     172 * @param someData this pointer is passed to callback function without modification
     173 * @return true on success
     174 */
     175bool MessageManager::registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData )
     176{
     177}
     178
     179/**
     180 * initializes buffers for user
     181 * @param userId userId
     182 */
     183void MessageManager::initUser( int userId )
     184{
     185}
  • branches/network/src/lib/network/message_manager.h

    r7631 r7671  
    99#include "synchronizeable.h"
    1010
    11 // FORWARD DECLARATION
     11#include <map>
     12#include <list>
     13
     14/*
     15  Protocol:
     16    int nacks
     17    int acks[1..nacks]
     18    int nmsg
     19    (
     20      int length
     21      int number
     22      byte MessageId
     23      byte * data
     24    )[1..nmsg]
     25*/
     26
     27typedef void (*MessageCallback)( byte * data, int dataLength, void * someData, int userId );
     28
     29
     30enum MessageId
     31{
     32};
     33
     34enum RecieverType
     35{
     36  RT_ALL = 1,   //!< message is sent to all users
     37  RT_USER,      //!< message is only sent to reciever
     38  RT_NOT_USER   //!< message is sent to all but reciever
     39};
     40
     41struct NetworkMessage
     42{
     43  MessageId messageId;
     44  byte *    data;
     45  int       length;
     46  int       number;
     47};
     48
     49struct MessageUserQueue
     50{
     51  std::list<NetworkMessage> messages;
     52  std::list<int>            toAck;
     53};
     54
     55typedef std::map<int,MessageUserQueue> MessageQueue;
     56
     57struct MessageHandler
     58{
     59  MessageCallback cb;
     60  MessageId       messageId;
     61};
     62
     63typedef std::map<MessageId,MessageHandler> MessageHandlerMap;
    1264
    1365//! A class for sending messages over network
    1466class MessageManager : public Synchronizeable {
    15 
     67 protected:
     68   MessageManager();
    1669 public:
    17    MessageManager();
     70   inline static MessageManager * getInstance(){ if (!singletonRef) singletonRef = new MessageManager();  return singletonRef; }
     71   
    1872   virtual ~MessageManager();
     73   
     74   bool registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData );
     75   
     76   void sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever );
    1977
    2078   virtual int getStateDiff( int userId, byte* data, int maxLength, int stateId, int fromStateId, int priorityTH );
    2179   virtual int setStateDiff( int userId, byte* data, int length, int stateId, int fromStateId );
    2280   virtual void cleanUpUser( int userId );
     81   
     82   void initUser( int userId );
    2383
    2484
    2585 private:
     86   static MessageManager * singletonRef;
     87   MessageQueue            messageQueue;        //!< stores messages to send
     88   MessageHandlerMap       messageHandlerMap;   //!< contains handlers for messages
    2689
    2790};
  • branches/network/src/lib/network/network_stream.cc

    r7659 r7671  
    2929#include "network_game_manager.h"
    3030#include "shared_network_data.h"
     31#include "message_manager.h"
    3132
    3233#include "lib/util/loading/factory.h"
     
    199200
    200201  handleHandshakes();
     202  handleDownstream();
    201203  handleUpstream();
    202   handleDownstream();
    203 
    204 
    205 
    206   /* DOWNSTREAM */
    207 #if 0
    208 
    209 
    210   int dataLength;
    211   int reciever;
    212   Header header;
    213   int counter;
    214 
    215   for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
    216   {
    217     counter = 0;
    218 
    219     if ( (*it)!=NULL && (*it)->beSynchronized() /*&& (*it)->getOwner() == myHostId*/ )
    220     {
    221       do {
    222         counter++;
    223 
    224         reciever = 0;
    225 #warning fix this
    226 dataLength = 0;
    227 //TODO fix
    228         //dataLength = (*it)->readBytes(downBuffer, DATA_STREAM_BUFFER_SIZE, &reciever);
    229 
    230         if ( dataLength<=0 ){
    231           reciever = 0;
    232           continue;
    233         }
    234 
    235         dataLength = networkProtocol->createHeader((byte*)downBuffer, dataLength, DATA_STREAM_BUFFER_SIZE, static_cast<const Synchronizeable&>(*(*it)));
    236 
    237         Header* header = (Header*)downBuffer;
    238         if ( header->synchronizeableID < this->maxConnections+2 )
    239         {
    240           //if ( !isServer() ) PRINTF(0)("RESET UNIQUEID FROM %d TO 0 maxCon=%d\n", header->synchronizeableID, this->maxConnections);
    241           header->synchronizeableID = 0;
    242         }
    243         else
    244         {
    245           //if ( !isServer() ) PRINTF(0)("UNIQUEID=%d\n", header->synchronizeableID);
    246         }
    247 
    248         if ( dataLength<=0 )
    249           continue;
    250 
    251         if ( reciever!=0 )
    252         {
    253           if ( reciever < 0)
    254           {
    255             for ( int i = 0; i<networkSockets.size(); i++)
    256             {
    257               if ( i!=abs(reciever) && networkSockets[i] != NULL )
    258               {
    259                 PRINTF(0)("write %d bytes to socket %d uniqueid %d reciever %d\n", dataLength, i, (*it)->getUniqueID(), reciever);
    260                 networkSockets[i]->writePacket(downBuffer, dataLength);
    261               }
    262             }
    263           }
    264           else
    265           {
    266             if ( networkSockets[reciever] != NULL )
    267             {
    268               PRINTF(5)("write %d bytes to socket %d\n", dataLength, reciever);
    269               networkSockets[reciever]->writePacket(downBuffer, dataLength);
    270             }
    271             else
    272             {
    273               PRINTF(1)("networkSockets[reciever] == NULL\n");
    274             }
    275           }
    276         }
    277         else
    278         {
    279           for ( int i = 0; i<networkSockets.size(); i++)
    280           {
    281             if ( networkSockets[i] != NULL )
    282             {
    283               PRINTF(5)("write %d bytes to socket %d\n", dataLength, i);
    284               networkSockets[i]->writePacket(downBuffer, dataLength);
    285             }
    286           }
    287         }
    288 
    289       } while( reciever!=0 );
    290     }
    291   }
    292 
    293   /* UPSTREAM */
    294 
    295   for ( int i = 0; i<networkSockets.size(); i++)
    296   {
    297     if ( networkSockets[i] )
    298     {
    299       do {
    300         dataLength = networkSockets[i]->readPacket(upBuffer, DATA_STREAM_BUFFER_SIZE);
    301 
    302         if ( dataLength<=0 )
    303           continue;
    304 
    305         header = networkProtocol->extractHeader(upBuffer, dataLength);
    306         dataLength -= sizeof(header);
    307 
    308         PRINTF(5)("read %d bytes from socket uniqueID = %d\n", dataLength, header.synchronizeableID);
    309 
    310         if ( dataLength != header.length )
    311         {
    312           PRINTF(1)("packetsize in header and real packetsize do not match! %d:%d\n", dataLength, header.length);
    313           continue;
    314         }
    315 
    316         if ( header.synchronizeableID == 0 )
    317         {
    318           header.synchronizeableID = i;
    319         }
    320 
    321         for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
    322         {
    323 #warning fix this
    324 
    325           if ( *it && (*it)->getUniqueID()==header.synchronizeableID )
    326           {
    327             if ( (*it)->writeBytes(upBuffer+sizeof(header), dataLength, i) != header.length )
    328             {
    329               PRINTF(1)("%s did not read all the data id = %d!\n", (*it)->getClassName(), (*it)->getUniqueID());
    330               break;
    331             }
    332             continue;
    333           }
    334 
    335         }
    336 
    337       } while ( dataLength>0 );
    338     }
    339 
    340   }
    341 #endif
     204
    342205}
    343206
     
    476339          delete it->second.handshake;
    477340          it->second.handshake = NULL;
     341         
     342          handleNewClient( it->second.userId );
    478343        }
    479344        else
     
    699564}
    700565
    701 
    702 
    703 
    704 
    705 
     566/**
     567 * is executed when a handshake has finished
     568 * @todo create playable for new user
     569 */
     570void NetworkStream::handleNewClient( int userId )
     571{
     572  MessageManager::getInstance()->initUser( userId );
     573}
     574
     575
     576
     577
     578
     579
  • branches/network/src/lib/network/network_stream.h

    r7631 r7671  
    8181    void handleUpstream();
    8282    void handleDownstream();
     83    void handleNewClient( int userId );
    8384
    8485
Note: See TracChangeset for help on using the changeset viewer.