Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/trunk/src/lib/network/message_manager.cc @ 9338

Last change on this file since 9338 was 9059, checked in by patrick, 18 years ago

merged the network branche with the trunk

File size: 9.8 KB
RevLine 
[7631]1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
[8228]20#include "network_stream.h"
[8708]21#include "shared_network_data.h"
[8228]22
[7631]23using namespace std;
24
[7671]25MessageManager* MessageManager::singletonRef = NULL;
[7631]26
[7671]27
[7631]28/**
29 * standard constructor
30*/
31MessageManager::MessageManager ()
32{
33  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
[7681]34  newNumber = 1;
35  setSynchronized( true );
[7631]36}
37
38
39/**
40 * standard deconstructor
41*/
42MessageManager::~MessageManager ()
43{
[7671]44  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
45  {
46    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
47    {
48      if ( it2->data )
49      {
[8623]50        delete [] it2->data;
[7671]51        it2->data = NULL;
52      }
53    }
54   
55    it->second.messages.clear();
56    it->second.toAck.clear();
57  }
58 
59  messageQueue.clear();
[9059]60 
61  this->messageHandlerMap.clear();
62 
63  MessageManager::singletonRef = NULL;
[7631]64}
65
66/**
67 * get the diff to last acked state of userId
68 *
[7671]69 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
70 * it defines its own protocol
[7631]71 *
72 * @param userId user to create diff for
73 * @param data buffer to copy diff in
74 * @param maxLength max bytes to copy into data
75 * @param stateId id of current state
76 * @param fromStateId the reference state for the delta state
77 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
78 * @return n bytes copied into data
79 */
80int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
81{
[7671]82  int i = 0;
83  int n;
84 
85  n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength );
86  i += n;
87  assert( n == INTSIZE );
88 
89  for ( std::list<int>::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++)
90  {
91    n = Converter::intToByteArray( *it, data + i, maxLength );
92    i += n;
93    assert( n == INTSIZE );
94  }
95 
96  messageQueue[userId].toAck.clear();
97 
98  n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength );
99  i += n;
100  assert( n == INTSIZE );
101 
102  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
103  {
104    n = Converter::intToByteArray( it->length, data + i, maxLength );
105    i += n;
106    assert( n == INTSIZE );
107   
108    n = Converter::intToByteArray( it->number, data + i, maxLength );
109    i += n;
110    assert( n == INTSIZE );
111   
112    n = Converter::intToByteArray( it->messageId, data + i, maxLength );
113    i += n;
114    assert( n == INTSIZE );
115   
116    assert( i + it->length <= maxLength );
117    memcpy( data + i, it->data, it->length );
118    i += it->length;
119  }
120 
121  return i;
[7631]122}
123
124/**
125 * sets a new state out of a diff created on another host
126 * @param userId hostId of user who send me that diff
127 * @param data pointer to diff
128 * @param length length of diff
129 * @param stateId id of current state
130 * @param fromStateId id of the base state id
131 * @return number bytes read
132 * @todo check for permissions
133 */
134int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
135{
[7671]136  int i = 0;
137  int n;
138 
139  int nAcks;
140 
[7678]141  assert( i + INTSIZE <= length );
[7671]142  n = Converter::byteArrayToInt( data + i, &nAcks );
143  assert( n == INTSIZE );
144  i += n;
145 
146  std::list<int> acks;
147 
148  int number;
149 
150  for ( int j = 0; j < nAcks; j++ )
151  {
[7678]152    assert( i + INTSIZE <= length );
[7671]153    n = Converter::byteArrayToInt( data + i, &number );
154    assert( n == INTSIZE );
155    i += n;
156   
157    acks.push_back( number );
158  }
159 
160  int nMessages;
161 
[7678]162  assert( i + INTSIZE <= length );
[7671]163  n = Converter::byteArrayToInt( data + i, &nMessages );
164  assert( n == INTSIZE );
165  i += n;
[7678]166
167  int messageLength, messageId;
[7671]168 
169  for ( int j = 0; j < nMessages; j++ )
170  {
[7678]171    assert( i + INTSIZE <= length );
172    n = Converter::byteArrayToInt( data + i, &messageLength );
173    assert( n == INTSIZE );
174    i += n;
175   
176    assert( i + INTSIZE <= length );
177    n = Converter::byteArrayToInt( data + i, &number );
178    assert( n == INTSIZE );
179    i += n;
180 
181    assert( i + INTSIZE <= length );
182    n = Converter::byteArrayToInt( data + i, &messageId );
183    assert( n == INTSIZE );
184    i += n;
185   
186    if ( number > 0 )
187      messageQueue[userId].toAck.push_back( number );
188   
189    assert( i + messageLength <= length );
190    assert( messageHandlerMap.find( (MessageId)messageId ) != messageHandlerMap.end() );
[7681]191    if ( std::find( messageQueue[userId].recievedMessages.begin(), messageQueue[userId].recievedMessages.end(), number )== messageQueue[userId].recievedMessages.end() )
192    {
[7693]193      if ( !(*(messageHandlerMap[(MessageId)messageId].cb))( (MessageId)messageId, data + i, messageLength, messageHandlerMap[(MessageId)messageId].someData, userId ) )
194      {
195        NetworkMessage msg;
196       
197        msg.data = new byte[messageLength];
198        memcpy( msg.data, data + i, messageLength );
199        msg.length = messageLength;
200        msg.messageId = (MessageId)messageId;
201        msg.number = userId;
202       
[8708]203        incomingMessageBuffer.push_back( msg );
[7693]204      }
[7681]205      messageQueue[userId].recievedMessages.push_back( number );
206    }
[7678]207    i += messageLength;
[7671]208  }
[7681]209 
210 
[8708]211  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
212  for ( std::list<NetworkMessage>::iterator it = incomingMessageBuffer.begin(); it != incomingMessageBuffer.end();  )
[7693]213  {
[7697]214    if ( (*(messageHandlerMap[it->messageId].cb))( it->messageId, it->data, it->length, messageHandlerMap[it->messageId].someData, it->number ) )
[7693]215    {
216      std::list<NetworkMessage>::iterator delIt = it;
[7697]217      if ( it->data )
218        delete it->data;
[7693]219      it++;
[8708]220      incomingMessageBuffer.erase( delIt );
[7693]221      continue;
222    }
223    it++;
224  }
225 
[7681]226  //walk throu message queue and remove acked messages
227  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end();  )
228  {
229    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
230    {
231      std::list<NetworkMessage>::iterator delIt = it;
232      it++;
233      messageQueue[userId].messages.erase( delIt );
234      continue;
235    }
236    it++;
237  }
238 
239  //TODO find bether way. maybe with timestamp
240  if ( messageQueue[userId].recievedMessages.size() > 1000 )
241  {
242    for ( int j = 0; j < messageQueue[userId].recievedMessages.size() - 1000; j++ )
243      messageQueue[userId].recievedMessages.erase( messageQueue[userId].recievedMessages.begin() );
244  }
[7678]245
246  return i;
[7631]247}
248
249/**
250 * clean up memory reserved for user
251 * @param userId userid
252 */
253void MessageManager::cleanUpUser( int userId )
254{
[7678]255  if ( messageQueue.find( userId ) == messageQueue.end() )
256    return;
257   
258  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
259  {
260    if ( it->data )
261      delete it->data;
262    it->data = NULL;
263  }
264 
265  messageQueue[userId].toAck.clear();
266 
267  messageQueue.erase( userId );
[7631]268}
[7671]269
270/**
271 * registers function to handle messages with id messageId. someData is passed to callbackfuntion
272 * @param messageId message id to handle
273 * @param cb function pointer to callback function
274 * @param someData this pointer is passed to callback function without modification
275 * @return true on success
276 */
277bool MessageManager::registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData )
278{
[7678]279  MessageHandler messageHandler;
280 
281  messageHandler.cb = cb;
282  messageHandler.messageId = messageId;
283  messageHandler.someData = someData;
284 
285  messageHandlerMap[messageId] = messageHandler;
286 
287  return true;
[7671]288}
289
290/**
291 * initializes buffers for user
292 * @param userId userId
293 */
294void MessageManager::initUser( int userId )
295{
[7678]296  // just do something so map creates a new entry
297  messageQueue[userId].toAck.clear();
[7872]298  //assert( messageQueue[userId].messages.size() == 0 );
[7671]299}
[7681]300
301/**
302 * send a message to one or more clients
303 * recieverType:
304 *               RT_ALL send to all users. reciever is ignored
305 *               RT_USER send only to reciever
306 *               RT_NOT_USER send to all but reciever
307 *
308 * @param messageId message id
309 * @param data pointer to data
310 * @param dataLength length of data
311 * @param recieverType
312 * @param reciever
313 */
314void MessageManager::sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
315{
316  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
317  {
318    if ( 
[8068]319         recieverType == RT_ALL_ME ||
320         recieverType == RT_ALL_NOT_ME ||
[7681]321         recieverType == RT_USER && it->first == reciever ||
[8228]322         recieverType == RT_NOT_USER && it->first != reciever ||
323         recieverType == RT_SERVER && getNetworkStream()->isUserServer( it->first )
[7681]324       )
325    {
326      NetworkMessage msg;
[8623]327
[7681]328      msg.data = new byte[dataLength];
329      memcpy( msg.data, data, dataLength );
330      msg.length = dataLength;
331      msg.messageId = messageId;
332      msg.number = newNumber++;
333      msg.priority = messagePriority;
[8623]334
[7681]335      it->second.messages.push_back( msg );
336    }
337  }
[8623]338 
339  if ( recieverType == RT_ALL_ME )
340  {
341    NetworkMessage msg;
342
343    msg.data = new byte[dataLength];
344    memcpy( msg.data, data, dataLength );
345    msg.length = dataLength;
346    msg.messageId = messageId;
[8708]347    msg.number = SharedNetworkData::getInstance()->getHostID();
[8623]348    msg.priority = messagePriority;
349
[8708]350    incomingMessageBuffer.push_back( msg );
[8623]351  }
[7681]352}
353
354
Note: See TracBrowser for help on using the repository browser.