Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/mountain_lake/src/lib/network/message_manager.cc @ 9013

Last change on this file since 9013 was 8708, checked in by bensch, 18 years ago

merged network back
merged with command:
svn merge -r8625:HEAD https://svn.orxonox.net/orxonox/branches/network .
no conflicts

File size: 9.7 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
20#include "network_stream.h"
21#include "shared_network_data.h"
22
23using namespace std;
24
25MessageManager* MessageManager::singletonRef = NULL;
26
27
28/**
29 * standard constructor
30*/
31MessageManager::MessageManager ()
32{
33  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
34  newNumber = 1;
35  setSynchronized( true );
36}
37
38
39/**
40 * standard deconstructor
41*/
42MessageManager::~MessageManager ()
43{
44  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
45  {
46    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
47    {
48      if ( it2->data )
49      {
50        delete [] it2->data;
51        it2->data = NULL;
52      }
53    }
54   
55    it->second.messages.clear();
56    it->second.toAck.clear();
57  }
58 
59  messageQueue.clear();
60}
61
62/**
63 * get the diff to last acked state of userId
64 *
65 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
66 * it defines its own protocol
67 *
68 * @param userId user to create diff for
69 * @param data buffer to copy diff in
70 * @param maxLength max bytes to copy into data
71 * @param stateId id of current state
72 * @param fromStateId the reference state for the delta state
73 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
74 * @return n bytes copied into data
75 */
76int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
77{
78  int i = 0;
79  int n;
80 
81  n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength );
82  i += n;
83  assert( n == INTSIZE );
84 
85  for ( std::list<int>::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++)
86  {
87    n = Converter::intToByteArray( *it, data + i, maxLength );
88    i += n;
89    assert( n == INTSIZE );
90  }
91 
92  messageQueue[userId].toAck.clear();
93 
94  n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength );
95  i += n;
96  assert( n == INTSIZE );
97 
98  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
99  {
100    n = Converter::intToByteArray( it->length, data + i, maxLength );
101    i += n;
102    assert( n == INTSIZE );
103   
104    n = Converter::intToByteArray( it->number, data + i, maxLength );
105    i += n;
106    assert( n == INTSIZE );
107   
108    n = Converter::intToByteArray( it->messageId, data + i, maxLength );
109    i += n;
110    assert( n == INTSIZE );
111   
112    assert( i + it->length <= maxLength );
113    memcpy( data + i, it->data, it->length );
114    i += it->length;
115  }
116 
117  return i;
118}
119
120/**
121 * sets a new state out of a diff created on another host
122 * @param userId hostId of user who send me that diff
123 * @param data pointer to diff
124 * @param length length of diff
125 * @param stateId id of current state
126 * @param fromStateId id of the base state id
127 * @return number bytes read
128 * @todo check for permissions
129 */
130int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
131{
132  int i = 0;
133  int n;
134 
135  int nAcks;
136 
137  assert( i + INTSIZE <= length );
138  n = Converter::byteArrayToInt( data + i, &nAcks );
139  assert( n == INTSIZE );
140  i += n;
141 
142  std::list<int> acks;
143 
144  int number;
145 
146  for ( int j = 0; j < nAcks; j++ )
147  {
148    assert( i + INTSIZE <= length );
149    n = Converter::byteArrayToInt( data + i, &number );
150    assert( n == INTSIZE );
151    i += n;
152   
153    acks.push_back( number );
154  }
155 
156  int nMessages;
157 
158  assert( i + INTSIZE <= length );
159  n = Converter::byteArrayToInt( data + i, &nMessages );
160  assert( n == INTSIZE );
161  i += n;
162
163  int messageLength, messageId;
164 
165  for ( int j = 0; j < nMessages; j++ )
166  {
167    assert( i + INTSIZE <= length );
168    n = Converter::byteArrayToInt( data + i, &messageLength );
169    assert( n == INTSIZE );
170    i += n;
171   
172    assert( i + INTSIZE <= length );
173    n = Converter::byteArrayToInt( data + i, &number );
174    assert( n == INTSIZE );
175    i += n;
176 
177    assert( i + INTSIZE <= length );
178    n = Converter::byteArrayToInt( data + i, &messageId );
179    assert( n == INTSIZE );
180    i += n;
181   
182    if ( number > 0 )
183      messageQueue[userId].toAck.push_back( number );
184   
185    assert( i + messageLength <= length );
186    assert( messageHandlerMap.find( (MessageId)messageId ) != messageHandlerMap.end() );
187    if ( std::find( messageQueue[userId].recievedMessages.begin(), messageQueue[userId].recievedMessages.end(), number )== messageQueue[userId].recievedMessages.end() )
188    {
189      if ( !(*(messageHandlerMap[(MessageId)messageId].cb))( (MessageId)messageId, data + i, messageLength, messageHandlerMap[(MessageId)messageId].someData, userId ) )
190      {
191        NetworkMessage msg;
192       
193        msg.data = new byte[messageLength];
194        memcpy( msg.data, data + i, messageLength );
195        msg.length = messageLength;
196        msg.messageId = (MessageId)messageId;
197        msg.number = userId;
198       
199        incomingMessageBuffer.push_back( msg );
200      }
201      messageQueue[userId].recievedMessages.push_back( number );
202    }
203    i += messageLength;
204  }
205 
206 
207  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
208  for ( std::list<NetworkMessage>::iterator it = incomingMessageBuffer.begin(); it != incomingMessageBuffer.end();  )
209  {
210    if ( (*(messageHandlerMap[it->messageId].cb))( it->messageId, it->data, it->length, messageHandlerMap[it->messageId].someData, it->number ) )
211    {
212      std::list<NetworkMessage>::iterator delIt = it;
213      if ( it->data )
214        delete it->data;
215      it++;
216      incomingMessageBuffer.erase( delIt );
217      continue;
218    }
219    it++;
220  }
221 
222  //walk throu message queue and remove acked messages
223  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end();  )
224  {
225    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
226    {
227      std::list<NetworkMessage>::iterator delIt = it;
228      it++;
229      messageQueue[userId].messages.erase( delIt );
230      continue;
231    }
232    it++;
233  }
234 
235  //TODO find bether way. maybe with timestamp
236  if ( messageQueue[userId].recievedMessages.size() > 1000 )
237  {
238    for ( int j = 0; j < messageQueue[userId].recievedMessages.size() - 1000; j++ )
239      messageQueue[userId].recievedMessages.erase( messageQueue[userId].recievedMessages.begin() );
240  }
241
242  return i;
243}
244
245/**
246 * clean up memory reserved for user
247 * @param userId userid
248 */
249void MessageManager::cleanUpUser( int userId )
250{
251  if ( messageQueue.find( userId ) == messageQueue.end() )
252    return;
253   
254  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
255  {
256    if ( it->data )
257      delete it->data;
258    it->data = NULL;
259  }
260 
261  messageQueue[userId].toAck.clear();
262 
263  messageQueue.erase( userId );
264}
265
266/**
267 * registers function to handle messages with id messageId. someData is passed to callbackfuntion
268 * @param messageId message id to handle
269 * @param cb function pointer to callback function
270 * @param someData this pointer is passed to callback function without modification
271 * @return true on success
272 */
273bool MessageManager::registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData )
274{
275  MessageHandler messageHandler;
276 
277  messageHandler.cb = cb;
278  messageHandler.messageId = messageId;
279  messageHandler.someData = someData;
280 
281  messageHandlerMap[messageId] = messageHandler;
282 
283  return true;
284}
285
286/**
287 * initializes buffers for user
288 * @param userId userId
289 */
290void MessageManager::initUser( int userId )
291{
292  // just do something so map creates a new entry
293  messageQueue[userId].toAck.clear();
294  //assert( messageQueue[userId].messages.size() == 0 );
295}
296
297/**
298 * send a message to one or more clients
299 * recieverType:
300 *               RT_ALL send to all users. reciever is ignored
301 *               RT_USER send only to reciever
302 *               RT_NOT_USER send to all but reciever
303 *
304 * @param messageId message id
305 * @param data pointer to data
306 * @param dataLength length of data
307 * @param recieverType
308 * @param reciever
309 */
310void MessageManager::sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
311{
312  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
313  {
314    if ( 
315         recieverType == RT_ALL_ME ||
316         recieverType == RT_ALL_NOT_ME ||
317         recieverType == RT_USER && it->first == reciever ||
318         recieverType == RT_NOT_USER && it->first != reciever ||
319         recieverType == RT_SERVER && getNetworkStream()->isUserServer( it->first )
320       )
321    {
322      NetworkMessage msg;
323
324      msg.data = new byte[dataLength];
325      memcpy( msg.data, data, dataLength );
326      msg.length = dataLength;
327      msg.messageId = messageId;
328      msg.number = newNumber++;
329      msg.priority = messagePriority;
330
331      it->second.messages.push_back( msg );
332    }
333  }
334 
335  if ( recieverType == RT_ALL_ME )
336  {
337    NetworkMessage msg;
338
339    msg.data = new byte[dataLength];
340    memcpy( msg.data, data, dataLength );
341    msg.length = dataLength;
342    msg.messageId = messageId;
343    msg.number = SharedNetworkData::getInstance()->getHostID();
344    msg.priority = messagePriority;
345
346    incomingMessageBuffer.push_back( msg );
347  }
348}
349
350
Note: See TracBrowser for help on using the repository browser.