1 | /* |
---|
2 | ----------------------------------------------------------------------------- |
---|
3 | This source file is part of OGRE |
---|
4 | (Object-oriented Graphics Rendering Engine) |
---|
5 | For the latest info, see http://www.ogre3d.org/ |
---|
6 | |
---|
7 | Copyright (c) 2000-2013 Torus Knot Software Ltd |
---|
8 | |
---|
9 | Permission is hereby granted, free of charge, to any person obtaining a copy |
---|
10 | of this software and associated documentation files (the "Software"), to deal |
---|
11 | in the Software without restriction, including without limitation the rights |
---|
12 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
---|
13 | copies of the Software, and to permit persons to whom the Software is |
---|
14 | furnished to do so, subject to the following conditions: |
---|
15 | |
---|
16 | The above copyright notice and this permission notice shall be included in |
---|
17 | all copies or substantial portions of the Software. |
---|
18 | |
---|
19 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
---|
20 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
---|
21 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
---|
22 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
---|
23 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
---|
24 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
---|
25 | THE SOFTWARE. |
---|
26 | ----------------------------------------------------------------------------- |
---|
27 | */ |
---|
28 | #ifndef __OgreWorkQueue_H__ |
---|
29 | #define __OgreWorkQueue_H__ |
---|
30 | |
---|
31 | #include "OgrePrerequisites.h" |
---|
32 | #include "OgreAny.h" |
---|
33 | #include "OgreSharedPtr.h" |
---|
34 | #include "Threading/OgreThreadHeaders.h" |
---|
35 | #include "OgreHeaderPrefix.h" |
---|
36 | |
---|
37 | namespace Ogre |
---|
38 | { |
---|
39 | /** \addtogroup Core |
---|
40 | * @{ |
---|
41 | */ |
---|
42 | /** \addtogroup General |
---|
43 | * @{ |
---|
44 | */ |
---|
45 | |
---|
46 | /** Interface to a general purpose request / response style background work queue. |
---|
47 | @remarks |
---|
48 | A work queue is a simple structure, where requests for work are placed |
---|
49 | onto the queue, then removed by a worker for processing, then finally |
---|
50 | a response is placed on the result queue for the originator to pick up |
---|
51 | at their leisure. The typical use for this is in a threaded environment, |
---|
52 | although any kind of deferred processing could use this approach to |
---|
53 | decouple and distribute work over a period of time even |
---|
54 | if it was single threaded. |
---|
55 | @par |
---|
56 | WorkQueues also incorporate thread pools. One or more background worker threads |
---|
57 | can wait on the queue and be notified when a request is waiting to be |
---|
58 | processed. For maximal thread usage, a WorkQueue instance should be shared |
---|
59 | among many sources of work, rather than many work queues being created. |
---|
60 | This way, you can share a small number of hardware threads among a large |
---|
61 | number of background tasks. This doesn't mean you have to implement all the |
---|
62 | request processing in one class, you can plug in many handlers in order to |
---|
63 | process the requests. |
---|
64 | @par |
---|
65 | This is an abstract interface definition; users can subclass this and |
---|
66 | provide their own implementation if required to centralise task management |
---|
67 | in their own subsystems. We also provide a default implementation in the |
---|
68 | form of DefaultWorkQueue. |
---|
69 | */ |
---|
70 | class _OgreExport WorkQueue : public UtilityAlloc |
---|
71 | { |
---|
72 | protected: |
---|
73 | typedef map<String, uint16>::type ChannelMap; |
---|
74 | ChannelMap mChannelMap; |
---|
75 | uint16 mNextChannel; |
---|
76 | OGRE_MUTEX(mChannelMapMutex); |
---|
77 | public: |
---|
78 | /// Numeric identifier for a request |
---|
79 | typedef unsigned long long int RequestID; |
---|
80 | |
---|
81 | /** General purpose request structure. Exposes read-only accessors for its fields, plus an abort flag that can be set through a const reference. |
---|
82 | */ |
---|
83 | class _OgreExport Request : public UtilityAlloc |
---|
84 | { |
---|
85 | friend class WorkQueue; |
---|
86 | protected: |
---|
87 | /// The request channel, as an integer |
---|
88 | uint16 mChannel; |
---|
89 | /// The request type, as an integer within the channel (user can define enumerations on this) |
---|
90 | uint16 mType; |
---|
91 | /// The details of the request (user defined) |
---|
92 | Any mData; |
---|
93 | /// Retry count - set this to non-zero to have the request try again on failure |
---|
94 | uint8 mRetryCount; |
---|
95 | /// Identifier (assigned by the system) |
---|
96 | RequestID mID; |
---|
97 | /// Abort flag; declared mutable so that abortRequest() can be a const method |
---|
98 | mutable bool mAborted; |
---|
99 | |
---|
100 | public: |
---|
101 | /// Constructor (defined out of line) |
---|
102 | Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid); |
---|
103 | ~Request(); |
---|
104 | /// Set the abort flag (allowed on a const Request because mAborted is mutable) |
---|
105 | void abortRequest() const { mAborted = true; } |
---|
106 | /// Get the request channel (top level categorisation) |
---|
107 | uint16 getChannel() const { return mChannel; } |
---|
108 | /// Get the type of this request within the given channel |
---|
109 | uint16 getType() const { return mType; } |
---|
110 | /// Get the user details of this request |
---|
111 | const Any& getData() const { return mData; } |
---|
112 | /// Get the remaining retry count |
---|
113 | uint8 getRetryCount() const { return mRetryCount; } |
---|
114 | /// Get the identifier of this request |
---|
115 | RequestID getID() const { return mID; } |
---|
116 | /// Get the abort flag (true once abortRequest() has been called) |
---|
117 | bool getAborted() const { return mAborted; } |
---|
118 | }; |
---|
119 | |
---|
120 | /** General purpose response structure. Bundles a pointer to the originating request with a success flag, diagnostic messages and result data. |
---|
121 | */ |
---|
122 | struct _OgreExport Response : public UtilityAlloc |
---|
123 | { |
---|
124 | /// Pointer to the request that this response is in relation to |
---|
125 | const Request* mRequest; |
---|
126 | /// Whether the work item succeeded or not |
---|
127 | bool mSuccess; |
---|
128 | /// Any diagnostic messages |
---|
129 | String mMessages; |
---|
130 | /// Data associated with the result of the process |
---|
131 | Any mData; |
---|
132 | |
---|
133 | public: |
---|
134 | Response(const Request* rq, bool success, const Any& data, const String& msg = StringUtil::BLANK); |
---|
135 | ~Response(); |
---|
136 | /// Get the request that this is a response to (NB destruction destroys this) |
---|
137 | const Request* getRequest() const { return mRequest; } |
---|
138 | /// Return whether this is a successful response |
---|
139 | bool succeeded() const { return mSuccess; } |
---|
140 | /// Get any diagnostic messages about the process |
---|
141 | const String& getMessages() const { return mMessages; } |
---|
142 | /// Return the response data (user defined, only valid on success) |
---|
143 | const Any& getData() const { return mData; } |
---|
144 | /// Abort the associated request and destroy the data held by this response |
---|
145 | void abortRequest() { mRequest->abortRequest(); mData.destroy(); } |
---|
146 | }; |
---|
147 | |
---|
148 | /** Interface definition for a handler of requests. |
---|
149 | @remarks |
---|
150 | User classes are expected to implement this interface in order to |
---|
151 | process requests on the queue. It's important to realise that |
---|
152 | the calls to this class may be in a separate thread to the main |
---|
153 | render context, and as such it may not be possible to make |
---|
154 | rendersystem or other GPU-dependent calls in this handler. You can only |
---|
155 | do so if the queue was created with 'workersCanAccessRenderSystem' |
---|
156 | set to true, and OGRE_THREAD_SUPPORT=1, but this puts extra strain |
---|
157 | on the thread safety of the render system and is not recommended. |
---|
158 | It is best to perform CPU-side work in these handlers and let the |
---|
159 | response handler transfer results to the GPU in the main render thread. |
---|
160 | */ |
---|
161 | class _OgreExport RequestHandler |
---|
162 | { |
---|
163 | public: |
---|
164 | RequestHandler() {} |
---|
165 | virtual ~RequestHandler() {} |
---|
166 | |
---|
167 | /** Return whether this handler can process a given request. |
---|
168 | @remarks |
---|
169 | The default implementation returns true unless the request has been aborted (srcQ is ignored); if you wish to add several handlers each of |
---|
170 | which deal with different types of request, you can override |
---|
171 | this method. |
---|
172 | */ |
---|
173 | virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ) |
---|
174 | { (void)srcQ; return !req->getAborted(); } |
---|
175 | |
---|
176 | /** The handler method every subclass must implement. |
---|
177 | If a failure is encountered, return a Response with a failure |
---|
178 | result rather than raise an exception. |
---|
179 | @param req The Request structure, which is effectively owned by the |
---|
180 | handler during this call. It must be attached to the returned |
---|
181 | Response regardless of success or failure. |
---|
182 | @param srcQ The work queue that this request originated from |
---|
183 | @return Pointer to a Response object - the caller is responsible |
---|
184 | for deleting the object. |
---|
185 | */ |
---|
186 | virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0; |
---|
187 | }; |
---|
188 | |
---|
189 | /** Interface definition for a handler of responses. |
---|
190 | @remarks |
---|
191 | User classes are expected to implement this interface in order to |
---|
192 | process responses from the queue. All calls to this class will be |
---|
193 | in the main render thread and thus all GPU resources will be |
---|
194 | available. |
---|
195 | */ |
---|
196 | class _OgreExport ResponseHandler |
---|
197 | { |
---|
198 | public: |
---|
199 | ResponseHandler() {} |
---|
200 | virtual ~ResponseHandler() {} |
---|
201 | |
---|
202 | /** Return whether this handler can process a given response. |
---|
203 | @remarks |
---|
204 | The default implementation returns true unless the response's request has been aborted (srcQ is ignored); if you wish to add several handlers each of |
---|
205 | which deal with different types of response, you can override |
---|
206 | this method. |
---|
207 | */ |
---|
208 | virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ) |
---|
209 | { (void)srcQ; return !res->getRequest()->getAborted(); } |
---|
210 | |
---|
211 | /** The handler method every subclass must implement. |
---|
212 | @param res The Response structure. The caller is responsible for |
---|
213 | deleting this after the call is made, none of the data contained |
---|
214 | (except pointers to structures in user Any data) will persist |
---|
215 | after this call is returned. |
---|
216 | @param srcQ The work queue that this response originated from |
---|
217 | */ |
---|
218 | virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0; |
---|
219 | }; |
---|
220 | |
---|
221 | WorkQueue() : mNextChannel(0) {} |
---|
222 | virtual ~WorkQueue() {} |
---|
223 | |
---|
224 | /** Start up the queue with the options that have been set. |
---|
225 | @param forceRestart If the queue is already running, whether to shut it |
---|
226 | down and restart. |
---|
227 | */ |
---|
228 | virtual void startup(bool forceRestart = true) = 0; |
---|
229 | /** Add a request handler instance to the queue. |
---|
230 | @remarks |
---|
231 | Every queue must have at least one request handler instance for each |
---|
232 | channel in which requests are raised. If you |
---|
233 | add more than one handler per channel, then you must implement canHandleRequest |
---|
234 | differently in each if you wish them to respond to different requests. |
---|
235 | @param channel The channel for requests you want to handle |
---|
236 | @param rh Your handler |
---|
237 | */ |
---|
238 | virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0; |
---|
239 | /** Remove a request handler. */ |
---|
240 | virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0; |
---|
241 | |
---|
242 | /** Add a response handler instance to the queue. |
---|
243 | @remarks |
---|
244 | Every queue must have at least one response handler instance for each |
---|
245 | channel in which requests are raised. If you add more than one, then you |
---|
246 | must implement canHandleResponse differently in each if you wish them |
---|
247 | to respond to different responses. |
---|
248 | @param channel The channel for responses you want to handle |
---|
249 | @param rh Your handler |
---|
250 | */ |
---|
251 | virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0; |
---|
252 | /** Remove a Response handler. */ |
---|
253 | virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0; |
---|
254 | |
---|
255 | /** Add a new request to the queue. |
---|
256 | @param channel The channel this request will go into; the channel is the top-level |
---|
257 | categorisation of the request |
---|
258 | @param requestType An identifier that's unique within this queue which |
---|
259 | identifies the type of the request (user decides the actual value) |
---|
260 | @param rData The data required by the request process. |
---|
261 | @param retryCount The number of times the request should be retried |
---|
262 | if it fails. |
---|
263 | @param forceSynchronous Forces the request to be processed immediately |
---|
264 | even if threading is enabled. |
---|
265 | @param idleThread Request should be processed on the idle thread. |
---|
266 | Idle requests will be processed on a single worker thread. You should use this in the following situations: |
---|
267 | 1. If a request handler can't process multiple requests in parallel. |
---|
268 | 2. If you add a lot of requests, but you want to keep the game fast. |
---|
269 | 3. If you have a lot of more important threads (for example, physics). |
---|
270 | @return The ID of the request that has been added |
---|
271 | */ |
---|
272 | virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, |
---|
273 | bool forceSynchronous = false, bool idleThread = false) = 0; |
---|
274 | |
---|
275 | /** Abort a previously issued request. |
---|
276 | If the request is still waiting to be processed, it will be |
---|
277 | removed from the queue. |
---|
278 | @param id The ID of the previously issued request. |
---|
279 | */ |
---|
280 | virtual void abortRequest(RequestID id) = 0; |
---|
281 | |
---|
282 | /** Abort all previously issued requests in a given channel. |
---|
283 | Any requests still waiting to be processed of the given channel, will be |
---|
284 | removed from the queue. |
---|
285 | Requests which have been processed, but whose response handler has not yet been called, will also be removed. |
---|
286 | @param channel The type of request to be aborted |
---|
287 | */ |
---|
288 | virtual void abortRequestsByChannel(uint16 channel) = 0; |
---|
289 | |
---|
290 | /** Abort all pending requests in a given channel. |
---|
291 | Any requests of the given channel that are still waiting to be processed will be |
---|
292 | removed from the queue. |
---|
293 | It will not remove requests whose request handler has already been called. |
---|
294 | @param channel The type of request to be aborted |
---|
295 | */ |
---|
296 | virtual void abortPendingRequestsByChannel(uint16 channel) = 0; |
---|
297 | |
---|
298 | /** Abort all previously issued requests. |
---|
299 | Any requests still waiting to be processed will be removed from the queue. |
---|
300 | Any requests that are being processed will still complete. |
---|
301 | */ |
---|
302 | virtual void abortAllRequests() = 0; |
---|
303 | |
---|
304 | /** Set whether to pause further processing of any requests. |
---|
305 | If true, any further requests will simply be queued and not processed until |
---|
306 | setPaused(false) is called. Any requests which are in the process of being |
---|
307 | worked on already will still continue. |
---|
308 | */ |
---|
309 | virtual void setPaused(bool pause) = 0; |
---|
310 | /// Return whether the queue is paused ie not sending more work to workers |
---|
311 | virtual bool isPaused() const = 0; |
---|
312 | |
---|
313 | /** Set whether to accept new requests or not. |
---|
314 | If true, requests are added to the queue as usual. If false, requests |
---|
315 | are silently ignored until setRequestsAccepted(true) is called. |
---|
316 | */ |
---|
317 | virtual void setRequestsAccepted(bool accept) = 0; |
---|
318 | /// Returns whether requests are being accepted right now |
---|
319 | virtual bool getRequestsAccepted() const = 0; |
---|
320 | |
---|
321 | /** Process the responses in the queue. |
---|
322 | @remarks |
---|
323 | This method is public, and must be called from the main render |
---|
324 | thread to 'pump' responses through the system. The method will usually |
---|
325 | try to clear all responses before returning; however, you can specify |
---|
326 | a time limit on the response processing to limit the impact of |
---|
327 | spikes in demand by calling setResponseProcessingTimeLimit. |
---|
328 | */ |
---|
329 | virtual void processResponses() = 0; |
---|
330 | |
---|
331 | /** Get the time limit imposed on the processing of responses in a |
---|
332 | single frame, in milliseconds (0 indicates no limit). |
---|
333 | */ |
---|
334 | virtual unsigned long getResponseProcessingTimeLimit() const = 0; |
---|
335 | |
---|
336 | /** Set the time limit imposed on the processing of responses in a |
---|
337 | single frame, in milliseconds (0 indicates no limit). |
---|
338 | This sets the maximum time that will be spent in processResponses() in |
---|
339 | a single frame. The default is 8ms. |
---|
340 | */ |
---|
341 | virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0; |
---|
342 | |
---|
343 | /** Shut down the queue. |
---|
344 | */ |
---|
345 | virtual void shutdown() = 0; |
---|
346 | |
---|
347 | /** Get a channel ID for a given channel name. |
---|
348 | @remarks |
---|
349 | Channels are assigned on a first-come, first-served basis and are |
---|
350 | not persistent across application instances. This method allows |
---|
351 | applications to not worry about channel clashes through manually |
---|
352 | assigned channel numbers. |
---|
353 | */ |
---|
354 | virtual uint16 getChannel(const String& channelName); |
---|
355 | |
---|
356 | }; |
---|
357 | |
---|
358 | /** Base for a general purpose request / response style background work queue. |
---|
359 | */ |
---|
360 | class _OgreExport DefaultWorkQueueBase : public WorkQueue |
---|
361 | { |
---|
362 | public: |
---|
363 | |
---|
364 | /** Constructor. |
---|
365 | Call startup() to initialise. |
---|
366 | @param name Optional name, just helps to identify logging output |
---|
367 | */ |
---|
368 | DefaultWorkQueueBase(const String& name = StringUtil::BLANK); |
---|
369 | virtual ~DefaultWorkQueueBase(); |
---|
370 | /// Get the name of the work queue |
---|
371 | const String& getName() const; |
---|
372 | /** Get the number of worker threads that this queue will start when |
---|
373 | startup() is called. |
---|
374 | */ |
---|
375 | virtual size_t getWorkerThreadCount() const; |
---|
376 | |
---|
377 | /** Set the number of worker threads that this queue will start |
---|
378 | when startup() is called (default 1). |
---|
379 | Calling this will have no effect unless the queue is shut down and |
---|
380 | restarted. |
---|
381 | */ |
---|
382 | virtual void setWorkerThreadCount(size_t c); |
---|
383 | |
---|
384 | /** Get whether worker threads will be allowed to access render system |
---|
385 | resources. |
---|
386 | Accessing render system resources from a separate thread can require that |
---|
387 | a context is maintained for that thread. Also, it requires that the |
---|
388 | render system is running in threadsafe mode, which only happens |
---|
389 | when OGRE_THREAD_SUPPORT=1. This option defaults to false, which means |
---|
390 | that threads can not use GPU resources, and the render system can |
---|
391 | work in non-threadsafe mode, which is more efficient. |
---|
392 | */ |
---|
393 | virtual bool getWorkersCanAccessRenderSystem() const; |
---|
394 | |
---|
395 | |
---|
396 | /** Set whether worker threads will be allowed to access render system |
---|
397 | resources. |
---|
398 | Accessing render system resources from a separate thread can require that |
---|
399 | a context is maintained for that thread. Also, it requires that the |
---|
400 | render system is running in threadsafe mode, which only happens |
---|
401 | when OGRE_THREAD_SUPPORT=1. This option defaults to false, which means |
---|
402 | that threads can not use GPU resources, and the render system can |
---|
403 | work in non-threadsafe mode, which is more efficient. |
---|
404 | Calling this will have no effect unless the queue is shut down and |
---|
405 | restarted. |
---|
406 | */ |
---|
407 | virtual void setWorkersCanAccessRenderSystem(bool access); |
---|
408 | |
---|
409 | /** Process the next request on the queue. |
---|
410 | @remarks |
---|
411 | This method is public, but only intended for advanced users to call. |
---|
412 | The only reason you would call this, is if you were using your |
---|
413 | own thread to drive the worker processing. The thread calling this |
---|
414 | method will be the thread used to call the RequestHandler. |
---|
415 | */ |
---|
416 | virtual void _processNextRequest(); |
---|
417 | |
---|
418 | /// Main function for each thread spawned. |
---|
419 | virtual void _threadMain() = 0; |
---|
420 | |
---|
421 | /** Returns whether the queue is trying to shut down. */ |
---|
422 | virtual bool isShuttingDown() const { return mShuttingDown; } |
---|
423 | |
---|
424 | /// @copydoc WorkQueue::addRequestHandler |
---|
425 | virtual void addRequestHandler(uint16 channel, RequestHandler* rh); |
---|
426 | /// @copydoc WorkQueue::removeRequestHandler |
---|
427 | virtual void removeRequestHandler(uint16 channel, RequestHandler* rh); |
---|
428 | /// @copydoc WorkQueue::addResponseHandler |
---|
429 | virtual void addResponseHandler(uint16 channel, ResponseHandler* rh); |
---|
430 | /// @copydoc WorkQueue::removeResponseHandler |
---|
431 | virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh); |
---|
432 | |
---|
433 | /// @copydoc WorkQueue::addRequest |
---|
434 | virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, |
---|
435 | bool forceSynchronous = false, bool idleThread = false); |
---|
436 | /// @copydoc WorkQueue::abortRequest |
---|
437 | virtual void abortRequest(RequestID id); |
---|
438 | /// @copydoc WorkQueue::abortRequestsByChannel |
---|
439 | virtual void abortRequestsByChannel(uint16 channel); |
---|
440 | /// @copydoc WorkQueue::abortPendingRequestsByChannel |
---|
441 | virtual void abortPendingRequestsByChannel(uint16 channel); |
---|
442 | /// @copydoc WorkQueue::abortAllRequests |
---|
443 | virtual void abortAllRequests(); |
---|
444 | /// @copydoc WorkQueue::setPaused |
---|
445 | virtual void setPaused(bool pause); |
---|
446 | /// @copydoc WorkQueue::isPaused |
---|
447 | virtual bool isPaused() const; |
---|
448 | /// @copydoc WorkQueue::setRequestsAccepted |
---|
449 | virtual void setRequestsAccepted(bool accept); |
---|
450 | /// @copydoc WorkQueue::getRequestsAccepted |
---|
451 | virtual bool getRequestsAccepted() const; |
---|
452 | /// @copydoc WorkQueue::processResponses |
---|
453 | virtual void processResponses(); |
---|
454 | /// @copydoc WorkQueue::getResponseProcessingTimeLimit |
---|
455 | virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; } |
---|
456 | /// @copydoc WorkQueue::setResponseProcessingTimeLimit |
---|
457 | virtual void setResponseProcessingTimeLimit(unsigned long ms) { mResposeTimeLimitMS = ms; } |
---|
458 | protected: |
---|
459 | String mName; |
---|
460 | size_t mWorkerThreadCount; |
---|
461 | bool mWorkerRenderSystemAccess; |
---|
462 | bool mIsRunning; |
---|
463 | unsigned long mResposeTimeLimitMS; |
---|
464 | |
---|
465 | typedef deque<Request*>::type RequestQueue; |
---|
466 | typedef deque<Response*>::type ResponseQueue; |
---|
467 | RequestQueue mRequestQueue; // Guarded by mRequestMutex |
---|
468 | RequestQueue mProcessQueue; // Guarded by mProcessMutex |
---|
469 | ResponseQueue mResponseQueue; // Guarded by mResponseMutex |
---|
470 | |
---|
471 | /// Thread function object; stores the queue it was constructed with (the call operators and run() are defined out of line) |
---|
472 | struct _OgreExport WorkerFunc OGRE_THREAD_WORKER_INHERIT |
---|
473 | { |
---|
474 | DefaultWorkQueueBase* mQueue; |
---|
475 | |
---|
476 | WorkerFunc(DefaultWorkQueueBase* q) |
---|
477 | : mQueue(q) {} |
---|
478 | |
---|
479 | void operator()(); |
---|
480 | |
---|
481 | void operator()() const; |
---|
482 | |
---|
483 | void run(); |
---|
484 | }; |
---|
485 | WorkerFunc* mWorkerFunc; |
---|
486 | |
---|
487 | /** Intermediate structure to hold a pointer to a request handler which |
---|
488 | provides insurance against the handler itself being disconnected |
---|
489 | while the list remains unchanged. |
---|
490 | */ |
---|
491 | class _OgreExport RequestHandlerHolder : public UtilityAlloc |
---|
492 | { |
---|
493 | protected: |
---|
494 | OGRE_RW_MUTEX(mRWMutex); |
---|
495 | RequestHandler* mHandler; |
---|
496 | public: |
---|
497 | RequestHandlerHolder(RequestHandler* handler) |
---|
498 | : mHandler(handler) {} |
---|
499 | |
---|
500 | // Disconnect the handler (nulls the stored pointer) to allow it to be destroyed |
---|
501 | void disconnectHandler() |
---|
502 | { |
---|
503 | // write lock - must wait for all requests to finish (handleRequest() holds the read lock while calling the handler) |
---|
504 | OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex); |
---|
505 | mHandler = 0; |
---|
506 | } |
---|
507 | |
---|
508 | /** Get handler pointer - note, only use this for == comparison or similar, |
---|
509 | do not attempt to call it as it is not thread safe (the pointer is read without taking mRWMutex). |
---|
510 | */ |
---|
511 | RequestHandler* getHandler() { return mHandler; } |
---|
512 | |
---|
513 | /** Process a request if possible. |
---|
514 | @return The handler's response if it was called; null if the handler has been disconnected or its canHandleRequest() returned false |
---|
515 | */ |
---|
516 | Response* handleRequest(const Request* req, const WorkQueue* srcQ) |
---|
517 | { |
---|
518 | // Read mutex so that multiple requests can be processed by the |
---|
519 | // same handler in parallel if required |
---|
520 | OGRE_LOCK_RW_MUTEX_READ(mRWMutex); |
---|
521 | Response* response = 0; |
---|
522 | if (mHandler) |
---|
523 | { |
---|
524 | if (mHandler->canHandleRequest(req, srcQ)) |
---|
525 | { |
---|
526 | response = mHandler->handleRequest(req, srcQ); |
---|
527 | } |
---|
528 | } |
---|
529 | return response; |
---|
530 | } |
---|
531 | |
---|
532 | }; |
---|
533 | // Hold these by shared pointer so they can be copied keeping same instance |
---|
534 | typedef SharedPtr<RequestHandlerHolder> RequestHandlerHolderPtr; |
---|
535 | |
---|
536 | typedef list<RequestHandlerHolderPtr>::type RequestHandlerList; |
---|
537 | typedef list<ResponseHandler*>::type ResponseHandlerList; |
---|
538 | typedef map<uint16, RequestHandlerList>::type RequestHandlerListByChannel; |
---|
539 | typedef map<uint16, ResponseHandlerList>::type ResponseHandlerListByChannel; |
---|
540 | |
---|
541 | RequestHandlerListByChannel mRequestHandlers; |
---|
542 | ResponseHandlerListByChannel mResponseHandlers; |
---|
543 | RequestID mRequestCount; // Guarded by mRequestMutex |
---|
544 | bool mPaused; |
---|
545 | bool mAcceptRequests; |
---|
546 | bool mShuttingDown; |
---|
547 | |
---|
548 | //NOTE: If you lock multiple mutexes at the same time, the order is important! |
---|
549 | // For example if threadA locks mIdleMutex first then tries to lock mProcessMutex, |
---|
550 | // and threadB locks mProcessMutex first, then mIdleMutex. In this case you can get a deadlock and the system is dead! |
---|
551 | //RULE: Lock mProcessMutex before any other mutex, to prevent deadlocks |
---|
552 | OGRE_MUTEX(mIdleMutex); |
---|
553 | OGRE_MUTEX(mRequestMutex); |
---|
554 | OGRE_MUTEX(mProcessMutex); |
---|
555 | OGRE_MUTEX(mResponseMutex); |
---|
556 | OGRE_RW_MUTEX(mRequestHandlerMutex); |
---|
557 | |
---|
558 | |
---|
559 | void processRequestResponse(Request* r, bool synchronous); |
---|
560 | Response* processRequest(Request* r); |
---|
561 | void processResponse(Response* r); |
---|
562 | /// Notify workers about a new request. |
---|
563 | virtual void notifyWorkers() = 0; |
---|
564 | /// Put a Request on the queue with a specific RequestID. |
---|
565 | void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount); |
---|
566 | |
---|
567 | RequestQueue mIdleRequestQueue; // Guarded by mIdleMutex |
---|
568 | bool mIdleThreadRunning; // Guarded by mIdleMutex |
---|
569 | Request* mIdleProcessed; // Guarded by mProcessMutex |
---|
570 | |
---|
571 | |
---|
572 | bool processIdleRequests(); |
---|
573 | }; |
---|
574 | |
---|
575 | |
---|
576 | |
---|
577 | |
---|
578 | |
---|
579 | /** @} */ |
---|
580 | /** @} */ |
---|
581 | |
---|
582 | } |
---|
583 | |
---|
584 | #include "OgreHeaderSuffix.h" |
---|
585 | |
---|
586 | #endif |
---|
587 | |
---|