Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/message_manager.cc @ 9554

Last change on this file since 9554 was 9542, checked in by patrick, 18 years ago

more generalized message handling now

File size: 15.8 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
20#include "network_stream.h"
21#include "shared_network_data.h"
22#include "converter.h"
23#include <cassert>
24
25
26
// Storage for the static singleton pointer; starts out NULL and is set back to NULL by the destructor.
27MessageManager* MessageManager::singletonRef = NULL;
28
29
30/**
31 * standard constructor
32*/
33MessageManager::MessageManager ()
34{
35  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
36  newNumber = 1;
37  setSynchronized( true );
38}
39
40
41/**
42 * standard deconstructor
43*/
44MessageManager::~MessageManager ()
45{
46  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
47  {
48    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
49    {
50      if ( it2->data )
51      {
52        delete [] it2->data;
53        it2->data = NULL;
54      }
55    }
56
57    it->second.messages.clear();
58    it->second.toAck.clear();
59  }
60
61  outgoingMessageQueue.clear();
62
63  this->messageHandlerMap.clear();
64
65  MessageManager::singletonRef = NULL;
66}
67
68/**
69 * get the diff to last acked state of userId
70 *
71 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
72 * it defines its own protocol
73 *
74 * @param userId user to create diff for
75 * @param data buffer to copy diff in
76 * @param maxLength max bytes to copy into data
77 * @param stateId id of current state
78 * @param fromStateId the reference state for the delta state
79 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
80 * @return n bytes copied into data
81 */
82int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
83{
84  int i = 0;
85  int n;
86
87  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
88  i += n;
89  assert( n == INTSIZE );
90
91  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
92  {
93    n = Converter::intToByteArray( *it, data + i, maxLength );
94    i += n;
95    assert( n == INTSIZE );
96  }
97
98  outgoingMessageQueue[userId].toAck.clear();
99
100  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
101  i += n;
102  assert( n == INTSIZE );
103
104  // write the message down, a message has this structure:
105  //   | data_length | serial_number | message_type | source_id | dest_id | ...data... |
106  //      4byte        4byte            4byte         4byte      4byte     data_length
107  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
108  {
109    // send data length
110    n = Converter::intToByteArray( it->length, data + i, maxLength );
111    i += n;
112    assert( n == INTSIZE );
113
114    // send serial number
115    n = Converter::intToByteArray( it->number, data + i, maxLength );
116    i += n;
117    assert( n == INTSIZE );
118
119    // send message type
120    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
121    i += n;
122    assert( n == INTSIZE );
123
124    // send sender id
125    n = Converter::intToByteArray( it->senderId, data + i, maxLength );
126    i += n;
127    assert( n == INTSIZE );
128
129    // send destination id
130    n = Converter::intToByteArray( it->destinationId, data + i, maxLength );
131    i += n;
132    assert( n == INTSIZE );
133
134    // send receiver type
135    n = Converter::intToByteArray( it->recieverType, data + i, maxLength );
136    i += n;
137    assert( n == INTSIZE );
138
139    // and copy the data
140    assert( i + it->length <= maxLength );
141    memcpy( data + i, it->data, it->length );
142    i += it->length;
143  }
144
145  return i;
146}
147
148/**
149 * sets a new state out of a diff created on another host
150 * @param userId hostId of user who send me that diff
151 * @param data pointer to diff
152 * @param length length of diff
153 * @param stateId id of current state
154 * @param fromStateId id of the base state id
155 * @return number bytes read
156 * @todo check for permissions
157 */
158int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
159{
160  int i = 0;
161  int n;
162
163  int nAcks;
164
165
166  assert( i + INTSIZE <= length );
167  n = Converter::byteArrayToInt( data + i, &nAcks );
168  assert( n == INTSIZE );
169  i += n;
170
171  std::list<int> acks;
172
173  int number;
174
175  for ( int j = 0; j < nAcks; j++ )
176  {
177    assert( i + INTSIZE <= length );
178    n = Converter::byteArrayToInt( data + i, &number );
179    assert( n == INTSIZE );
180    i += n;
181
182    acks.push_back( number );
183  }
184
185  int nMessages;
186
187  assert( i + INTSIZE <= length );
188  n = Converter::byteArrayToInt( data + i, &nMessages );
189  assert( n == INTSIZE );
190  i += n;
191
192  int messageLength, messageType;
193  int senderId, destinationId, recieverType;
194
195  // now go through all newly received messages and assemble them
196  for ( int j = 0; j < nMessages; j++ )
197  {
198    // read the length
199    assert( i + INTSIZE <= length );
200    n = Converter::byteArrayToInt( data + i, &messageLength );
201    assert( n == INTSIZE );
202    i += n;
203
204    // read the serial number
205    assert( i + INTSIZE <= length );
206    n = Converter::byteArrayToInt( data + i, &number );
207    assert( n == INTSIZE );
208    i += n;
209
210    // read the message type
211    assert( i + INTSIZE <= length );
212    n = Converter::byteArrayToInt( data + i, &messageType );
213    assert( n == INTSIZE );
214    i += n;
215
216    // read the sender id
217    assert( i + INTSIZE <= length );
218    n = Converter::byteArrayToInt( data + i, &senderId );
219    assert( n == INTSIZE );
220    i += n;
221
222    //read the destination id
223    assert( i + INTSIZE <= length );
224    n = Converter::byteArrayToInt( data + i, &destinationId);
225    assert( n == INTSIZE );
226    i += n;
227
228    // read the receiver type
229    assert( i + INTSIZE <= length );
230    n = Converter::byteArrayToInt( data + i, &recieverType);
231    assert( n == INTSIZE );
232    i += n;
233
234    if ( number > 0 )
235      outgoingMessageQueue[userId].toAck.push_back( number );
236
237    assert( i + messageLength <= length );
238    // make sure there is a message handler for this message type
239    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end() );
240
241
242    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
243         outgoingMessageQueue[userId].recievedMessages.end() )
244    {
245
246      // find out if this message is addressed for this client too
247      if( recieverType == RT_ALL_BUT_ME  && SharedNetworkData::getInstance()->getHostID() != destinationId ||
248          recieverType == RT_ALL_ME ||
249          recieverType == RT_NOT_USER && SharedNetworkData::getInstance()->getHostID() != destinationId ||
250          recieverType == RT_USER  && SharedNetworkData::getInstance()->getHostID() == destinationId ||
251          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isMasterServer() ||
252          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive())
253      {
254
255      // call the handler function and handle errors
256        if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength,
257                 messageHandlerMap[(MessageType)messageType].someData, senderId, destinationId ) )
258        {
259        // if the message is not handled correctly, bush it back to the incoming packets
260          NetworkMessage msg;
261
262          msg.data = new byte[messageLength];
263          memcpy( msg.data, data + i, messageLength );
264          msg.length = messageLength;
265          msg.messageType = (MessageType)messageType;
266          msg.number = userId;
267          msg.senderId = senderId;
268          msg.recieverType = (RecieverType)recieverType;
269          msg.destinationId = destinationId;
270
271          incomingMessageQueue.push_back( msg );
272        }
273        PRINTF(0)("<<< MessageManager: got msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
274      }
275      // or else forward the message to the other servers
276      else
277      {
278        // forwarding the messages but only if its a proxy
279        if( /*!SharedNetworkData::getInstance()->isClient()*/ SharedNetworkData::getInstance()->isProxyServerActive())
280        {
281          PRINTF(0)("===========>> Forwarding Message msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
282          NetworkMessage msg;
283
284          msg.data = new byte[messageLength];
285          memcpy( msg.data, data + i, messageLength );
286          msg.length = messageLength;
287          msg.messageType = (MessageType)messageType;
288          msg.number = userId;
289          msg.senderId = senderId;
290          msg.destinationId = destinationId;
291          msg.recieverType = (RecieverType)recieverType;
292
293          this->sendMessage(msg.messageType, msg.data, msg.length, msg.recieverType, msg.destinationId, MP_HIGHBANDWIDTH);
294        }
295      }
296
297      // save the serial number for ack signaling
298      outgoingMessageQueue[userId].recievedMessages.push_back( number );
299    }
300
301    i += messageLength;
302  }
303
304  // now call the message handlers with the new message
305  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
306  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
307  {
308    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData,
309                                                     /*it->number, */it->senderId, it->destinationId ) )
310    {
311      std::list<NetworkMessage>::iterator delIt = it;
312      if ( it->data )
313        delete it->data;
314      it++;
315      incomingMessageQueue.erase( delIt );
316      continue;
317    }
318    it++;
319  }
320
321  //walk throu message queue and remove acked messages
322  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
323  {
324    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
325    {
326      std::list<NetworkMessage>::iterator delIt = it;
327      it++;
328      outgoingMessageQueue[userId].messages.erase( delIt );
329      continue;
330    }
331    it++;
332  }
333
334  //TODO find bether way. maybe with timestamp
335  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
336  {
337    for ( int j = 0; j < outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
338      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
339  }
340
341  return i;
342}
343
344
345
346/**
347 * clean up memory reserved for user
348 * @param userId userid
349 */
350void MessageManager::cleanUpUser( int userId )
351{
352  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
353    return;
354
355  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
356  {
357    if ( it->data )
358      delete it->data;
359    it->data = NULL;
360  }
361
362  outgoingMessageQueue[userId].toAck.clear();
363
364  outgoingMessageQueue.erase( userId );
365}
366
367/**
368 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
369 * @param messageType message id to handle
370 * @param cb function pointer to callback function
371 * @param someData this pointer is passed to callback function without modification
372 * @return true on success
373 */
374bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
375{
376  MessageHandler messageHandler;
377
378  messageHandler.cb = cb;
379  messageHandler.messageType = messageType;
380  messageHandler.someData = someData;
381
382  messageHandlerMap[messageType] = messageHandler;
383
384  return true;
385}
386
387/**
388 * initializes buffers for user
389 * @param userId userId
390 */
391void MessageManager::initUser( int userId )
392{
393  // just do something so map creates a new entry
394  outgoingMessageQueue[userId].toAck.clear();
395  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
396}
397
398/**
399 * send a message to one or more clients
400 * recieverType:
401 *               RT_ALL send to all users. reciever is ignored
402 *               RT_USER send only to reciever
403 *               RT_NOT_USER send to all but reciever
404 *
405 * @param messageType message id
406 * @param data pointer to data
407 * @param dataLength length of data
408 * @param recieverType type of the receiver
409 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
410 */
411void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
412{
413  PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever);
414
415  // go through all outgoing message queues and add the message if its appropriate
416  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
417  {
418
419    // relay server special handling
420//     if( SharedNetworkData::getInstance()->isMasterServer() && SharedNetworkData::getInstance()->isUserProxyServerActive(it->first))
421//     {
422//       NetworkMessage msg;
423//
424//       msg.data = new byte[dataLength];
425//       memcpy( msg.data, data, dataLength );
426//       msg.length = dataLength;
427//       msg.messageType = messageType;
428//       msg.number = this->newNumber++;
429//       msg.senderId = SharedNetworkData::getInstance()->getHostID();
430//       msg.destinationId = reciever;
431//       msg.recieverType = recieverType;
432//       msg.priority = messagePriority;
433//
434//       it->second.messages.push_back( msg );
435//     }
436//     // proxy server to master server
437//     else if( SharedNetworkData::getInstance()->isProxyServerActive() && SharedNetworkData::getInstance()->isUserMasterServer(it->first))
438//     {
439//       NetworkMessage msg;
440//
441//       msg.data = new byte[dataLength];
442//       memcpy( msg.data, data, dataLength );
443//       msg.length = dataLength;
444//       msg.messageType = messageType;
445//       msg.number = this->newNumber++;
446//       msg.senderId = SharedNetworkData::getInstance()->getHostID();
447//       msg.destinationId = reciever;
448//       msg.recieverType = recieverType;
449//       msg.priority = messagePriority;
450//
451//       it->second.messages.push_back( msg );
452//     }
453    // check for every node if the message is for it also
454    /*else*/ if (
455         recieverType == RT_ALL_ME ||
456         recieverType == RT_ALL_BUT_ME ||
457         recieverType == RT_USER && it->first == reciever ||
458         recieverType == RT_NOT_USER && it->first != reciever ||
459         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) ||
460         recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first )
461       )
462    {
463      NetworkMessage msg;
464
465      msg.data = new byte[dataLength];
466      memcpy( msg.data, data, dataLength );
467      msg.length = dataLength;
468      msg.messageType = messageType;
469      msg.number = this->newNumber++;
470      msg.senderId = SharedNetworkData::getInstance()->getHostID();
471      msg.destinationId = reciever;
472      msg.recieverType = recieverType;
473      msg.priority = messagePriority;
474
475      it->second.messages.push_back( msg );
476    }
477
478
479  }
480
481
482  // if the message is also for myself, handle it here
483  if ( recieverType == RT_ALL_ME )
484  {
485    NetworkMessage msg;
486
487    msg.data = new byte[dataLength];
488    memcpy( msg.data, data, dataLength );
489    msg.length = dataLength;
490    msg.messageType = messageType;
491    msg.number = SharedNetworkData::getInstance()->getHostID();
492    msg.senderId = SharedNetworkData::getInstance()->getHostID();
493    msg.destinationId = reciever;
494    msg.recieverType = recieverType;
495    msg.priority = messagePriority;
496
497    incomingMessageQueue.push_back( msg );
498  }
499}
500
501
Note: See TracBrowser for help on using the repository browser.