Planet
navi: home | PPS | about | screenshots | download | development | forum

source: code/branches/output/src/libraries/network/Connection.cc @ 9352

Last change on this file since 9352 was 8807, checked in by landauf, 13 years ago

Replaced COUT with orxout in network library. Tried to set levels and contexts in a more or less useful way, but not really optimized. Used contexts network, packets, and master_server.
Please use endl instead of \n in the future (@smerkli) ;)

  • Property svn:eol-style set to native
File size: 10.9 KB
Line 
1/*
2 *   ORXONOX - the hottest 3D action shooter ever to exist
3 *                    > www.orxonox.net <
4 *
5 *
6 *   License notice:
7 *
8 *   This program is free software; you can redistribute it and/or
9 *   modify it under the terms of the GNU General Public License
10 *   as published by the Free Software Foundation; either version 2
11 *   of the License, or (at your option) any later version.
12 *
13 *   This program is distributed in the hope that it will be useful,
14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 *   GNU General Public License for more details.
17 *
18 *   You should have received a copy of the GNU General Public License
19 *   along with this program; if not, write to the Free Software
20 *   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
21 *
22 *   Author:
23 *      Oliver Scheuss
24 *   Co-authors:
25 *      ...
26 *
27 */
28
29#include "Connection.h"
30
31#include <cassert>
32#include <deque>
33#define WIN32_LEAN_AND_MEAN
34#include <enet/enet.h>
35#include <boost/thread.hpp>
36#include <boost/thread/mutex.hpp>
37#include <boost/date_time.hpp>
38
39#include "packet/Packet.h"
40#include <util/Sleep.h>
41
42namespace orxonox
43{
44  const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(200);
45  const unsigned int                NETWORK_DISCONNECT_TIMEOUT = 500;
46
47  Connection::Connection(uint32_t firstPeerID):
48    host_(0), bCommunicationThreadRunning_(false), nextPeerID_(firstPeerID)
49  {
50    enet_initialize();
51    atexit(enet_deinitialize);
52    this->incomingEventsMutex_ = new boost::mutex;
53    this->outgoingEventsMutex_ = new boost::mutex;
54//     this->overallMutex_ = new boost::mutex;
55  }
56
57  Connection::~Connection()
58  {
59    delete this->incomingEventsMutex_;
60    delete this->outgoingEventsMutex_;
61  }
62
63  void Connection::startCommunicationThread()
64  {
65    this->bCommunicationThreadRunning_ = true;
66    this->communicationThread_ = new boost::thread(&Connection::communicationThread, this);
67  }
68 
69  void Connection::stopCommunicationThread()
70  {
71    this->bCommunicationThreadRunning_ = false;
72    if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) )
73    {
74      // force thread to stop
75      this->communicationThread_->interrupt();
76    }
77    delete this->communicationThread_;
78  }
79
80  void Connection::disconnectPeer(uint32_t peerID)
81  {
82//     this->overallMutex_->lock();
83    outgoingEvent outEvent = { peerID, outgoingEventType::disconnectPeer, 0, 0 };
84   
85    this->outgoingEventsMutex_->lock();
86    this->outgoingEvents_.push_back(outEvent);
87    this->outgoingEventsMutex_->unlock();
88//     this->overallMutex_->unlock();
89  }
90 
91  void Connection::disconnectPeers()
92  {
93    outgoingEvent outEvent = { 0, outgoingEventType::disconnectPeers, 0, 0 };
94   
95    this->outgoingEventsMutex_->lock();
96    this->outgoingEvents_.push_back(outEvent);
97    this->outgoingEventsMutex_->unlock();
98  }
99
100  void Connection::addPacket(ENetPacket* packet, uint32_t peerID, uint8_t channelID)
101  {
102//     this->overallMutex_->lock();
103    outgoingEvent outEvent = { peerID, outgoingEventType::sendPacket, packet, channelID };
104   
105    this->outgoingEventsMutex_->lock();
106    this->outgoingEvents_.push_back(outEvent);
107    this->outgoingEventsMutex_->unlock();
108//     this->overallMutex_->unlock();
109  }
110 
111  void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID)
112  {
113//     this->overallMutex_->lock();
114    outgoingEvent outEvent = { 0, outgoingEventType::broadcastPacket, packet, channelID };
115   
116    this->outgoingEventsMutex_->lock();
117    this->outgoingEvents_.push_back(outEvent);
118    this->outgoingEventsMutex_->unlock();
119//     this->overallMutex_->unlock();
120  }
121
122 
123  void Connection::communicationThread()
124  {
125    ENetEvent event;
126   
127//     this->overallMutex_->lock();
128    while( bCommunicationThreadRunning_ )
129    {
130      // Receive all pending incoming Events (such as packets, connects and disconnects)
131      while( enet_host_check_events( this->host_, &event ) > 0 )
132      {
133        processIncomingEvent(event);
134      }
135     
136//       this->overallMutex_->unlock();
137      msleep(1);
138//       this->overallMutex_->lock();
139     
140      // Send all waiting outgoing packets
141      this->outgoingEventsMutex_->lock();
142      uint32_t outgoingEventsCount = this->outgoingEvents_.size();
143      this->outgoingEventsMutex_->unlock();
144      while( outgoingEventsCount > 0 )
145      {
146//         orxout(verbose, context::network) << "outgoing event" << endl;
147        this->outgoingEventsMutex_->lock();
148        outgoingEvent outEvent = this->outgoingEvents_.front();
149        this->outgoingEvents_.pop_front();
150        this->outgoingEventsMutex_->unlock();
151       
152        processOutgoingEvent(outEvent);
153       
154        this->outgoingEventsMutex_->lock();
155        outgoingEventsCount = this->outgoingEvents_.size();
156        this->outgoingEventsMutex_->unlock();
157      }
158     
159      // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms)
160      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
161      {
162        processIncomingEvent(event);
163      }
164    }
165//     this->overallMutex_->unlock();
166  }
167 
168  void Connection::processIncomingEvent(ENetEvent& event)
169  {
170    incomingEvent inEvent;
171    // preprocess event
172    switch( event.type )
173    {
174      case ENET_EVENT_TYPE_CONNECT:
175        inEvent = preprocessConnectEvent(event);
176        break;
177      case ENET_EVENT_TYPE_RECEIVE:
178        inEvent = preprocessReceiveEvent(event);
179        break;
180      case ENET_EVENT_TYPE_DISCONNECT:
181        inEvent = preprocessDisconnectEvent(event);
182        break;
183      case ENET_EVENT_TYPE_NONE:
184      default:
185        return;
186    }
187   
188    // pushing event to queue
189    this->incomingEventsMutex_->lock();
190    this->incomingEvents_.push_back(inEvent);
191    this->incomingEventsMutex_->unlock();
192  }
193 
194  void Connection::processOutgoingEvent(outgoingEvent& event)
195  {
196    ENetPeer* peer;
197    switch( event.type )
198    {
199      case outgoingEventType::sendPacket:
200        // check whether the peer is still/already in the peer list
201        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
202        {
203          peer = this->peerMap_[event.peerID];
204          enet_peer_send( peer, event.channelID, event.packet );
205        }
206        else
207        {
208          // peer probably already disconnected so just discard packet
209          assert(event.peerID<this->nextPeerID_);
210          enet_packet_destroy(event.packet);
211        }
212        break;
213      case outgoingEventType::disconnectPeer:
214        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
215        {
216          peer = this->peerMap_[event.peerID];
217          enet_peer_disconnect(peer, 0);
218        }
219        else
220        {
221          // peer probably already disconnected so just discard disconnect event
222          assert(event.peerID<this->nextPeerID_);
223        }
224        break;
225      case outgoingEventType::disconnectPeers:
226        disconnectPeersInternal();
227        break;
228      case outgoingEventType::broadcastPacket:
229        enet_host_broadcast( this->host_, event.channelID, event.packet );
230        break;
231      default:
232        assert(0);
233    }
234  }
235
236
237  void Connection::disconnectPeersInternal()
238  {
239    std::map<uint32_t, ENetPeer*>::iterator it;
240    for( it=this->peerMap_.begin(); it!=this->peerMap_.end(); ++it )
241    {
242      enet_peer_disconnect(it->second, 0);
243    }
244    uint32_t iterations = NETWORK_DISCONNECT_TIMEOUT/NETWORK_WAIT_TIMEOUT;
245    uint32_t i = 0;
246    while( this->peerMap_.size() && i++ < iterations )
247    {
248      ENetEvent event;
249      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
250      {
251        processIncomingEvent(event);
252      }
253    }
254  }
255
256  void Connection::processQueue()
257  {
258    incomingEvent inEvent;
259
260    this->incomingEventsMutex_->lock();
261    uint32_t incomingEventsCount = this->incomingEvents_.size();
262    this->incomingEventsMutex_->unlock();
263    while( incomingEventsCount > 0 )
264    {
265      // pop event from queue
266      this->incomingEventsMutex_->lock();
267      inEvent = this->incomingEvents_.front();
268      this->incomingEvents_.pop_front();
269      this->incomingEventsMutex_->unlock();
270     
271      // process event
272      switch( inEvent.type )
273      {
274        case incomingEventType::peerConnect:
275          addPeer(inEvent.peerID);
276          break;
277        case incomingEventType::peerDisconnect:
278          removePeer(inEvent.peerID);
279          break;
280        case incomingEventType::receivePacket:
281          processPacket(inEvent.packet);
282          break;
283        default:
284          break;
285      }
286     
287      // check whether there are still events in the queue
288      this->incomingEventsMutex_->lock();
289      incomingEventsCount = this->incomingEvents_.size();
290      this->incomingEventsMutex_->unlock();
291    }
292  }
293 
294  void Connection::waitOutgoingQueue()
295  {
296    uint32_t outgoingEventsCount;
297    this->outgoingEventsMutex_->lock();
298    outgoingEventsCount = this->outgoingEvents_.size();
299    this->outgoingEventsMutex_->unlock();
300    while( outgoingEventsCount )
301    {
302      msleep(1);
303      this->outgoingEventsMutex_->lock();
304      outgoingEventsCount = this->outgoingEvents_.size();
305      this->outgoingEventsMutex_->unlock();
306    }
307  }
308
309
310  incomingEvent Connection::preprocessConnectEvent(ENetEvent& event)
311  {
312    // make sure this peer doesn't exist
313    assert( this->peerMap_.find(this->nextPeerID_) == this->peerMap_.end() );
314    assert( this->peerIDMap_.find(event.peer) == this->peerIDMap_.end() );
315   
316    // give peer a new id and increase peerID for next peer
317    uint32_t peerID = this->nextPeerID_;
318    ++this->nextPeerID_;
319   
320    // add peer/peerID into peerMap_ and peerIDMap_
321    this->peerMap_[peerID] = event.peer;
322    this->peerIDMap_[event.peer] = peerID;
323   
324    // create new peerEvent and return it
325    incomingEvent inEvent = { peerID, incomingEventType::peerConnect, 0 };
326    return inEvent;
327  }
328 
329  incomingEvent Connection::preprocessDisconnectEvent(ENetEvent& event)
330  {
331    // assert that the peer exists and get peerID
332    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
333    uint32_t peerID = this->peerIDMap_[event.peer];
334   
335    // remove peer/peerID from maps
336    this->peerIDMap_.erase(this->peerIDMap_.find(event.peer));
337    this->peerMap_.erase(this->peerMap_.find(peerID));
338   
339    // create new peerEvent and return it
340    incomingEvent inEvent = { peerID, incomingEventType::peerDisconnect, 0 };
341    return inEvent;
342  }
343 
344  incomingEvent Connection::preprocessReceiveEvent(ENetEvent& event)
345  {
346    // assert that the peer exists and get peerID
347    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
348    uint32_t peerID = this->peerIDMap_[event.peer];
349   
350    // create new Packet from ENetPacket
351    packet::Packet* p = packet::Packet::createPacket(event.packet, peerID);
352   
353    // create new peerEvent and return it
354    incomingEvent inEvent = { peerID, incomingEventType::receivePacket, p };
355    return inEvent;
356  }
357
358 
359  void Connection::enableCompression()
360  {
361    enet_host_compress_with_range_coder( this->host_ );
362  }
363
364
365}
Note: See TracBrowser for help on using the repository browser.