Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/trunk/src/lib/network/message_manager.cc @ 9434

Last change on this file since 9434 was 9406, checked in by bensch, 18 years ago

orxonox/trunk: merged the proxy back

merged with commandsvn merge -r9346:HEAD https://svn.orxonox.net/orxonox/branches/proxy .

no conflicts

File size: 9.7 KB
RevLine 
[7631]1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
[8228]20#include "network_stream.h"
[8708]21#include "shared_network_data.h"
[9406]22#include "converter.h"
23#include <cassert>
[8228]24
[7631]25
[9406]26
[7671]27MessageManager* MessageManager::singletonRef = NULL;
[7631]28
[7671]29
[7631]30/**
31 * standard constructor
32*/
33MessageManager::MessageManager ()
34{
35  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
[7681]36  newNumber = 1;
37  setSynchronized( true );
[7631]38}
39
40
41/**
42 * standard deconstructor
43*/
44MessageManager::~MessageManager ()
45{
[7671]46  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
47  {
48    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
49    {
50      if ( it2->data )
51      {
[8623]52        delete [] it2->data;
[7671]53        it2->data = NULL;
54      }
55    }
[9406]56
[7671]57    it->second.messages.clear();
58    it->second.toAck.clear();
59  }
[9406]60
[7671]61  messageQueue.clear();
[9406]62
[9059]63  this->messageHandlerMap.clear();
[9406]64
[9059]65  MessageManager::singletonRef = NULL;
[7631]66}
67
68/**
69 * get the diff to last acked state of userId
70 *
[7671]71 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
72 * it defines its own protocol
[7631]73 *
74 * @param userId user to create diff for
75 * @param data buffer to copy diff in
76 * @param maxLength max bytes to copy into data
77 * @param stateId id of current state
78 * @param fromStateId the reference state for the delta state
79 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
80 * @return n bytes copied into data
81 */
82int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
83{
[7671]84  int i = 0;
85  int n;
[9406]86
[7671]87  n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength );
88  i += n;
89  assert( n == INTSIZE );
[9406]90
[7671]91  for ( std::list<int>::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++)
92  {
93    n = Converter::intToByteArray( *it, data + i, maxLength );
94    i += n;
95    assert( n == INTSIZE );
96  }
[9406]97
[7671]98  messageQueue[userId].toAck.clear();
[9406]99
[7671]100  n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength );
101  i += n;
102  assert( n == INTSIZE );
[9406]103
[7671]104  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
105  {
106    n = Converter::intToByteArray( it->length, data + i, maxLength );
107    i += n;
108    assert( n == INTSIZE );
[9406]109
[7671]110    n = Converter::intToByteArray( it->number, data + i, maxLength );
111    i += n;
112    assert( n == INTSIZE );
[9406]113
[7671]114    n = Converter::intToByteArray( it->messageId, data + i, maxLength );
115    i += n;
116    assert( n == INTSIZE );
[9406]117
[7671]118    assert( i + it->length <= maxLength );
119    memcpy( data + i, it->data, it->length );
120    i += it->length;
121  }
[9406]122
[7671]123  return i;
[7631]124}
125
126/**
127 * sets a new state out of a diff created on another host
128 * @param userId hostId of user who send me that diff
129 * @param data pointer to diff
130 * @param length length of diff
131 * @param stateId id of current state
132 * @param fromStateId id of the base state id
133 * @return number bytes read
134 * @todo check for permissions
135 */
136int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
137{
[7671]138  int i = 0;
139  int n;
[9406]140
[7671]141  int nAcks;
[9406]142
[7678]143  assert( i + INTSIZE <= length );
[7671]144  n = Converter::byteArrayToInt( data + i, &nAcks );
145  assert( n == INTSIZE );
146  i += n;
[9406]147
[7671]148  std::list<int> acks;
[9406]149
[7671]150  int number;
[9406]151
[7671]152  for ( int j = 0; j < nAcks; j++ )
153  {
[7678]154    assert( i + INTSIZE <= length );
[7671]155    n = Converter::byteArrayToInt( data + i, &number );
156    assert( n == INTSIZE );
157    i += n;
[9406]158
[7671]159    acks.push_back( number );
160  }
[9406]161
[7671]162  int nMessages;
[9406]163
[7678]164  assert( i + INTSIZE <= length );
[7671]165  n = Converter::byteArrayToInt( data + i, &nMessages );
166  assert( n == INTSIZE );
167  i += n;
[7678]168
169  int messageLength, messageId;
[9406]170
[7671]171  for ( int j = 0; j < nMessages; j++ )
172  {
[7678]173    assert( i + INTSIZE <= length );
174    n = Converter::byteArrayToInt( data + i, &messageLength );
175    assert( n == INTSIZE );
176    i += n;
[9406]177
[7678]178    assert( i + INTSIZE <= length );
179    n = Converter::byteArrayToInt( data + i, &number );
180    assert( n == INTSIZE );
181    i += n;
[9406]182
[7678]183    assert( i + INTSIZE <= length );
184    n = Converter::byteArrayToInt( data + i, &messageId );
185    assert( n == INTSIZE );
186    i += n;
[9406]187
[7678]188    if ( number > 0 )
189      messageQueue[userId].toAck.push_back( number );
[9406]190
[7678]191    assert( i + messageLength <= length );
192    assert( messageHandlerMap.find( (MessageId)messageId ) != messageHandlerMap.end() );
[7681]193    if ( std::find( messageQueue[userId].recievedMessages.begin(), messageQueue[userId].recievedMessages.end(), number )== messageQueue[userId].recievedMessages.end() )
194    {
[7693]195      if ( !(*(messageHandlerMap[(MessageId)messageId].cb))( (MessageId)messageId, data + i, messageLength, messageHandlerMap[(MessageId)messageId].someData, userId ) )
196      {
197        NetworkMessage msg;
[9406]198
[7693]199        msg.data = new byte[messageLength];
200        memcpy( msg.data, data + i, messageLength );
201        msg.length = messageLength;
202        msg.messageId = (MessageId)messageId;
203        msg.number = userId;
[9406]204
[8708]205        incomingMessageBuffer.push_back( msg );
[7693]206      }
[7681]207      messageQueue[userId].recievedMessages.push_back( number );
208    }
[7678]209    i += messageLength;
[7671]210  }
[9406]211
212
[8708]213  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
214  for ( std::list<NetworkMessage>::iterator it = incomingMessageBuffer.begin(); it != incomingMessageBuffer.end();  )
[7693]215  {
[7697]216    if ( (*(messageHandlerMap[it->messageId].cb))( it->messageId, it->data, it->length, messageHandlerMap[it->messageId].someData, it->number ) )
[7693]217    {
218      std::list<NetworkMessage>::iterator delIt = it;
[7697]219      if ( it->data )
220        delete it->data;
[7693]221      it++;
[8708]222      incomingMessageBuffer.erase( delIt );
[7693]223      continue;
224    }
225    it++;
226  }
[9406]227
[7681]228  //walk throu message queue and remove acked messages
229  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end();  )
230  {
231    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
232    {
233      std::list<NetworkMessage>::iterator delIt = it;
234      it++;
235      messageQueue[userId].messages.erase( delIt );
236      continue;
237    }
238    it++;
239  }
[9406]240
[7681]241  //TODO find bether way. maybe with timestamp
242  if ( messageQueue[userId].recievedMessages.size() > 1000 )
243  {
244    for ( int j = 0; j < messageQueue[userId].recievedMessages.size() - 1000; j++ )
245      messageQueue[userId].recievedMessages.erase( messageQueue[userId].recievedMessages.begin() );
246  }
[7678]247
248  return i;
[7631]249}
250
251/**
252 * clean up memory reserved for user
253 * @param userId userid
254 */
255void MessageManager::cleanUpUser( int userId )
256{
[7678]257  if ( messageQueue.find( userId ) == messageQueue.end() )
258    return;
[9406]259
[7678]260  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
261  {
262    if ( it->data )
263      delete it->data;
264    it->data = NULL;
265  }
[9406]266
[7678]267  messageQueue[userId].toAck.clear();
[9406]268
[7678]269  messageQueue.erase( userId );
[7631]270}
[7671]271
272/**
273 * registers function to handle messages with id messageId. someData is passed to callbackfuntion
[9406]274 * @param messageId message id to handle
[7671]275 * @param cb function pointer to callback function
276 * @param someData this pointer is passed to callback function without modification
277 * @return true on success
278 */
279bool MessageManager::registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData )
280{
[7678]281  MessageHandler messageHandler;
[9406]282
[7678]283  messageHandler.cb = cb;
284  messageHandler.messageId = messageId;
285  messageHandler.someData = someData;
[9406]286
[7678]287  messageHandlerMap[messageId] = messageHandler;
[9406]288
[7678]289  return true;
[7671]290}
291
292/**
293 * initializes buffers for user
294 * @param userId userId
295 */
296void MessageManager::initUser( int userId )
297{
[7678]298  // just do something so map creates a new entry
299  messageQueue[userId].toAck.clear();
[7872]300  //assert( messageQueue[userId].messages.size() == 0 );
[7671]301}
[7681]302
303/**
304 * send a message to one or more clients
305 * recieverType:
306 *               RT_ALL send to all users. reciever is ignored
307 *               RT_USER send only to reciever
308 *               RT_NOT_USER send to all but reciever
309 *
310 * @param messageId message id
311 * @param data pointer to data
312 * @param dataLength length of data
[9406]313 * @param recieverType
314 * @param reciever
[7681]315 */
316void MessageManager::sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
317{
318  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
319  {
[9406]320    if (
[8068]321         recieverType == RT_ALL_ME ||
322         recieverType == RT_ALL_NOT_ME ||
[7681]323         recieverType == RT_USER && it->first == reciever ||
[8228]324         recieverType == RT_NOT_USER && it->first != reciever ||
[9406]325         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first )
[7681]326       )
327    {
328      NetworkMessage msg;
[8623]329
[7681]330      msg.data = new byte[dataLength];
331      memcpy( msg.data, data, dataLength );
332      msg.length = dataLength;
333      msg.messageId = messageId;
334      msg.number = newNumber++;
335      msg.priority = messagePriority;
[8623]336
[7681]337      it->second.messages.push_back( msg );
338    }
339  }
[9406]340
[8623]341  if ( recieverType == RT_ALL_ME )
342  {
343    NetworkMessage msg;
344
345    msg.data = new byte[dataLength];
346    memcpy( msg.data, data, dataLength );
347    msg.length = dataLength;
348    msg.messageId = messageId;
[8708]349    msg.number = SharedNetworkData::getInstance()->getHostID();
[8623]350    msg.priority = messagePriority;
351
[8708]352    incomingMessageBuffer.push_back( msg );
[8623]353  }
[7681]354}
355
356
Note: See TracBrowser for help on using the repository browser.