Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/atmospheric_engine/src/lib/network/message_manager.cc @ 8733

Last change on this file since 8733 was 8228, checked in by patrick, 19 years ago

trunk: merged the network branche back to trunk with command: svn merge branches/network trunk -r8150:HEAD

File size: 9.3 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
20#include "network_stream.h"
21
22using namespace std;
23
// Definition of the static singletonRef member; it starts out as NULL.
// NOTE(review): presumably holds the single shared MessageManager instance - confirm against the header.
MessageManager* MessageManager::singletonRef = NULL;
25
26
27/**
28 * standard constructor
29*/
30MessageManager::MessageManager ()
31{
32  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
33  newNumber = 1;
34  setSynchronized( true );
35}
36
37
38/**
39 * standard deconstructor
40*/
41MessageManager::~MessageManager ()
42{
43  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
44  {
45    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
46    {
47      if ( it2->data )
48      {
49        delete it2->data;
50        it2->data = NULL;
51      }
52    }
53   
54    it->second.messages.clear();
55    it->second.toAck.clear();
56  }
57 
58  messageQueue.clear();
59}
60
61/**
62 * get the diff to last acked state of userId
63 *
64 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
65 * it defines its own protocol
66 *
67 * @param userId user to create diff for
68 * @param data buffer to copy diff in
69 * @param maxLength max bytes to copy into data
70 * @param stateId id of current state
71 * @param fromStateId the reference state for the delta state
72 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
73 * @return n bytes copied into data
74 */
75int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
76{
77  int i = 0;
78  int n;
79 
80  n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength );
81  i += n;
82  assert( n == INTSIZE );
83 
84  for ( std::list<int>::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++)
85  {
86    n = Converter::intToByteArray( *it, data + i, maxLength );
87    i += n;
88    assert( n == INTSIZE );
89  }
90 
91  messageQueue[userId].toAck.clear();
92 
93  n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength );
94  i += n;
95  assert( n == INTSIZE );
96 
97  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
98  {
99    n = Converter::intToByteArray( it->length, data + i, maxLength );
100    i += n;
101    assert( n == INTSIZE );
102   
103    n = Converter::intToByteArray( it->number, data + i, maxLength );
104    i += n;
105    assert( n == INTSIZE );
106   
107    n = Converter::intToByteArray( it->messageId, data + i, maxLength );
108    i += n;
109    assert( n == INTSIZE );
110   
111    assert( i + it->length <= maxLength );
112    memcpy( data + i, it->data, it->length );
113    i += it->length;
114  }
115 
116  return i;
117}
118
119/**
120 * sets a new state out of a diff created on another host
121 * @param userId hostId of user who send me that diff
122 * @param data pointer to diff
123 * @param length length of diff
124 * @param stateId id of current state
125 * @param fromStateId id of the base state id
126 * @return number bytes read
127 * @todo check for permissions
128 */
129int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
130{
131  int i = 0;
132  int n;
133 
134  int nAcks;
135 
136  assert( i + INTSIZE <= length );
137  n = Converter::byteArrayToInt( data + i, &nAcks );
138  assert( n == INTSIZE );
139  i += n;
140 
141  std::list<int> acks;
142 
143  int number;
144 
145  for ( int j = 0; j < nAcks; j++ )
146  {
147    assert( i + INTSIZE <= length );
148    n = Converter::byteArrayToInt( data + i, &number );
149    assert( n == INTSIZE );
150    i += n;
151   
152    acks.push_back( number );
153  }
154 
155  int nMessages;
156 
157  assert( i + INTSIZE <= length );
158  n = Converter::byteArrayToInt( data + i, &nMessages );
159  assert( n == INTSIZE );
160  i += n;
161
162  int messageLength, messageId;
163 
164  for ( int j = 0; j < nMessages; j++ )
165  {
166    assert( i + INTSIZE <= length );
167    n = Converter::byteArrayToInt( data + i, &messageLength );
168    assert( n == INTSIZE );
169    i += n;
170   
171    assert( i + INTSIZE <= length );
172    n = Converter::byteArrayToInt( data + i, &number );
173    assert( n == INTSIZE );
174    i += n;
175 
176    assert( i + INTSIZE <= length );
177    n = Converter::byteArrayToInt( data + i, &messageId );
178    assert( n == INTSIZE );
179    i += n;
180   
181    if ( number > 0 )
182      messageQueue[userId].toAck.push_back( number );
183   
184    assert( i + messageLength <= length );
185    assert( messageHandlerMap.find( (MessageId)messageId ) != messageHandlerMap.end() );
186    if ( std::find( messageQueue[userId].recievedMessages.begin(), messageQueue[userId].recievedMessages.end(), number )== messageQueue[userId].recievedMessages.end() )
187    {
188      if ( !(*(messageHandlerMap[(MessageId)messageId].cb))( (MessageId)messageId, data + i, messageLength, messageHandlerMap[(MessageId)messageId].someData, userId ) )
189      {
190        NetworkMessage msg;
191       
192        msg.data = new byte[messageLength];
193        memcpy( msg.data, data + i, messageLength );
194        msg.length = messageLength;
195        msg.messageId = (MessageId)messageId;
196        msg.number = userId;
197       
198        incomingMessabeBuffer.push_back( msg );
199      }
200      messageQueue[userId].recievedMessages.push_back( number );
201    }
202    i += messageLength;
203  }
204 
205 
206  for ( std::list<NetworkMessage>::iterator it = incomingMessabeBuffer.begin(); it != incomingMessabeBuffer.end();  )
207  {
208    if ( (*(messageHandlerMap[it->messageId].cb))( it->messageId, it->data, it->length, messageHandlerMap[it->messageId].someData, it->number ) )
209    {
210      std::list<NetworkMessage>::iterator delIt = it;
211      if ( it->data )
212        delete it->data;
213      it++;
214      incomingMessabeBuffer.erase( delIt );
215      continue;
216    }
217    it++;
218  }
219 
220  //walk throu message queue and remove acked messages
221  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end();  )
222  {
223    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
224    {
225      std::list<NetworkMessage>::iterator delIt = it;
226      it++;
227      messageQueue[userId].messages.erase( delIt );
228      continue;
229    }
230    it++;
231  }
232 
233  //TODO find bether way. maybe with timestamp
234  if ( messageQueue[userId].recievedMessages.size() > 1000 )
235  {
236    for ( int j = 0; j < messageQueue[userId].recievedMessages.size() - 1000; j++ )
237      messageQueue[userId].recievedMessages.erase( messageQueue[userId].recievedMessages.begin() );
238  }
239
240  return i;
241}
242
243/**
244 * clean up memory reserved for user
245 * @param userId userid
246 */
247void MessageManager::cleanUpUser( int userId )
248{
249  if ( messageQueue.find( userId ) == messageQueue.end() )
250    return;
251   
252  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
253  {
254    if ( it->data )
255      delete it->data;
256    it->data = NULL;
257  }
258 
259  messageQueue[userId].toAck.clear();
260 
261  messageQueue.erase( userId );
262}
263
264/**
265 * registers function to handle messages with id messageId. someData is passed to callbackfuntion
266 * @param messageId message id to handle
267 * @param cb function pointer to callback function
268 * @param someData this pointer is passed to callback function without modification
269 * @return true on success
270 */
271bool MessageManager::registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData )
272{
273  MessageHandler messageHandler;
274 
275  messageHandler.cb = cb;
276  messageHandler.messageId = messageId;
277  messageHandler.someData = someData;
278 
279  messageHandlerMap[messageId] = messageHandler;
280 
281  return true;
282}
283
284/**
285 * initializes buffers for user
286 * @param userId userId
287 */
288void MessageManager::initUser( int userId )
289{
290  // just do something so map creates a new entry
291  messageQueue[userId].toAck.clear();
292  //assert( messageQueue[userId].messages.size() == 0 );
293}
294
295/**
296 * send a message to one or more clients
297 * recieverType:
298 *               RT_ALL send to all users. reciever is ignored
299 *               RT_USER send only to reciever
300 *               RT_NOT_USER send to all but reciever
301 *
302 * @param messageId message id
303 * @param data pointer to data
304 * @param dataLength length of data
305 * @param recieverType
306 * @param reciever
307 */
308void MessageManager::sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
309{
310  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
311  {
312    if ( 
313         recieverType == RT_ALL_ME ||
314         recieverType == RT_ALL_NOT_ME ||
315         recieverType == RT_USER && it->first == reciever ||
316         recieverType == RT_NOT_USER && it->first != reciever ||
317         recieverType == RT_SERVER && getNetworkStream()->isUserServer( it->first )
318       )
319    {
320      NetworkMessage msg;
321     
322      msg.data = new byte[dataLength];
323      memcpy( msg.data, data, dataLength );
324      msg.length = dataLength;
325      msg.messageId = messageId;
326      msg.number = newNumber++;
327      msg.priority = messagePriority;
328     
329      it->second.messages.push_back( msg );
330     
331      if ( recieverType == RT_ALL_ME )
332        incomingMessabeBuffer.push_back( msg );
333    }
334  }
335}
336
337
Note: See TracBrowser for help on using the repository browser.