Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: downloads/boost_1_34_1/libs/thread/src/condition.cpp @ 29

Last change on this file since 29 was 29, checked in by landauf, 16 years ago

updated boost from 1_33_1 to 1_34_1

File size: 17.7 KB
Line 
1// Copyright (C) 2001-2003
2// William E. Kempf
3//
4//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6
7#include <boost/thread/detail/config.hpp>
8
9#include <boost/thread/condition.hpp>
10#include <boost/thread/xtime.hpp>
11#include <boost/thread/thread.hpp>
12#include <boost/thread/exceptions.hpp>
13#include <boost/limits.hpp>
14#include <cassert>
15#include "timeconv.inl"
16
17#if defined(BOOST_HAS_WINTHREADS)
18#   ifndef NOMINMAX
19#      define NOMINMAX
20#   endif
21#   include <windows.h>
22#elif defined(BOOST_HAS_PTHREADS)
23#   include <errno.h>
24#elif defined(BOOST_HAS_MPTASKS)
25#   include <MacErrors.h>
26#   include "mac/init.hpp"
27#   include "mac/safe.hpp"
28#endif
29
30// The following include can be removed after the bug on QNX
31// has been tracked down. I need this only for debugging
32//#if !defined(NDEBUG) && defined(BOOST_HAS_PTHREADS)
33#include <iostream>
34//#endif
35
36namespace boost {
37
38namespace detail {
39
40#if defined(BOOST_HAS_WINTHREADS)
41condition_impl::condition_impl()
42    : m_gone(0), m_blocked(0), m_waiting(0)
43{
44    m_gate = reinterpret_cast<void*>(CreateSemaphore(0, 1, 1, 0));
45    m_queue = reinterpret_cast<void*>(
46        CreateSemaphore(0, 0, (std::numeric_limits<long>::max)(), 0));
47    m_mutex = reinterpret_cast<void*>(CreateMutex(0, 0, 0));
48
49    if (!m_gate || !m_queue || !m_mutex)
50    {
51        int res = 0;
52        if (m_gate)
53        {
54            res = CloseHandle(reinterpret_cast<HANDLE>(m_gate));
55            assert(res);
56        }
57        if (m_queue)
58        {
59            res = CloseHandle(reinterpret_cast<HANDLE>(m_queue));
60            assert(res);
61        }
62        if (m_mutex)
63        {
64            res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex));
65            assert(res);
66        }
67
68        throw thread_resource_error();
69    }
70}
71
72condition_impl::~condition_impl()
73{
74    int res = 0;
75    res = CloseHandle(reinterpret_cast<HANDLE>(m_gate));
76    assert(res);
77    res = CloseHandle(reinterpret_cast<HANDLE>(m_queue));
78    assert(res);
79    res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex));
80    assert(res);
81}
82
83void condition_impl::notify_one()
84{
85    unsigned signals = 0;
86
87    int res = 0;
88    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
89    assert(res == WAIT_OBJECT_0);
90
91    if (m_waiting != 0) // the m_gate is already closed
92    {
93        if (m_blocked == 0)
94        {
95            res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
96            assert(res);
97            return;
98        }
99
100        ++m_waiting;
101        --m_blocked;
102        signals = 1;
103    }
104    else
105    {
106        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
107        assert(res == WAIT_OBJECT_0);
108        if (m_blocked > m_gone)
109        {
110            if (m_gone != 0)
111            {
112                m_blocked -= m_gone;
113                m_gone = 0;
114            }
115            signals = m_waiting = 1;
116            --m_blocked;
117        }
118        else
119        {
120            res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
121            assert(res);
122        }
123    }
124
125    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
126    assert(res);
127
128    if (signals)
129    {
130        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0);
131        assert(res);
132    }
133}
134
135void condition_impl::notify_all()
136{
137    unsigned signals = 0;
138
139    int res = 0;
140    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
141    assert(res == WAIT_OBJECT_0);
142
143    if (m_waiting != 0) // the m_gate is already closed
144    {
145        if (m_blocked == 0)
146        {
147            res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
148            assert(res);
149            return;
150        }
151
152        m_waiting += (signals = m_blocked);
153        m_blocked = 0;
154    }
155    else
156    {
157        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
158        assert(res == WAIT_OBJECT_0);
159        if (m_blocked > m_gone)
160        {
161            if (m_gone != 0)
162            {
163                m_blocked -= m_gone;
164                m_gone = 0;
165            }
166            signals = m_waiting = m_blocked;
167            m_blocked = 0;
168        }
169        else
170        {
171            res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
172            assert(res);
173        }
174    }
175
176    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
177    assert(res);
178
179    if (signals)
180    {
181        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0);
182        assert(res);
183    }
184}
185
186void condition_impl::enter_wait()
187{
188    int res = 0;
189    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
190    assert(res == WAIT_OBJECT_0);
191    ++m_blocked;
192    res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
193    assert(res);
194}
195
196void condition_impl::do_wait()
197{
198    int res = 0;
199    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);
200    assert(res == WAIT_OBJECT_0);
201
202    unsigned was_waiting=0;
203    unsigned was_gone=0;
204
205    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
206    assert(res == WAIT_OBJECT_0);
207    was_waiting = m_waiting;
208    was_gone = m_gone;
209    if (was_waiting != 0)
210    {
211        if (--m_waiting == 0)
212        {
213            if (m_blocked != 0)
214            {
215                res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1,
216                    0); // open m_gate
217                assert(res);
218                was_waiting = 0;
219            }
220            else if (m_gone != 0)
221                m_gone = 0;
222        }
223    }
224    else if (++m_gone == ((std::numeric_limits<unsigned>::max)() / 2))
225    {
226        // timeout occured, normalize the m_gone count
227        // this may occur if many calls to wait with a timeout are made and
228        // no call to notify_* is made
229        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
230        assert(res == WAIT_OBJECT_0);
231        m_blocked -= m_gone;
232        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
233        assert(res);
234        m_gone = 0;
235    }
236    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
237    assert(res);
238
239    if (was_waiting == 1)
240    {
241        for (/**/ ; was_gone; --was_gone)
242        {
243            // better now than spurious later
244            res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
245                INFINITE);
246            assert(res == WAIT_OBJECT_0);
247        }
248        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
249        assert(res);
250    }
251}
252
253bool condition_impl::do_timed_wait(const xtime& xt)
254{
255    bool ret = false;
256    unsigned int res = 0;
257
258    for (;;)
259    {
260        int milliseconds;
261        to_duration(xt, milliseconds);
262
263        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
264            milliseconds);
265        assert(res != WAIT_FAILED && res != WAIT_ABANDONED);
266        ret = (res == WAIT_OBJECT_0);
267
268        if (res == WAIT_TIMEOUT)
269        {
270            xtime cur;
271            xtime_get(&cur, TIME_UTC);
272            if (xtime_cmp(xt, cur) > 0)
273                continue;
274        }
275
276        break;
277    }
278
279    unsigned was_waiting=0;
280    unsigned was_gone=0;
281
282    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
283    assert(res == WAIT_OBJECT_0);
284    was_waiting = m_waiting;
285    was_gone = m_gone;
286    if (was_waiting != 0)
287    {
288        if (!ret) // timeout
289        {
290            if (m_blocked != 0)
291                --m_blocked;
292            else
293                ++m_gone; // count spurious wakeups
294        }
295        if (--m_waiting == 0)
296        {
297            if (m_blocked != 0)
298            {
299                res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1,
300                    0); // open m_gate
301                assert(res);
302                was_waiting = 0;
303            }
304            else if (m_gone != 0)
305                m_gone = 0;
306        }
307    }
308    else if (++m_gone == ((std::numeric_limits<unsigned>::max)() / 2))
309    {
310        // timeout occured, normalize the m_gone count
311        // this may occur if many calls to wait with a timeout are made and
312        // no call to notify_* is made
313        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
314        assert(res == WAIT_OBJECT_0);
315        m_blocked -= m_gone;
316        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
317        assert(res);
318        m_gone = 0;
319    }
320    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
321    assert(res);
322
323    if (was_waiting == 1)
324    {
325        for (/**/ ; was_gone; --was_gone)
326        {
327            // better now than spurious later
328            res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
329                INFINITE);
330            assert(res ==  WAIT_OBJECT_0);
331        }
332        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
333        assert(res);
334    }
335
336    return ret;
337}
338#elif defined(BOOST_HAS_PTHREADS)
339condition_impl::condition_impl()
340{
341    int res = 0;
342    res = pthread_cond_init(&m_condition, 0);
343    if (res != 0)
344        throw thread_resource_error();
345}
346
347condition_impl::~condition_impl()
348{
349    int res = 0;
350    res = pthread_cond_destroy(&m_condition);
351    assert(res == 0);
352}
353
354void condition_impl::notify_one()
355{
356    int res = 0;
357    res = pthread_cond_signal(&m_condition);
358    assert(res == 0);
359}
360
361void condition_impl::notify_all()
362{
363    int res = 0;
364    res = pthread_cond_broadcast(&m_condition);
365    assert(res == 0);
366}
367
368void condition_impl::do_wait(pthread_mutex_t* pmutex)
369{
370    int res = 0;
371    res = pthread_cond_wait(&m_condition, pmutex);
372    assert(res == 0);
373}
374
375bool condition_impl::do_timed_wait(const xtime& xt, pthread_mutex_t* pmutex)
376{
377    timespec ts;
378    to_timespec(xt, ts);
379
380    int res = 0;
381    res = pthread_cond_timedwait(&m_condition, pmutex, &ts);
382// Test code for QNX debugging, to get information during regressions
383#ifndef NDEBUG
384    if (res == EINVAL) {
385        boost::xtime now;
386        boost::xtime_get(&now, boost::TIME_UTC);
387        std::cerr << "now: " << now.sec << " " << now.nsec << std::endl;
388        std::cerr << "time: " << time(0) << std::endl;
389        std::cerr << "xtime: " << xt.sec << " " << xt.nsec << std::endl;
390        std::cerr << "ts: " << ts.tv_sec << " " << ts.tv_nsec << std::endl;
391        std::cerr << "pmutex: " << pmutex << std::endl;
392        std::cerr << "condition: " << &m_condition << std::endl;
393        assert(res != EINVAL);
394    }
395#endif   
396    assert(res == 0 || res == ETIMEDOUT);
397
398    return res != ETIMEDOUT;
399}
400#elif defined(BOOST_HAS_MPTASKS)
401
402using threads::mac::detail::safe_enter_critical_region;
403using threads::mac::detail::safe_wait_on_semaphore;
404
405condition_impl::condition_impl()
406    : m_gone(0), m_blocked(0), m_waiting(0)
407{
408    threads::mac::detail::thread_init();
409
410    OSStatus lStatus = noErr;
411
412    lStatus = MPCreateSemaphore(1, 1, &m_gate);
413    if(lStatus == noErr)
414        lStatus = MPCreateSemaphore(ULONG_MAX, 0, &m_queue);
415
416    if(lStatus != noErr || !m_gate || !m_queue)
417    {
418        if (m_gate)
419        {
420            lStatus = MPDeleteSemaphore(m_gate);
421            assert(lStatus == noErr);
422        }
423        if (m_queue)
424        {
425            lStatus = MPDeleteSemaphore(m_queue);
426            assert(lStatus == noErr);
427        }
428
429        throw thread_resource_error();
430    }
431}
432
433condition_impl::~condition_impl()
434{
435    OSStatus lStatus = noErr;
436    lStatus = MPDeleteSemaphore(m_gate);
437    assert(lStatus == noErr);
438    lStatus = MPDeleteSemaphore(m_queue);
439    assert(lStatus == noErr);
440}
441
442void condition_impl::notify_one()
443{
444    unsigned signals = 0;
445
446    OSStatus lStatus = noErr;
447    lStatus = safe_enter_critical_region(m_mutex, kDurationForever,
448        m_mutex_mutex);
449    assert(lStatus == noErr);
450
451    if (m_waiting != 0) // the m_gate is already closed
452    {
453        if (m_blocked == 0)
454        {
455            lStatus = MPExitCriticalRegion(m_mutex);
456            assert(lStatus == noErr);
457            return;
458        }
459
460        ++m_waiting;
461        --m_blocked;
462    }
463    else
464    {
465        lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
466        assert(lStatus == noErr);
467        if (m_blocked > m_gone)
468        {
469            if (m_gone != 0)
470            {
471                m_blocked -= m_gone;
472                m_gone = 0;
473            }
474            signals = m_waiting = 1;
475            --m_blocked;
476        }
477        else
478        {
479            lStatus = MPSignalSemaphore(m_gate);
480            assert(lStatus == noErr);
481        }
482
483        lStatus = MPExitCriticalRegion(m_mutex);
484        assert(lStatus == noErr);
485
486        while (signals)
487        {
488            lStatus = MPSignalSemaphore(m_queue);
489            assert(lStatus == noErr);
490            --signals;
491        }
492    }
493}
494
495void condition_impl::notify_all()
496{
497    unsigned signals = 0;
498
499    OSStatus lStatus = noErr;
500    lStatus = safe_enter_critical_region(m_mutex, kDurationForever,
501        m_mutex_mutex);
502    assert(lStatus == noErr);
503
504    if (m_waiting != 0) // the m_gate is already closed
505    {
506        if (m_blocked == 0)
507        {
508            lStatus = MPExitCriticalRegion(m_mutex);
509            assert(lStatus == noErr);
510            return;
511        }
512
513        m_waiting += (signals = m_blocked);
514        m_blocked = 0;
515    }
516    else
517    {
518        lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
519        assert(lStatus == noErr);
520        if (m_blocked > m_gone)
521        {
522            if (m_gone != 0)
523            {
524                m_blocked -= m_gone;
525                m_gone = 0;
526            }
527            signals = m_waiting = m_blocked;
528            m_blocked = 0;
529        }
530        else
531        {
532            lStatus = MPSignalSemaphore(m_gate);
533            assert(lStatus == noErr);
534        }
535
536        lStatus = MPExitCriticalRegion(m_mutex);
537        assert(lStatus == noErr);
538
539        while (signals)
540        {
541            lStatus = MPSignalSemaphore(m_queue);
542            assert(lStatus == noErr);
543            --signals;
544        }
545    }
546}
547
548void condition_impl::enter_wait()
549{
550    OSStatus lStatus = noErr;
551    lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
552    assert(lStatus == noErr);
553    ++m_blocked;
554    lStatus = MPSignalSemaphore(m_gate);
555    assert(lStatus == noErr);
556}
557
558void condition_impl::do_wait()
559{
560    OSStatus lStatus = noErr;
561    lStatus = safe_wait_on_semaphore(m_queue, kDurationForever);
562    assert(lStatus == noErr);
563
564    unsigned was_waiting=0;
565    unsigned was_gone=0;
566
567    lStatus = safe_enter_critical_region(m_mutex, kDurationForever,
568        m_mutex_mutex);
569    assert(lStatus == noErr);
570    was_waiting = m_waiting;
571    was_gone = m_gone;
572    if (was_waiting != 0)
573    {
574        if (--m_waiting == 0)
575        {
576            if (m_blocked != 0)
577            {
578                lStatus = MPSignalSemaphore(m_gate); // open m_gate
579                assert(lStatus == noErr);
580                was_waiting = 0;
581            }
582            else if (m_gone != 0)
583                m_gone = 0;
584        }
585    }
586    else if (++m_gone == ((std::numeric_limits<unsigned>::max)() / 2))
587    {
588        // timeout occured, normalize the m_gone count
589        // this may occur if many calls to wait with a timeout are made and
590        // no call to notify_* is made
591        lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
592        assert(lStatus == noErr);
593        m_blocked -= m_gone;
594        lStatus = MPSignalSemaphore(m_gate);
595        assert(lStatus == noErr);
596        m_gone = 0;
597    }
598    lStatus = MPExitCriticalRegion(m_mutex);
599    assert(lStatus == noErr);
600
601    if (was_waiting == 1)
602    {
603        for (/**/ ; was_gone; --was_gone)
604        {
605            // better now than spurious later
606            lStatus = safe_wait_on_semaphore(m_queue, kDurationForever);
607            assert(lStatus == noErr);
608        }
609        lStatus = MPSignalSemaphore(m_gate);
610        assert(lStatus == noErr);
611    }
612}
613
614bool condition_impl::do_timed_wait(const xtime& xt)
615{
616    int milliseconds;
617    to_duration(xt, milliseconds);
618
619    OSStatus lStatus = noErr;
620    lStatus = safe_wait_on_semaphore(m_queue, milliseconds);
621    assert(lStatus == noErr || lStatus == kMPTimeoutErr);
622
623    bool ret = (lStatus == noErr);
624
625    unsigned was_waiting=0;
626    unsigned was_gone=0;
627
628    lStatus = safe_enter_critical_region(m_mutex, kDurationForever,
629        m_mutex_mutex);
630    assert(lStatus == noErr);
631    was_waiting = m_waiting;
632    was_gone = m_gone;
633    if (was_waiting != 0)
634    {
635        if (!ret) // timeout
636        {
637            if (m_blocked != 0)
638                --m_blocked;
639            else
640                ++m_gone; // count spurious wakeups
641        }
642        if (--m_waiting == 0)
643        {
644            if (m_blocked != 0)
645            {
646                lStatus = MPSignalSemaphore(m_gate); // open m_gate
647                assert(lStatus == noErr);
648                was_waiting = 0;
649            }
650            else if (m_gone != 0)
651                m_gone = 0;
652        }
653    }
654    else if (++m_gone == ((std::numeric_limits<unsigned>::max)() / 2))
655    {
656        // timeout occured, normalize the m_gone count
657        // this may occur if many calls to wait with a timeout are made and
658        // no call to notify_* is made
659        lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
660        assert(lStatus == noErr);
661        m_blocked -= m_gone;
662        lStatus = MPSignalSemaphore(m_gate);
663        assert(lStatus == noErr);
664        m_gone = 0;
665    }
666    lStatus = MPExitCriticalRegion(m_mutex);
667    assert(lStatus == noErr);
668
669    if (was_waiting == 1)
670    {
671        for (/**/ ; was_gone; --was_gone)
672        {
673            // better now than spurious later
674            lStatus = safe_wait_on_semaphore(m_queue, kDurationForever);
675            assert(lStatus == noErr);
676        }
677        lStatus = MPSignalSemaphore(m_gate);
678        assert(lStatus == noErr);
679    }
680
681    return ret;
682}
683#endif
684
685} // namespace detail
686
687} // namespace boost
688
689// Change Log:
690//    8 Feb 01  WEKEMPF Initial version.
691//   22 May 01  WEKEMPF Modified to use xtime for time outs.
692//    3 Jan 03  WEKEMPF Modified for DLL implementation.
Note: See TracBrowser for help on using the repository browser.