Planet
navi: home | PPS | about | screenshots | download | development | forum

source: code/trunk/src/libraries/network/Connection.cc @ 8460

Last change on this file since 8460 was 8327, checked in by scheusso, 14 years ago

merging network6 into trunk

  • Property svn:eol-style set to native
File size: 10.9 KB
Line 
1/*
2 *   ORXONOX - the hottest 3D action shooter ever to exist
3 *                    > www.orxonox.net <
4 *
5 *
6 *   License notice:
7 *
8 *   This program is free software; you can redistribute it and/or
9 *   modify it under the terms of the GNU General Public License
10 *   as published by the Free Software Foundation; either version 2
11 *   of the License, or (at your option) any later version.
12 *
13 *   This program is distributed in the hope that it will be useful,
14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 *   GNU General Public License for more details.
17 *
18 *   You should have received a copy of the GNU General Public License
19 *   along with this program; if not, write to the Free Software
20 *   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
21 *
22 *   Author:
23 *      Oliver Scheuss
24 *   Co-authors:
25 *      ...
26 *
27 */
28
29#include "Connection.h"
30
31#include <cassert>
32#include <deque>
33#define WIN32_LEAN_AND_MEAN
34#include <enet/enet.h>
35#include <boost/thread.hpp>
36#include <boost/thread/mutex.hpp>
37#include <boost/date_time.hpp>
38
39#include "packet/Packet.h"
40#include <util/Sleep.h>
41
42namespace orxonox
43{
44  const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(200);
45  const unsigned int                NETWORK_DISCONNECT_TIMEOUT = 500;
46
47  Connection::Connection(uint32_t firstPeerID):
48    host_(0), bCommunicationThreadRunning_(false), nextPeerID_(firstPeerID)
49  {
50    enet_initialize();
51    atexit(enet_deinitialize);
52    this->incomingEventsMutex_ = new boost::mutex;
53    this->outgoingEventsMutex_ = new boost::mutex;
54//     this->overallMutex_ = new boost::mutex;
55  }
56
57  Connection::~Connection()
58  {
59    delete this->incomingEventsMutex_;
60    delete this->outgoingEventsMutex_;
61  }
62
63  void Connection::startCommunicationThread()
64  {
65    this->bCommunicationThreadRunning_ = true;
66    this->communicationThread_ = new boost::thread(&Connection::communicationThread, this);
67  }
68 
69  void Connection::stopCommunicationThread()
70  {
71    this->bCommunicationThreadRunning_ = false;
72    if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) )
73    {
74      // force thread to stop
75      this->communicationThread_->interrupt();
76    }
77    delete this->communicationThread_;
78  }
79
80  void Connection::disconnectPeer(uint32_t peerID)
81  {
82//     this->overallMutex_->lock();
83    outgoingEvent outEvent = { peerID, outgoingEventType::disconnectPeer, 0, 0 };
84   
85    this->outgoingEventsMutex_->lock();
86    this->outgoingEvents_.push_back(outEvent);
87    this->outgoingEventsMutex_->unlock();
88//     this->overallMutex_->unlock();
89  }
90 
91  void Connection::disconnectPeers()
92  {
93    outgoingEvent outEvent = { 0, outgoingEventType::disconnectPeers, 0, 0 };
94   
95    this->outgoingEventsMutex_->lock();
96    this->outgoingEvents_.push_back(outEvent);
97    this->outgoingEventsMutex_->unlock();
98  }
99
100  void Connection::addPacket(ENetPacket* packet, uint32_t peerID, uint8_t channelID)
101  {
102//     this->overallMutex_->lock();
103    outgoingEvent outEvent = { peerID, outgoingEventType::sendPacket, packet, channelID };
104   
105    this->outgoingEventsMutex_->lock();
106    this->outgoingEvents_.push_back(outEvent);
107    this->outgoingEventsMutex_->unlock();
108//     this->overallMutex_->unlock();
109  }
110 
111  void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID)
112  {
113//     this->overallMutex_->lock();
114    outgoingEvent outEvent = { 0, outgoingEventType::broadcastPacket, packet, channelID };
115   
116    this->outgoingEventsMutex_->lock();
117    this->outgoingEvents_.push_back(outEvent);
118    this->outgoingEventsMutex_->unlock();
119//     this->overallMutex_->unlock();
120  }
121
122 
123  void Connection::communicationThread()
124  {
125    ENetEvent event;
126   
127//     this->overallMutex_->lock();
128    while( bCommunicationThreadRunning_ )
129    {
130      // Receive all pending incoming Events (such as packets, connects and disconnects)
131      while( enet_host_check_events( this->host_, &event ) > 0 )
132      {
133        processIncomingEvent(event);
134      }
135     
136//       this->overallMutex_->unlock();
137      msleep(1);
138//       this->overallMutex_->lock();
139     
140      // Send all waiting outgoing packets
141      this->outgoingEventsMutex_->lock();
142      uint32_t outgoingEventsCount = this->outgoingEvents_.size();
143      this->outgoingEventsMutex_->unlock();
144      while( outgoingEventsCount > 0 )
145      {
146//         COUT(0) << "outgoing event" << endl;
147        this->outgoingEventsMutex_->lock();
148        outgoingEvent outEvent = this->outgoingEvents_.front();
149        this->outgoingEvents_.pop_front();
150        this->outgoingEventsMutex_->unlock();
151       
152        processOutgoingEvent(outEvent);
153       
154        this->outgoingEventsMutex_->lock();
155        outgoingEventsCount = this->outgoingEvents_.size();
156        this->outgoingEventsMutex_->unlock();
157      }
158     
159      // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms)
160      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
161      {
162        processIncomingEvent(event);
163      }
164    }
165//     this->overallMutex_->unlock();
166  }
167 
168  void Connection::processIncomingEvent(ENetEvent& event)
169  {
170    incomingEvent inEvent;
171    // preprocess event
172    switch( event.type )
173    {
174      case ENET_EVENT_TYPE_CONNECT:
175        inEvent = preprocessConnectEvent(event);
176        break;
177      case ENET_EVENT_TYPE_RECEIVE:
178        inEvent = preprocessReceiveEvent(event);
179        break;
180      case ENET_EVENT_TYPE_DISCONNECT:
181        inEvent = preprocessDisconnectEvent(event);
182        break;
183      case ENET_EVENT_TYPE_NONE:
184      default:
185        return;
186    }
187   
188    // pushing event to queue
189    this->incomingEventsMutex_->lock();
190    this->incomingEvents_.push_back(inEvent);
191    this->incomingEventsMutex_->unlock();
192  }
193 
194  void Connection::processOutgoingEvent(outgoingEvent& event)
195  {
196    ENetPeer* peer;
197    switch( event.type )
198    {
199      case outgoingEventType::sendPacket:
200        // check whether the peer is still/already in the peer list
201        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
202        {
203          peer = this->peerMap_[event.peerID];
204          enet_peer_send( peer, event.channelID, event.packet );
205        }
206        else
207        {
208          // peer probably already disconnected so just discard packet
209          assert(event.peerID<this->nextPeerID_);
210          enet_packet_destroy(event.packet);
211        }
212        break;
213      case outgoingEventType::disconnectPeer:
214        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
215        {
216          peer = this->peerMap_[event.peerID];
217          enet_peer_disconnect(peer, 0);
218        }
219        else
220        {
221          // peer probably already disconnected so just discard disconnect event
222          assert(event.peerID<this->nextPeerID_);
223        }
224        break;
225      case outgoingEventType::disconnectPeers:
226        disconnectPeersInternal();
227        break;
228      case outgoingEventType::broadcastPacket:
229        enet_host_broadcast( this->host_, event.channelID, event.packet );
230        break;
231      default:
232        assert(0);
233    }
234  }
235
236
237  void Connection::disconnectPeersInternal()
238  {
239    std::map<uint32_t, ENetPeer*>::iterator it;
240    for( it=this->peerMap_.begin(); it!=this->peerMap_.end(); ++it )
241    {
242      enet_peer_disconnect(it->second, 0);
243    }
244    uint32_t iterations = NETWORK_DISCONNECT_TIMEOUT/NETWORK_WAIT_TIMEOUT;
245    uint32_t i = 0;
246    while( this->peerMap_.size() && i++ < iterations )
247    {
248      ENetEvent event;
249      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
250      {
251        processIncomingEvent(event);
252      }
253    }
254  }
255
256  void Connection::processQueue()
257  {
258    incomingEvent inEvent;
259
260    this->incomingEventsMutex_->lock();
261    uint32_t incomingEventsCount = this->incomingEvents_.size();
262    this->incomingEventsMutex_->unlock();
263    while( incomingEventsCount > 0 )
264    {
265      // pop event from queue
266      this->incomingEventsMutex_->lock();
267      inEvent = this->incomingEvents_.front();
268      this->incomingEvents_.pop_front();
269      this->incomingEventsMutex_->unlock();
270     
271      // process event
272      switch( inEvent.type )
273      {
274        case incomingEventType::peerConnect:
275          addPeer(inEvent.peerID);
276          break;
277        case incomingEventType::peerDisconnect:
278          removePeer(inEvent.peerID);
279          break;
280        case incomingEventType::receivePacket:
281          processPacket(inEvent.packet);
282          break;
283        default:
284          break;
285      }
286     
287      // check whether there are still events in the queue
288      this->incomingEventsMutex_->lock();
289      incomingEventsCount = this->incomingEvents_.size();
290      this->incomingEventsMutex_->unlock();
291    }
292  }
293 
294  void Connection::waitOutgoingQueue()
295  {
296    uint32_t outgoingEventsCount;
297    this->outgoingEventsMutex_->lock();
298    outgoingEventsCount = this->outgoingEvents_.size();
299    this->outgoingEventsMutex_->unlock();
300    while( outgoingEventsCount )
301    {
302      msleep(1);
303      this->outgoingEventsMutex_->lock();
304      outgoingEventsCount = this->outgoingEvents_.size();
305      this->outgoingEventsMutex_->unlock();
306    }
307  }
308
309
310  incomingEvent Connection::preprocessConnectEvent(ENetEvent& event)
311  {
312    // make sure this peer doesn't exist
313    assert( this->peerMap_.find(this->nextPeerID_) == this->peerMap_.end() );
314    assert( this->peerIDMap_.find(event.peer) == this->peerIDMap_.end() );
315   
316    // give peer a new id and increase peerID for next peer
317    uint32_t peerID = this->nextPeerID_;
318    ++this->nextPeerID_;
319   
320    // add peer/peerID into peerMap_ and peerIDMap_
321    this->peerMap_[peerID] = event.peer;
322    this->peerIDMap_[event.peer] = peerID;
323   
324    // create new peerEvent and return it
325    incomingEvent inEvent = { peerID, incomingEventType::peerConnect, 0 };
326    return inEvent;
327  }
328 
329  incomingEvent Connection::preprocessDisconnectEvent(ENetEvent& event)
330  {
331    // assert that the peer exists and get peerID
332    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
333    uint32_t peerID = this->peerIDMap_[event.peer];
334   
335    // remove peer/peerID from maps
336    this->peerIDMap_.erase(this->peerIDMap_.find(event.peer));
337    this->peerMap_.erase(this->peerMap_.find(peerID));
338   
339    // create new peerEvent and return it
340    incomingEvent inEvent = { peerID, incomingEventType::peerDisconnect, 0 };
341    return inEvent;
342  }
343 
344  incomingEvent Connection::preprocessReceiveEvent(ENetEvent& event)
345  {
346    // assert that the peer exists and get peerID
347    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
348    uint32_t peerID = this->peerIDMap_[event.peer];
349   
350    // create new Packet from ENetPacket
351    packet::Packet* p = packet::Packet::createPacket(event.packet, peerID);
352   
353    // create new peerEvent and return it
354    incomingEvent inEvent = { peerID, incomingEventType::receivePacket, p };
355    return inEvent;
356  }
357
358 
359  void Connection::enableCompression()
360  {
361    enet_host_compress_with_range_coder( this->host_ );
362  }
363
364
365}
Note: See TracBrowser for help on using the repository browser.