Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/trunk/src/lib/network/message_manager.cc @ 10566

Last change on this file since 10566 was 10114, checked in by patrick, 18 years ago

merged network back to trunk

File size: 16.6 KB
RevLine 
[7631]1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
[9656]13   co-programmer: Patrick Boenzli (patrick@orxonox.ethz.ch)
14
15     June 2006: finishing work on the network stream for pps presentation (rennerc@ee.ethz.ch)
16     July 2006: some code rearrangement and integration of the proxy server mechanism (boenzlip@ee.ethz.ch)
17     July 2006: message forwarding algorithms
[7631]18*/
19
20//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
21
22#include "message_manager.h"
23
[8228]24#include "network_stream.h"
[8708]25#include "shared_network_data.h"
[9406]26#include "converter.h"
27#include <cassert>
[8228]28
[7631]29
// NOTE(review): ObjectListDefinition is a macro declared outside this file; presumably it defines
// the MessageManager::_objectList that the constructor hands to registerObject() -- confirm.
[9869]30ObjectListDefinition(MessageManager);
// Static singleton pointer; starts out NULL and is reset to NULL again by the destructor.
[7671]31MessageManager* MessageManager::singletonRef = NULL;
[7631]32
[7671]33
[7631]34/**
35 * standard constructor
36*/
37MessageManager::MessageManager ()
38{
[9869]39  this->registerObject(this, MessageManager::_objectList);
[7681]40  newNumber = 1;
41  setSynchronized( true );
[7631]42}
43
44
45/**
46 * standard deconstructor
47*/
48MessageManager::~MessageManager ()
49{
[9656]50  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
[7671]51  {
52    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
53    {
54      if ( it2->data )
55      {
[8623]56        delete [] it2->data;
[7671]57        it2->data = NULL;
58      }
59    }
[9406]60
[7671]61    it->second.messages.clear();
62    it->second.toAck.clear();
63  }
[9406]64
[9656]65  outgoingMessageQueue.clear();
[9406]66
[9059]67  this->messageHandlerMap.clear();
[9406]68
[9059]69  MessageManager::singletonRef = NULL;
[7631]70}
71
72/**
73 * get the diff to last acked state of userId
74 *
[7671]75 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
76 * it defines its own protocol
[7631]77 *
78 * @param userId user to create diff for
79 * @param data buffer to copy diff in
80 * @param maxLength max bytes to copy into data
81 * @param stateId id of current state
82 * @param fromStateId the reference state for the delta state
83 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
84 * @return n bytes copied into data
85 */
86int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
87{
[7671]88  int i = 0;
89  int n;
[9406]90
[9656]91  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
[7671]92  i += n;
93  assert( n == INTSIZE );
[9406]94
[9656]95  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
[7671]96  {
97    n = Converter::intToByteArray( *it, data + i, maxLength );
98    i += n;
99    assert( n == INTSIZE );
100  }
[9406]101
[9656]102  outgoingMessageQueue[userId].toAck.clear();
[9406]103
[9656]104  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
[7671]105  i += n;
106  assert( n == INTSIZE );
[9406]107
[9656]108  // write the message down, a message has this structure:
109  //   | data_length | serial_number | message_type | source_id | dest_id | ...data... |
110  //      4byte        4byte            4byte         4byte      4byte     data_length
111  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
[7671]112  {
[9656]113    // send data length
[7671]114    n = Converter::intToByteArray( it->length, data + i, maxLength );
115    i += n;
116    assert( n == INTSIZE );
[9406]117
[9656]118    // send serial number
[7671]119    n = Converter::intToByteArray( it->number, data + i, maxLength );
120    i += n;
121    assert( n == INTSIZE );
[9406]122
[9656]123    // send message type
124    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
[7671]125    i += n;
126    assert( n == INTSIZE );
[9406]127
[9656]128    // send sender id
129    n = Converter::intToByteArray( it->senderId, data + i, maxLength );
130    i += n;
131    assert( n == INTSIZE );
132
133    // send destination id
134    n = Converter::intToByteArray( it->destinationId, data + i, maxLength );
135    i += n;
136    assert( n == INTSIZE );
137
138    // send receiver type
139    n = Converter::intToByteArray( it->recieverType, data + i, maxLength );
140    i += n;
141    assert( n == INTSIZE );
142
143    // and copy the data
[7671]144    assert( i + it->length <= maxLength );
145    memcpy( data + i, it->data, it->length );
146    i += it->length;
147  }
[9406]148
[7671]149  return i;
[7631]150}
151
152/**
153 * sets a new state out of a diff created on another host
154 * @param userId hostId of user who send me that diff
155 * @param data pointer to diff
156 * @param length length of diff
157 * @param stateId id of current state
158 * @param fromStateId id of the base state id
159 * @return number bytes read
160 * @todo check for permissions
161 */
162int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
163{
[7671]164  int i = 0;
165  int n;
[9406]166
[7671]167  int nAcks;
[9406]168
[9656]169
[7678]170  assert( i + INTSIZE <= length );
[7671]171  n = Converter::byteArrayToInt( data + i, &nAcks );
172  assert( n == INTSIZE );
173  i += n;
[9406]174
[7671]175  std::list<int> acks;
[9406]176
[7671]177  int number;
[9406]178
[7671]179  for ( int j = 0; j < nAcks; j++ )
180  {
[7678]181    assert( i + INTSIZE <= length );
[7671]182    n = Converter::byteArrayToInt( data + i, &number );
183    assert( n == INTSIZE );
184    i += n;
[9406]185
[7671]186    acks.push_back( number );
187  }
[9406]188
[7671]189  int nMessages;
[9406]190
[7678]191  assert( i + INTSIZE <= length );
[7671]192  n = Converter::byteArrayToInt( data + i, &nMessages );
193  assert( n == INTSIZE );
194  i += n;
[7678]195
[9656]196  int messageLength, messageType;
197  int senderId, destinationId, recieverType;
[9406]198
[9656]199  // now go through all newly received messages and assemble them
[7671]200  for ( int j = 0; j < nMessages; j++ )
201  {
[9656]202    // read the length
[7678]203    assert( i + INTSIZE <= length );
204    n = Converter::byteArrayToInt( data + i, &messageLength );
205    assert( n == INTSIZE );
206    i += n;
[9406]207
[9656]208    // read the serial number
[7678]209    assert( i + INTSIZE <= length );
210    n = Converter::byteArrayToInt( data + i, &number );
211    assert( n == INTSIZE );
212    i += n;
[9406]213
[9656]214    // read the message type
[7678]215    assert( i + INTSIZE <= length );
[9656]216    n = Converter::byteArrayToInt( data + i, &messageType );
[7678]217    assert( n == INTSIZE );
218    i += n;
[9406]219
[9656]220    // read the sender id
221    assert( i + INTSIZE <= length );
222    n = Converter::byteArrayToInt( data + i, &senderId );
223    assert( n == INTSIZE );
224    i += n;
225
226    //read the destination id
227    assert( i + INTSIZE <= length );
228    n = Converter::byteArrayToInt( data + i, &destinationId);
229    assert( n == INTSIZE );
230    i += n;
231
232    // read the receiver type
233    assert( i + INTSIZE <= length );
234    n = Converter::byteArrayToInt( data + i, &recieverType);
235    assert( n == INTSIZE );
236    i += n;
237
[7678]238    if ( number > 0 )
[9656]239      outgoingMessageQueue[userId].toAck.push_back( number );
[9406]240
[9656]241//     PRINTF(0)("got message with type: %i\n", messageType);
[7678]242    assert( i + messageLength <= length );
[9656]243    // make sure there is a message handler for this message type
[10114]244    //if ( !( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end()) )
245    //{
246//          PRINTF(0)("messagetype: %d\n", messageType);
247//          this->objectList().debugAll( 0 );
248//    }
249   
[9656]250    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end());
251
252
253    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
254         outgoingMessageQueue[userId].recievedMessages.end() )
[7681]255    {
[9656]256
257      // find out if this message is addressed for this client too
258      if( recieverType == RT_ALL_BUT_ME  && SharedNetworkData::getInstance()->getHostID() != senderId ||
259          recieverType == RT_ALL_ME ||
260          recieverType == RT_NOT_USER && SharedNetworkData::getInstance()->getHostID() != destinationId ||
261          recieverType == RT_USER  && SharedNetworkData::getInstance()->getHostID() == destinationId ||
262          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isMasterServer() ||
263          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive())
[7693]264      {
[9406]265
[9656]266        PRINTF(0)("<<< MessageManager: got msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
267      // call the handler function and handle errors
268        if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength,
269                 messageHandlerMap[(MessageType)messageType].someData, senderId, destinationId ) )
270        {
271        // if the message is not handled correctly, bush it back to the incoming packets therefore trying it later
272          NetworkMessage msg;
[9406]273
[9656]274          msg.data = new byte[messageLength];
275          memcpy( msg.data, data + i, messageLength );
276          msg.length = messageLength;
277          msg.messageType = (MessageType)messageType;
278          msg.number = userId;
279          msg.senderId = senderId;
280          msg.recieverType = (RecieverType)recieverType;
281          msg.destinationId = destinationId;
282
283          incomingMessageQueue.push_back( msg );
284        }
[7693]285      }
[9656]286
287
288      // check if the message needs to be forwarded
289      if( recieverType == RT_ALL_BUT_ME ||
290          recieverType == RT_ALL_ME ||
291          recieverType == RT_NOT_USER ||
292          recieverType == RT_USER  && SharedNetworkData::getInstance()->getHostID() != destinationId ||
293          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive() )
294      {
295        // forwarding the messages but only if its a proxy
296        if( SharedNetworkData::getInstance()->isProxyServerActive())
297        {
298          PRINTF(0)("===========>> Forwarding Message msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
299          NetworkMessage msg;
300
301          msg.data = new byte[messageLength];
302          memcpy( msg.data, data + i, messageLength );
303          msg.length = messageLength;
304          msg.messageType = (MessageType)messageType;
305          msg.number = userId;
306          msg.senderId = senderId;
307          msg.destinationId = destinationId;
308          msg.recieverType = (RecieverType)recieverType;
309
310          this->sendMessage(msg.messageType, msg.data, msg.length, msg.recieverType, msg.senderId = senderId, msg.destinationId, MP_HIGHBANDWIDTH);
311        }
312      }
313
314      // save the serial number for ack signaling
315      outgoingMessageQueue[userId].recievedMessages.push_back( number );
[7681]316    }
[9656]317
[7678]318    i += messageLength;
[7671]319  }
[9406]320
321
[9656]322  //walk throu message queue and remove acked messages
323  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
[7693]324  {
[9656]325    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
[7693]326    {
327      std::list<NetworkMessage>::iterator delIt = it;
328      it++;
[9656]329      outgoingMessageQueue[userId].messages.erase( delIt );
[7693]330      continue;
331    }
332    it++;
333  }
[9406]334
[9656]335  //TODO find bether way. maybe with timestamp
336  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
[7681]337  {
[9656]338    for ( int j = 0; j < (int)outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
339      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
340  }
341
342  return i;
343}
344
345
346
347
348/**
349 * processes the message manager data, specialy check for localy generated messages
350 */
351void MessageManager::processData()
352{
353  // now call the message handlers with the new message
354  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
355  {
356    PRINTF(0)("<<< MessageManager: got local msg with type: %i, from sender %i, to rec: %i\n", (*it).messageType, (*it).senderId, (*it).destinationId);
357
358    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData,
359                                                     /*it->number, */it->senderId, it->destinationId ) )
[7681]360    {
361      std::list<NetworkMessage>::iterator delIt = it;
[9656]362      if ( it->data )
363        delete it->data;
[7681]364      it++;
[9656]365      incomingMessageQueue.erase( delIt );
[7681]366      continue;
367    }
368    it++;
369  }
[9406]370
[7631]371}
372
[9656]373
374
375
[7631]376/**
377 * clean up memory reserved for user
378 * @param userId userid
379 */
380void MessageManager::cleanUpUser( int userId )
381{
[9656]382  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
[7678]383    return;
[9406]384
[9656]385  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
[7678]386  {
387    if ( it->data )
388      delete it->data;
389    it->data = NULL;
390  }
[9406]391
[9656]392  outgoingMessageQueue[userId].toAck.clear();
[9406]393
[9656]394  outgoingMessageQueue.erase( userId );
[7631]395}
[7671]396
397/**
[9656]398 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
399 * @param messageType message id to handle
[7671]400 * @param cb function pointer to callback function
401 * @param someData this pointer is passed to callback function without modification
402 * @return true on success
403 */
[9656]404bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
[7671]405{
[7678]406  MessageHandler messageHandler;
[9406]407
[7678]408  messageHandler.cb = cb;
[9656]409  messageHandler.messageType = messageType;
[7678]410  messageHandler.someData = someData;
[9406]411
[9656]412  messageHandlerMap[messageType] = messageHandler;
[9406]413
[7678]414  return true;
[7671]415}
416
417/**
418 * initializes buffers for user
419 * @param userId userId
420 */
421void MessageManager::initUser( int userId )
422{
[7678]423  // just do something so map creates a new entry
[9656]424  outgoingMessageQueue[userId].toAck.clear();
425  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
[7671]426}
[7681]427
[9656]428
429
[7681]430/**
431 * send a message to one or more clients
432 * recieverType:
433 *               RT_ALL send to all users. reciever is ignored
434 *               RT_USER send only to reciever
435 *               RT_NOT_USER send to all but reciever
436 *
[9656]437 * @param messageType message id
[7681]438 * @param data pointer to data
439 * @param dataLength length of data
[9656]440 * @param recieverType type of the receiver
441 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
[7681]442 */
[9656]443void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
[7681]444{
[9656]445  this->sendMessage(messageType, data, dataLength, recieverType, SharedNetworkData::getInstance()->getHostID(), reciever, messagePriority);
446}
447
448
449/**
450 * send a message to one or more clients as a special client
451 * recieverType:
452 *               RT_ALL send to all users. reciever is ignored
453 *               RT_USER send only to reciever
454 *               RT_NOT_USER send to all but reciever
455 *
456 * @param messageType message id
457 * @param data pointer to data
458 * @param dataLength length of data
459 * @param recieverType type of the receiver
460 * @param sender the userId of the sender if there is need for shadowing it (eg. for msg forwarding)
461 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
462 */
463    void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int sender, int reciever, MessagePriority messagePriority )
464{
465  PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever);
466
467  // go through all outgoing message queues and add the message if its appropriate
468  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
[7681]469  {
[9656]470
[9406]471    if (
[9656]472         recieverType == RT_ALL_ME      ||
473         recieverType == RT_ALL_BUT_ME  ||
474         recieverType == RT_USER        && it->first == reciever ||
475         recieverType == RT_USER        && reciever == NET_ID_MASTER_SERVER && !getNetworkStream()->isUserMasterServer( it->first ) ||  //(*)
476         recieverType == RT_NOT_USER    && it->first != reciever ||
477         recieverType == RT_SERVER      && getNetworkStream()->isUserMasterServer( it->first ) ||
478         recieverType == RT_SERVER      && getNetworkStream()->isUserProxyServerActive( it->first )
479       )// (*) special case: forward
[7681]480    {
481      NetworkMessage msg;
[8623]482
[7681]483      msg.data = new byte[dataLength];
484      memcpy( msg.data, data, dataLength );
485      msg.length = dataLength;
[9656]486      msg.messageType = messageType;
487      msg.number = this->newNumber++;
488      msg.senderId = sender;
489      msg.destinationId = reciever;
490      msg.recieverType = recieverType;
[7681]491      msg.priority = messagePriority;
[8623]492
[7681]493      it->second.messages.push_back( msg );
494    }
[9656]495
496
[7681]497  }
[9406]498
[9656]499
500  // if the message is also for myself, handle it here
501  if ( recieverType == RT_ALL_ME ||
502       recieverType == RT_USER   && reciever == SharedNetworkData::getInstance()->getHostID()
503     )
[8623]504  {
505    NetworkMessage msg;
506
507    msg.data = new byte[dataLength];
508    memcpy( msg.data, data, dataLength );
509    msg.length = dataLength;
[9656]510    msg.messageType = messageType;
[8708]511    msg.number = SharedNetworkData::getInstance()->getHostID();
[9656]512    msg.senderId = sender;
513    msg.destinationId = reciever;
514    msg.recieverType = recieverType;
[8623]515    msg.priority = messagePriority;
516
[9656]517    this->incomingMessageQueue.push_back( msg );
[8623]518  }
[7681]519}
520
521
Note: See TracBrowser for help on using the repository browser.