Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: downloads/ogre_src_v1-9-0/OgreMain/include/OgreWorkQueue.h @ 151

Last change on this file since 151 was 148, checked in by patricwi, 6 years ago

Added new dependencies for ogre1.9 and cegui0.8

File size: 22.6 KB
Line 
1/*
2-----------------------------------------------------------------------------
3This source file is part of OGRE
4(Object-oriented Graphics Rendering Engine)
5For the latest info, see http://www.ogre3d.org/
6
7Copyright (c) 2000-2013 Torus Knot Software Ltd
8
9Permission is hereby granted, free of charge, to any person obtaining a copy
10of this software and associated documentation files (the "Software"), to deal
11in the Software without restriction, including without limitation the rights
12to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13copies of the Software, and to permit persons to whom the Software is
14furnished to do so, subject to the following conditions:
15
16The above copyright notice and this permission notice shall be included in
17all copies or substantial portions of the Software.
18
19THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25THE SOFTWARE.
26-----------------------------------------------------------------------------
27*/
28#ifndef __OgreWorkQueue_H__
29#define __OgreWorkQueue_H__
30
31#include "OgrePrerequisites.h"
32#include "OgreAny.h"
33#include "OgreSharedPtr.h"
34#include "Threading/OgreThreadHeaders.h"
35#include "OgreHeaderPrefix.h"
36
37namespace Ogre
38{
39        /** \addtogroup Core
40        *  @{
41        */
42        /** \addtogroup General
43        *  @{
44        */
45
46        /** Interface to a general purpose request / response style background work queue.
47        @remarks
48                A work queue is a simple structure, where requests for work are placed
49                onto the queue, then removed by a worker for processing, then finally
50                a response is placed on the result queue for the originator to pick up
51                at their leisure. The typical use for this is in a threaded environment,
52                although any kind of deferred processing could use this approach to
53                decouple and distribute work over a period of time even
54                if it was single threaded.
55        @par
56                WorkQueues also incorporate thread pools. One or more background worker threads
57                can wait on the queue and be notified when a request is waiting to be
58                processed. For maximal thread usage, a WorkQueue instance should be shared
59                among many sources of work, rather than many work queues being created.
60                This way, you can share a small number of hardware threads among a large
61                number of background tasks. This doesn't mean you have to implement all the
62                request processing in one class, you can plug in many handlers in order to
63                process the requests.
64        @par
65                This is an abstract interface definition; users can subclass this and
66                provide their own implementation if required to centralise task management
67                in their own subsystems. We also provide a default implementation in the
68                form of DefaultWorkQueue.
69        */
70        class _OgreExport WorkQueue : public UtilityAlloc
71        {
72        protected:
73                typedef map<String, uint16>::type ChannelMap;
74                ChannelMap mChannelMap;
75                uint16 mNextChannel;
76                OGRE_MUTEX(mChannelMapMutex);
77        public:
78                /// Numeric identifier for a request
79                typedef unsigned long long int RequestID;
80
81                /** General purpose request structure.
82                */
83                class _OgreExport Request : public UtilityAlloc
84                {
85                        friend class WorkQueue;
86                protected:
87                        /// The request channel, as an integer
88                        uint16 mChannel;
89                        /// The request type, as an integer within the channel (user can define enumerations on this)
90                        uint16 mType;
91                        /// The details of the request (user defined)
92                        Any mData;
93                        /// Retry count - set this to non-zero to have the request try again on failure
94                        uint8 mRetryCount;
95                        /// Identifier (assigned by the system)
96                        RequestID mID;
97                        /// Abort Flag
98                        mutable bool mAborted;
99
100                public:
101                        /// Constructor
102                        Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid);
103                        ~Request();
104                        /// Set the abort flag
105                        void abortRequest() const { mAborted = true; }
106                        /// Get the request channel (top level categorisation)
107                        uint16 getChannel() const { return mChannel; }
108                        /// Get the type of this request within the given channel
109                        uint16 getType() const { return mType; }
110                        /// Get the user details of this request
111                        const Any& getData() const { return mData; }
112                        /// Get the remaining retry count
113                        uint8 getRetryCount() const { return mRetryCount; }
114                        /// Get the identifier of this request
115                        RequestID getID() const { return mID; }
116                        /// Get the abort flag
117                        bool getAborted() const { return mAborted; }
118                };
119
120                /** General purpose response structure.
121                */
122                struct _OgreExport Response : public UtilityAlloc
123                {
124                        /// Pointer to the request that this response is in relation to
125                        const Request* mRequest;
126                        /// Whether the work item succeeded or not
127                        bool mSuccess;
128                        /// Any diagnostic messages
129                        String mMessages;
130                        /// Data associated with the result of the process
131                        Any mData;
132
133                public:
134                        Response(const Request* rq, bool success, const Any& data, const String& msg = StringUtil::BLANK);
135                        ~Response();
136                        /// Get the request that this is a response to (NB destruction destroys this)
137                        const Request* getRequest() const { return mRequest; }
138                        /// Return whether this is a successful response
139                        bool succeeded() const { return mSuccess; }
140                        /// Get any diagnostic messages about the process
141                        const String& getMessages() const { return mMessages; }
142                        /// Return the response data (user defined, only valid on success)
143                        const Any& getData() const { return mData; }
144                        /// Abort the request
145                        void abortRequest() { mRequest->abortRequest(); mData.destroy(); }
146                };
147
148                /** Interface definition for a handler of requests.
149                @remarks
150                User classes are expected to implement this interface in order to
151                process requests on the queue. It's important to realise that
152                the calls to this class may be in a separate thread to the main
153                render context, and as such it may not be possible to make
154                rendersystem or other GPU-dependent calls in this handler. You can only
155                do so if the queue was created with 'workersCanAccessRenderSystem'
156                set to true, and OGRE_THREAD_SUPPORT=1, but this puts extra strain
157                on the thread safety of the render system and is not recommended.
158                It is best to perform CPU-side work in these handlers and let the
159                response handler transfer results to the GPU in the main render thread.
160                */
161                class _OgreExport RequestHandler
162                {
163                public:
164                        RequestHandler() {}
165                        virtual ~RequestHandler() {}
166
167                        /** Return whether this handler can process a given request.
168                        @remarks
169                        Defaults to true, but if you wish to add several handlers each of
170                        which deal with different types of request, you can override
171                        this method.
172                        */
173                        virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ) 
174                        { (void)srcQ; return !req->getAborted(); }
175
176                        /** The handler method every subclass must implement.
177                        If a failure is encountered, return a Response with a failure
178                        result rather than raise an exception.
179                        @param req The Request structure, which is effectively owned by the
180                        handler during this call. It must be attached to the returned
181                        Response regardless of success or failure.
182                        @param srcQ The work queue that this request originated from
183                        @return Pointer to a Response object - the caller is responsible
184                        for deleting the object.
185                        */
186                        virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0;
187                };
188
189                /** Interface definition for a handler of responses.
190                @remarks
191                User classes are expected to implement this interface in order to
192                process responses from the queue. All calls to this class will be
193                in the main render thread and thus all GPU resources will be
194                available.
195                */
196                class _OgreExport ResponseHandler
197                {
198                public:
199                        ResponseHandler() {}
200                        virtual ~ResponseHandler() {}
201
202                        /** Return whether this handler can process a given response.
203                        @remarks
204                        Defaults to true, but if you wish to add several handlers each of
205                        which deal with different types of response, you can override
206                        this method.
207                        */
208                        virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ) 
209                        { (void)srcQ; return !res->getRequest()->getAborted(); }
210
211                        /** The handler method every subclass must implement.
212                        @param res The Response structure. The caller is responsible for
213                        deleting this after the call is made, none of the data contained
214                        (except pointers to structures in user Any data) will persist
215                        after this call is returned.
216                        @param srcQ The work queue that this request originated from
217                        */
218                        virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0;
219                };
220
221                WorkQueue() : mNextChannel(0) {}
222                virtual ~WorkQueue() {}
223
224                /** Start up the queue with the options that have been set.
225                @param forceRestart If the queue is already running, whether to shut it
226                        down and restart.
227                */
228                virtual void startup(bool forceRestart = true) = 0;
229                /** Add a request handler instance to the queue.
230                @remarks
231                        Every queue must have at least one request handler instance for each
232                        channel in which requests are raised. If you
233                        add more than one handler per channel, then you must implement canHandleRequest
234                        differently     in each if you wish them to respond to different requests.
235                @param channel The channel for requests you want to handle
236                @param rh Your handler
237                */
238                virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0;
239                /** Remove a request handler. */
240                virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0;
241
242                /** Add a response handler instance to the queue.
243                @remarks
244                        Every queue must have at least one response handler instance for each
245                        channel in which requests are raised. If you add more than one, then you
246                        must implement canHandleResponse differently in each if you wish them
247                        to respond to different responses.
248                @param channel The channel for responses you want to handle
249                @param rh Your handler
250                */
251                virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
252                /** Remove a Response handler. */
253                virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
254
255                /** Add a new request to the queue.
256                @param channel The channel this request will go into = 0; the channel is the top-level
257                        categorisation of the request
258                @param requestType An identifier that's unique within this queue which
259                        identifies the type of the request (user decides the actual value)
260                @param rData The data required by the request process.
261                @param retryCount The number of times the request should be retried
262                        if it fails.
263                @param forceSynchronous Forces the request to be processed immediately
264                        even if threading is enabled.
265                @param idleThread Request should be processed on the idle thread.
266                    Idle requests will be processed on a single worker thread. You should use this in the following situations:
267                        1. If a request handler can't process multiple requests in parallel.
268                        2. If you add lot of requests, but you want to keep the game fast.
269                        3. If you have lot of more important threads. (example: physics).
270                @return The ID of the request that has been added
271                */
272                virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 
273                        bool forceSynchronous = false, bool idleThread = false) = 0;
274
275                /** Abort a previously issued request.
276                If the request is still waiting to be processed, it will be
277                removed from the queue.
278                @param id The ID of the previously issued request.
279                */
280                virtual void abortRequest(RequestID id) = 0;
281
282                /** Abort all previously issued requests in a given channel.
283                Any requests still waiting to be processed of the given channel, will be
284                removed from the queue.
285                Requests which are processed, but response handler is not called will also be removed.
286                @param channel The type of request to be aborted
287                */
288                virtual void abortRequestsByChannel(uint16 channel) = 0;
289
290                /** Abort all previously issued requests in a given channel.
291                Any requests still waiting to be processed of the given channel, will be
292                removed from the queue.
293                It will not remove requests, where the request handler is already called.
294                @param channel The type of request to be aborted
295                */
296                virtual void abortPendingRequestsByChannel(uint16 channel) = 0;
297
298                /** Abort all previously issued requests.
299                Any requests still waiting to be processed will be removed from the queue.
300                Any requests that are being processed will still complete.
301                */
302                virtual void abortAllRequests() = 0;
303               
304                /** Set whether to pause further processing of any requests.
305                If true, any further requests will simply be queued and not processed until
306                setPaused(false) is called. Any requests which are in the process of being
307                worked on already will still continue.
308                */
309                virtual void setPaused(bool pause) = 0;
310                /// Return whether the queue is paused ie not sending more work to workers
311                virtual bool isPaused() const = 0;
312
313                /** Set whether to accept new requests or not.
314                If true, requests are added to the queue as usual. If false, requests
315                are silently ignored until setRequestsAccepted(true) is called.
316                */
317                virtual void setRequestsAccepted(bool accept) = 0;
318                /// Returns whether requests are being accepted right now
319                virtual bool getRequestsAccepted() const = 0;
320
321                /** Process the responses in the queue.
322                @remarks
323                        This method is public, and must be called from the main render
324                        thread to 'pump' responses through the system. The method will usually
325                        try to clear all responses before returning = 0; however, you can specify
326                        a time limit on the response processing to limit the impact of
327                        spikes in demand by calling setResponseProcessingTimeLimit.
328                */
329                virtual void processResponses() = 0; 
330
331                /** Get the time limit imposed on the processing of responses in a
332                        single frame, in milliseconds (0 indicates no limit).
333                */
334                virtual unsigned long getResponseProcessingTimeLimit() const = 0;
335
336                /** Set the time limit imposed on the processing of responses in a
337                        single frame, in milliseconds (0 indicates no limit).
338                        This sets the maximum time that will be spent in processResponses() in
339                        a single frame. The default is 8ms.
340                */
341                virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0;
342
343                /** Shut down the queue.
344                */
345                virtual void shutdown() = 0;
346
347                /** Get a channel ID for a given channel name.
348                @remarks
349                        Channels are assigned on a first-come, first-served basis and are
350                        not persistent across application instances. This method allows
351                        applications to not worry about channel clashes through manually
352                        assigned channel numbers.
353                */
354                virtual uint16 getChannel(const String& channelName);
355
356        };
357
358        /** Base for a general purpose request / response style background work queue.
359        */
360        class _OgreExport DefaultWorkQueueBase : public WorkQueue
361        {
362        public:
363
364                /** Constructor.
365                        Call startup() to initialise.
366                @param name Optional name, just helps to identify logging output
367                */
368                DefaultWorkQueueBase(const String& name = StringUtil::BLANK);
369                virtual ~DefaultWorkQueueBase();
370                /// Get the name of the work queue
371                const String& getName() const;
372                /** Get the number of worker threads that this queue will start when
373                        startup() is called.
374                */
375                virtual size_t getWorkerThreadCount() const;
376
377                /** Set the number of worker threads that this queue will start
378                        when startup() is called (default 1).
379                        Calling this will have no effect unless the queue is shut down and
380                        restarted.
381                */
382                virtual void setWorkerThreadCount(size_t c);
383
384                /** Get whether worker threads will be allowed to access render system
385                        resources.
386                        Accessing render system resources from a separate thread can require that
387                        a context is maintained for that thread. Also, it requires that the
388                        render system is running in threadsafe mode, which only happens
389                        when OGRE_THREAD_SUPPORT=1. This option defaults to false, which means
390                        that threads can not use GPU resources, and the render system can
391                        work in non-threadsafe mode, which is more efficient.
392                */
393                virtual bool getWorkersCanAccessRenderSystem() const;
394
395
396                /** Set whether worker threads will be allowed to access render system
397                        resources.
398                        Accessing render system resources from a separate thread can require that
399                        a context is maintained for that thread. Also, it requires that the
400                        render system is running in threadsafe mode, which only happens
401                        when OGRE_THREAD_SUPPORT=1. This option defaults to false, which means
402                        that threads can not use GPU resources, and the render system can
403                        work in non-threadsafe mode, which is more efficient.
404                        Calling this will have no effect unless the queue is shut down and
405                        restarted.
406                */
407                virtual void setWorkersCanAccessRenderSystem(bool access);
408
409                /** Process the next request on the queue.
410                @remarks
411                        This method is public, but only intended for advanced users to call.
412                        The only reason you would call this, is if you were using your
413                        own thread to drive the worker processing. The thread calling this
414                        method will be the thread used to call the RequestHandler.
415                */
416                virtual void _processNextRequest();
417
418                /// Main function for each thread spawned.
419                virtual void _threadMain() = 0;
420
421                /** Returns whether the queue is trying to shut down. */
422                virtual bool isShuttingDown() const { return mShuttingDown; }
423
424                /// @copydoc WorkQueue::addRequestHandler
425                virtual void addRequestHandler(uint16 channel, RequestHandler* rh);
426                /// @copydoc WorkQueue::removeRequestHandler
427                virtual void removeRequestHandler(uint16 channel, RequestHandler* rh);
428                /// @copydoc WorkQueue::addResponseHandler
429                virtual void addResponseHandler(uint16 channel, ResponseHandler* rh);
430                /// @copydoc WorkQueue::removeResponseHandler
431                virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh);
432
433                /// @copydoc WorkQueue::addRequest
434                virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 
435                        bool forceSynchronous = false, bool idleThread = false);
436                /// @copydoc WorkQueue::abortRequest
437                virtual void abortRequest(RequestID id);
438                /// @copydoc WorkQueue::abortRequestsByChannel
439                virtual void abortRequestsByChannel(uint16 channel);
440                /// @copydoc WorkQueue::abortPendingRequestsByChannel
441                virtual void abortPendingRequestsByChannel(uint16 channel);
442                /// @copydoc WorkQueue::abortAllRequests
443                virtual void abortAllRequests();
444                /// @copydoc WorkQueue::setPaused
445                virtual void setPaused(bool pause);
446                /// @copydoc WorkQueue::isPaused
447                virtual bool isPaused() const;
448                /// @copydoc WorkQueue::setRequestsAccepted
449                virtual void setRequestsAccepted(bool accept);
450                /// @copydoc WorkQueue::getRequestsAccepted
451                virtual bool getRequestsAccepted() const;
452                /// @copydoc WorkQueue::processResponses
453                virtual void processResponses(); 
454                /// @copydoc WorkQueue::getResponseProcessingTimeLimit
455                virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; }
456                /// @copydoc WorkQueue::setResponseProcessingTimeLimit
457                virtual void setResponseProcessingTimeLimit(unsigned long ms) { mResposeTimeLimitMS = ms; }
458        protected:
459                String mName;
460                size_t mWorkerThreadCount;
461                bool mWorkerRenderSystemAccess;
462                bool mIsRunning;
463                unsigned long mResposeTimeLimitMS;
464
465                typedef deque<Request*>::type RequestQueue;
466                typedef deque<Response*>::type ResponseQueue;
467                RequestQueue mRequestQueue; // Guarded by mRequestMutex
468                RequestQueue mProcessQueue; // Guarded by mProcessMutex
469                ResponseQueue mResponseQueue; // Guarded by mResponseMutex
470
471                /// Thread function
472                struct _OgreExport WorkerFunc OGRE_THREAD_WORKER_INHERIT
473                {
474                        DefaultWorkQueueBase* mQueue;
475
476                        WorkerFunc(DefaultWorkQueueBase* q) 
477                                : mQueue(q) {}
478
479                        void operator()();
480                       
481                        void operator()() const;
482
483                        void run();
484                };
485                WorkerFunc* mWorkerFunc;
486
487                /** Intermediate structure to hold a pointer to a request handler which
488                        provides insurance against the handler itself being disconnected
489                        while the list remains unchanged.
490                */
491                class _OgreExport RequestHandlerHolder : public UtilityAlloc
492                {
493                protected:
494                        OGRE_RW_MUTEX(mRWMutex);
495                        RequestHandler* mHandler;
496                public:
497                        RequestHandlerHolder(RequestHandler* handler)
498                                : mHandler(handler)     {}
499
500                        // Disconnect the handler to allow it to be destroyed
501                        void disconnectHandler()
502                        {
503                                // write lock - must wait for all requests to finish
504                                OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex);
505                                mHandler = 0;
506                        }
507
508                        /** Get handler pointer - note, only use this for == comparison or similar,
509                                do not attempt to call it as it is not thread safe.
510                        */
511                        RequestHandler* getHandler() { return mHandler; }
512
513                        /** Process a request if possible.
514                        @return Valid response if processed, null otherwise
515                        */
516                        Response* handleRequest(const Request* req, const WorkQueue* srcQ)
517                        {
518                                // Read mutex so that multiple requests can be processed by the
519                                // same handler in parallel if required
520                                OGRE_LOCK_RW_MUTEX_READ(mRWMutex);
521                                Response* response = 0;
522                                if (mHandler)
523                                {
524                                        if (mHandler->canHandleRequest(req, srcQ))
525                                        {
526                                                response = mHandler->handleRequest(req, srcQ);
527                                        }
528                                }
529                                return response;
530                        }
531
532                };
533                // Hold these by shared pointer so they can be copied keeping same instance
534                typedef SharedPtr<RequestHandlerHolder> RequestHandlerHolderPtr;
535
536                typedef list<RequestHandlerHolderPtr>::type RequestHandlerList;
537                typedef list<ResponseHandler*>::type ResponseHandlerList;
538                typedef map<uint16, RequestHandlerList>::type RequestHandlerListByChannel;
539                typedef map<uint16, ResponseHandlerList>::type ResponseHandlerListByChannel;
540
541                RequestHandlerListByChannel mRequestHandlers;
542                ResponseHandlerListByChannel mResponseHandlers;
543                RequestID mRequestCount; // Guarded by mRequestMutex
544                bool mPaused;
545                bool mAcceptRequests;
546                bool mShuttingDown;
547
548                //NOTE: If you lock multiple mutexes at the same time, the order is important!
549                // For example if threadA locks mIdleMutex first then tries to lock mProcessMutex,
550                // and threadB locks mProcessMutex first, then mIdleMutex. In this case you can get livelock and the system is dead!
551                //RULE: Lock mProcessMutex before other mutex, to prevent livelocks
552                OGRE_MUTEX(mIdleMutex);
553                OGRE_MUTEX(mRequestMutex);
554                OGRE_MUTEX(mProcessMutex);
555                OGRE_MUTEX(mResponseMutex);
556                OGRE_RW_MUTEX(mRequestHandlerMutex);
557
558
559                void processRequestResponse(Request* r, bool synchronous);
560                Response* processRequest(Request* r);
561                void processResponse(Response* r);
562                /// Notify workers about a new request.
563                virtual void notifyWorkers() = 0;
564                /// Put a Request on the queue with a specific RequestID.
565                void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount);
566               
567                RequestQueue mIdleRequestQueue; // Guarded by mIdleMutex
568                bool mIdleThreadRunning; // Guarded by mIdleMutex
569                Request* mIdleProcessed; // Guarded by mProcessMutex
570               
571
572                bool processIdleRequests();
573        };
574
575
576
577
578
579        /** @} */
580        /** @} */
581
582}
583
584#include "OgreHeaderSuffix.h"
585
586#endif
587
Note: See TracBrowser for help on using the repository browser.