Planet
navi: home | PPS | about | screenshots | download | development | forum

source: code/trunk/src/libraries/network/Connection.cc @ 11564

Last change on this file since 11564 was 11071, checked in by landauf, 9 years ago

merged branch cpp11_v3 back to trunk

  • Property svn:eol-style set to native
File size: 10.9 KB
RevLine 
[3214]1/*
2 *   ORXONOX - the hottest 3D action shooter ever to exist
3 *                    > www.orxonox.net <
4 *
5 *
6 *   License notice:
7 *
8 *   This program is free software; you can redistribute it and/or
9 *   modify it under the terms of the GNU General Public License
10 *   as published by the Free Software Foundation; either version 2
11 *   of the License, or (at your option) any later version.
12 *
13 *   This program is distributed in the hope that it will be useful,
14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 *   GNU General Public License for more details.
17 *
18 *   You should have received a copy of the GNU General Public License
19 *   along with this program; if not, write to the Free Software
20 *   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
21 *
22 *   Author:
23 *      Oliver Scheuss
24 *   Co-authors:
25 *      ...
26 *
27 */
28
29#include "Connection.h"
30
31#include <cassert>
[7801]32#include <deque>
[5929]33#define WIN32_LEAN_AND_MEAN
[3214]34#include <enet/enet.h>
[7801]35#include <boost/thread.hpp>
36#include <boost/thread/mutex.hpp>
37#include <boost/date_time.hpp>
38
[3214]39#include "packet/Packet.h"
[8327]40#include <util/Sleep.h>
[3214]41
42namespace orxonox
43{
[8327]44  const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(200);
45  const unsigned int                NETWORK_DISCONNECT_TIMEOUT = 500;
[3214]46
[8327]47  Connection::Connection(uint32_t firstPeerID):
[11071]48    host_(nullptr), bCommunicationThreadRunning_(false), nextPeerID_(firstPeerID)
[3214]49  {
50    enet_initialize();
51    atexit(enet_deinitialize);
[7801]52    this->incomingEventsMutex_ = new boost::mutex;
53    this->outgoingEventsMutex_ = new boost::mutex;
[8327]54//     this->overallMutex_ = new boost::mutex;
[3214]55  }
56
[7801]57  Connection::~Connection()
58  {
59    delete this->incomingEventsMutex_;
60    delete this->outgoingEventsMutex_;
[3214]61  }
62
[7801]63  void Connection::startCommunicationThread()
64  {
65    this->bCommunicationThreadRunning_ = true;
66    this->communicationThread_ = new boost::thread(&Connection::communicationThread, this);
[3214]67  }
[7801]68 
69  void Connection::stopCommunicationThread()
70  {
71    this->bCommunicationThreadRunning_ = false;
72    if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) )
73    {
74      // force thread to stop
75      this->communicationThread_->interrupt();
76    }
77    delete this->communicationThread_;
78  }
[3214]79
[8327]80  void Connection::disconnectPeer(uint32_t peerID)
[7801]81  {
[8327]82//     this->overallMutex_->lock();
[11071]83    outgoingEvent outEvent = { peerID, OutgoingEventType::disconnectPeer, nullptr, 0 };
[7801]84   
85    this->outgoingEventsMutex_->lock();
86    this->outgoingEvents_.push_back(outEvent);
87    this->outgoingEventsMutex_->unlock();
[8327]88//     this->overallMutex_->unlock();
[3214]89  }
[8327]90 
91  void Connection::disconnectPeers()
92  {
[11071]93    outgoingEvent outEvent = { 0, OutgoingEventType::disconnectPeers, nullptr, 0 };
[8327]94   
95    this->outgoingEventsMutex_->lock();
96    this->outgoingEvents_.push_back(outEvent);
97    this->outgoingEventsMutex_->unlock();
98  }
[3214]99
[8327]100  void Connection::addPacket(ENetPacket* packet, uint32_t peerID, uint8_t channelID)
[7801]101  {
[8327]102//     this->overallMutex_->lock();
[11071]103    outgoingEvent outEvent = { peerID, OutgoingEventType::sendPacket, packet, channelID };
[7801]104   
105    this->outgoingEventsMutex_->lock();
106    this->outgoingEvents_.push_back(outEvent);
107    this->outgoingEventsMutex_->unlock();
[8327]108//     this->overallMutex_->unlock();
[3214]109  }
[7801]110 
111  void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID)
112  {
[8327]113//     this->overallMutex_->lock();
[11071]114    outgoingEvent outEvent = { 0, OutgoingEventType::broadcastPacket, packet, channelID };
[7801]115   
116    this->outgoingEventsMutex_->lock();
117    this->outgoingEvents_.push_back(outEvent);
118    this->outgoingEventsMutex_->unlock();
[8327]119//     this->overallMutex_->unlock();
[7801]120  }
[3214]121
[7801]122 
123  void Connection::communicationThread()
124  {
125    ENetEvent event;
126   
[8327]127//     this->overallMutex_->lock();
[7801]128    while( bCommunicationThreadRunning_ )
129    {
130      // Receive all pending incoming Events (such as packets, connects and disconnects)
131      while( enet_host_check_events( this->host_, &event ) > 0 )
132      {
[8327]133        processIncomingEvent(event);
[7801]134      }
135     
[8327]136//       this->overallMutex_->unlock();
137      msleep(1);
138//       this->overallMutex_->lock();
139     
[7801]140      // Send all waiting outgoing packets
141      this->outgoingEventsMutex_->lock();
142      uint32_t outgoingEventsCount = this->outgoingEvents_.size();
143      this->outgoingEventsMutex_->unlock();
144      while( outgoingEventsCount > 0 )
145      {
[8858]146//         orxout(verbose, context::network) << "outgoing event" << endl;
[7801]147        this->outgoingEventsMutex_->lock();
148        outgoingEvent outEvent = this->outgoingEvents_.front();
149        this->outgoingEvents_.pop_front();
150        this->outgoingEventsMutex_->unlock();
151       
[8327]152        processOutgoingEvent(outEvent);
153       
[7801]154        this->outgoingEventsMutex_->lock();
155        outgoingEventsCount = this->outgoingEvents_.size();
156        this->outgoingEventsMutex_->unlock();
157      }
158     
159      // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms)
160      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
161      {
[8327]162        processIncomingEvent(event);
[7801]163      }
164    }
[8327]165//     this->overallMutex_->unlock();
[3214]166  }
[8327]167 
168  void Connection::processIncomingEvent(ENetEvent& event)
169  {
170    incomingEvent inEvent;
171    // preprocess event
172    switch( event.type )
173    {
174      case ENET_EVENT_TYPE_CONNECT:
175        inEvent = preprocessConnectEvent(event);
176        break;
177      case ENET_EVENT_TYPE_RECEIVE:
178        inEvent = preprocessReceiveEvent(event);
179        break;
180      case ENET_EVENT_TYPE_DISCONNECT:
181        inEvent = preprocessDisconnectEvent(event);
182        break;
183      case ENET_EVENT_TYPE_NONE:
184      default:
185        return;
186    }
187   
188    // pushing event to queue
189    this->incomingEventsMutex_->lock();
190    this->incomingEvents_.push_back(inEvent);
191    this->incomingEventsMutex_->unlock();
192  }
193 
194  void Connection::processOutgoingEvent(outgoingEvent& event)
195  {
196    ENetPeer* peer;
197    switch( event.type )
198    {
[11071]199      case OutgoingEventType::sendPacket:
[8327]200        // check whether the peer is still/already in the peer list
201        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
202        {
203          peer = this->peerMap_[event.peerID];
204          enet_peer_send( peer, event.channelID, event.packet );
205        }
206        else
207        {
208          // peer probably already disconnected so just discard packet
209          assert(event.peerID<this->nextPeerID_);
210          enet_packet_destroy(event.packet);
211        }
212        break;
[11071]213      case OutgoingEventType::disconnectPeer:
[8327]214        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
215        {
216          peer = this->peerMap_[event.peerID];
217          enet_peer_disconnect(peer, 0);
218        }
219        else
220        {
221          // peer probably already disconnected so just discard disconnect event
222          assert(event.peerID<this->nextPeerID_);
223        }
224        break;
[11071]225      case OutgoingEventType::disconnectPeers:
[8327]226        disconnectPeersInternal();
227        break;
[11071]228      case OutgoingEventType::broadcastPacket:
[8327]229        enet_host_broadcast( this->host_, event.channelID, event.packet );
230        break;
231      default:
232        assert(0);
233    }
234  }
[3214]235
[8327]236
237  void Connection::disconnectPeersInternal()
238  {
[11071]239    for( const auto& mapEntry : this->peerMap_ )
[8327]240    {
[11071]241      enet_peer_disconnect(mapEntry.second, 0);
[8327]242    }
243    uint32_t iterations = NETWORK_DISCONNECT_TIMEOUT/NETWORK_WAIT_TIMEOUT;
244    uint32_t i = 0;
245    while( this->peerMap_.size() && i++ < iterations )
246    {
247      ENetEvent event;
248      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
249      {
250        processIncomingEvent(event);
251      }
252    }
253  }
254
[7801]255  void Connection::processQueue()
256  {
[8327]257    incomingEvent inEvent;
[6417]258
[7801]259    this->incomingEventsMutex_->lock();
260    uint32_t incomingEventsCount = this->incomingEvents_.size();
261    this->incomingEventsMutex_->unlock();
262    while( incomingEventsCount > 0 )
[3214]263    {
[8327]264      // pop event from queue
[7801]265      this->incomingEventsMutex_->lock();
[8327]266      inEvent = this->incomingEvents_.front();
[7801]267      this->incomingEvents_.pop_front();
268      this->incomingEventsMutex_->unlock();
269     
[8327]270      // process event
271      switch( inEvent.type )
[7801]272      {
[11071]273        case IncomingEventType::peerConnect:
[8327]274          addPeer(inEvent.peerID);
[3214]275          break;
[11071]276        case IncomingEventType::peerDisconnect:
[8327]277          removePeer(inEvent.peerID);
[3214]278          break;
[11071]279        case IncomingEventType::receivePacket:
[8327]280          processPacket(inEvent.packet);
[3214]281          break;
[8327]282        default:
[3214]283          break;
284      }
[7801]285     
[8327]286      // check whether there are still events in the queue
[7801]287      this->incomingEventsMutex_->lock();
288      incomingEventsCount = this->incomingEvents_.size();
289      this->incomingEventsMutex_->unlock();
[3214]290    }
291  }
[8327]292 
293  void Connection::waitOutgoingQueue()
294  {
295    uint32_t outgoingEventsCount;
296    this->outgoingEventsMutex_->lock();
297    outgoingEventsCount = this->outgoingEvents_.size();
298    this->outgoingEventsMutex_->unlock();
299    while( outgoingEventsCount )
300    {
301      msleep(1);
302      this->outgoingEventsMutex_->lock();
303      outgoingEventsCount = this->outgoingEvents_.size();
304      this->outgoingEventsMutex_->unlock();
305    }
306  }
[3214]307
[8327]308
309  incomingEvent Connection::preprocessConnectEvent(ENetEvent& event)
[7801]310  {
[8327]311    // make sure this peer doesn't exist
312    assert( this->peerMap_.find(this->nextPeerID_) == this->peerMap_.end() );
313    assert( this->peerIDMap_.find(event.peer) == this->peerIDMap_.end() );
314   
315    // give peer a new id and increase peerID for next peer
316    uint32_t peerID = this->nextPeerID_;
317    ++this->nextPeerID_;
318   
319    // add peer/peerID into peerMap_ and peerIDMap_
320    this->peerMap_[peerID] = event.peer;
321    this->peerIDMap_[event.peer] = peerID;
322   
323    // create new peerEvent and return it
[11071]324    incomingEvent inEvent = { peerID, IncomingEventType::peerConnect, nullptr };
[8327]325    return inEvent;
[3214]326  }
[7801]327 
[8327]328  incomingEvent Connection::preprocessDisconnectEvent(ENetEvent& event)
329  {
330    // assert that the peer exists and get peerID
331    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
332    uint32_t peerID = this->peerIDMap_[event.peer];
333   
334    // remove peer/peerID from maps
335    this->peerIDMap_.erase(this->peerIDMap_.find(event.peer));
336    this->peerMap_.erase(this->peerMap_.find(peerID));
337   
338    // create new peerEvent and return it
[11071]339    incomingEvent inEvent = { peerID, IncomingEventType::peerDisconnect, nullptr };
[8327]340    return inEvent;
341  }
342 
343  incomingEvent Connection::preprocessReceiveEvent(ENetEvent& event)
344  {
345    // assert that the peer exists and get peerID
346    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
347    uint32_t peerID = this->peerIDMap_[event.peer];
348   
349    // create new Packet from ENetPacket
350    packet::Packet* p = packet::Packet::createPacket(event.packet, peerID);
351   
352    // create new peerEvent and return it
[11071]353    incomingEvent inEvent = { peerID, IncomingEventType::receivePacket, p };
[8327]354    return inEvent;
355  }
356
357 
[7801]358  void Connection::enableCompression()
359  {
360    enet_host_compress_with_range_coder( this->host_ );
361  }
[3214]362
[7801]363
[3214]364}
Note: See TracBrowser for help on using the repository browser.