My Project
AsyncLoader.h
1 #pragma once
2 #include "IAttributeFields.h"
3 #include "IDataLoader.h"
4 #include "util/LogService.h"
5 #include <vector>
6 #include <set>
7 #include <boost/thread.hpp>
8 #include <boost/asio.hpp>
9 #include <boost/shared_ptr.hpp>
10 #include <boost/enable_shared_from_this.hpp>
11 
13 #define E_TRYAGAIN -123456
14 
22 #define MAX_PROCESS_QUEUE 16
23 
24 namespace ParaEngine
25 {
26  class CDirectXEngine;
27  class CGDIEngine;
28  //--------------------------------------------------------------------------------------
29  // Structures
30  //--------------------------------------------------------------------------------------
31 
63  {
64  public:
65  enum AssetLogLevelEnum
66  {
67  Log_All = 0,
68  Log_Debug,
69  Log_Remote,
70  Log_Warn,
71  Log_Error,
72  };
73 
75 
76  CAsyncLoader();
77  virtual ~CAsyncLoader();
78 
79  ATTRIBUTE_DEFINE_CLASS(CAsyncLoader);
80 
82  virtual int InstallFields(CAttributeClass* pClass, bool bOverride);
83 
84  ATTRIBUTE_METHOD1(CAsyncLoader, GetEstimatedSizeInBytes_s, int*) { *p1 = cls->GetEstimatedSizeInBytes(); return S_OK; }
85  ATTRIBUTE_METHOD1(CAsyncLoader, GetItemsLeft_s, int*) { *p1 = cls->GetItemsLeft(); return S_OK; }
86  ATTRIBUTE_METHOD1(CAsyncLoader, GetBytesProcessed_s, int*) { *p1 = cls->GetBytesProcessed(); return S_OK; }
87 
88  ATTRIBUTE_METHOD1(CAsyncLoader, SetWorkerThreads_s, Vector2) { cls->CreateWorkerThreads((int)p1.x, (int)p1.y); return S_OK; }
89 
90  ATTRIBUTE_METHOD1(CAsyncLoader, SetProcessorQueueSize_s, Vector2) { cls->SetProcessorQueueSize((int)p1.x, (int)p1.y); return S_OK; }
91 
92  ATTRIBUTE_METHOD1(CAsyncLoader, GetLogLevel_s, int*) { *p1 = cls->GetLogLevel(); return S_OK; }
93  ATTRIBUTE_METHOD1(CAsyncLoader, SetLogLevel_s, int) { cls->SetLogLevel(p1); return S_OK; }
94 
95  ATTRIBUTE_METHOD1(CAsyncLoader, log_s, const char*) { cls->log(p1); return S_OK; }
96  ATTRIBUTE_METHOD(CAsyncLoader, WaitForAllItems_s) { cls->WaitForAllItems(); return S_OK; }
97 
98  private:
99  struct ProcessorWorkerThread;
100  public:
102  static CAsyncLoader& GetSingleton();
103 
105  void CleanUp();
106 
110  int Start(int nWorkerCount = 5);
111 
113  int Stop();
114 
119  int AddWorkItem( IDataLoader* pDataLoader, IDataProcessor* pDataProcessor, HRESULT* pHResult, void** ppDeviceObject, int nProcessorThreadID=0);
120 
124  int AddWorkItem( ResourceRequest_ptr& request );
125 
129  HRESULT RunWorkItem( IDataLoader* pDataLoader, IDataProcessor* pDataProcessor, HRESULT* pHResult, void** ppDeviceObject);
130 
134  HRESULT RunWorkItem( ResourceRequest_ptr& request );
135 
144  void ProcessDeviceWorkItems( int CurrentNumResourcesToService=100, bool bRetryLoads = false );
145  /* this function is usually called by the render thread, but it may also be called in IO or worker thread is IsDeviceObject() is false. */
146  void ProcessDeviceWorkItemImp(ResourceRequest_ptr& pResourceRequest, bool bRetryLoads = false);
147 
158  bool CreateWorkerThreads(int nProcessorQueueID, int nMaxCount);
159  int GetWorkerThreadsCount(int nProcessorQueueID);
161  void SetProcessorQueueSize(int nProcessorQueueID, int nSize);
162  int GetProcessorQueueSize(int nProcessorQueueID);
163 
167  void WaitForAllItems();
168 
169 #ifdef PARAENGINE_CLIENT
170 
171  inline LPD3DXFILE GetFileParser() { return m_pXFileParser; };
172 
174  CDirectXEngine* GetEngine();
175 
177  CGDIEngine* GetGDIEngine();
178 #endif
179 
184  void AddPendingRequest(const char* sURL);
185 
190  bool HasPendingRequest(const char* sURL);
191 
195  void RemovePendingRequest(const char* sURL);
196 
201 
202  int GetLogLevel() const;
203  void SetLogLevel(int val);
204 
206  void log(const string& msg);
207  void log(int nLogLevel, const string& msg);
208 
210  inline bool interruption_requested(){return m_bInterruptSignal;}
211 
214  void Interrupt();
215 
226  int GetItemsLeft(int nItemType = -1);
227 
231 
241  int GetBytesProcessed(int nItemType = -1);
242 
243  protected:
244 
254  int FileIOThreadProc();
256  int FileIOThreadProc_HandleRequest(ResourceRequest_ptr& ResourceRequest);
257 
265  int ProcessingThreadProc(ProcessorWorkerThread* pThreadData);
266 
267  private:
268 
269  bool m_bDone;
270  bool m_bProcessThreadDone;
271  bool m_bIOThreadDone;
272 
274  volatile bool m_bInterruptSignal;
275 
276  int m_NumResourcesToService;
278  int m_NumOutstandingResources;
280  int m_nRemainingBytes;
281 
282  CResourceRequestQueue m_IOQueue;
283  CResourceRequestQueue m_RenderThreadQueue;
292  CResourceRequestQueue m_ProcessQueues[MAX_PROCESS_QUEUE];
293 
294 
296  Boost_Thread_ptr_type m_io_thread;
297 
298 
300  struct ProcessorWorkerThread : public IProcessorWorkerData
301  {
302  public:
303  ProcessorWorkerThread();
304  ProcessorWorkerThread(int nQueueID);
305 
306  ~ProcessorWorkerThread();
307  public:
308  void reset(boost::thread * pThread) {m_thread.reset(pThread);}
309  void reset() {m_thread.reset();}
310  void join() { if(m_thread.get()) m_thread->join();}
311 
316  bool timed_join(int nSeconds);
317 
319  virtual int GetProcessorQueueID() {return m_nQueueID;}
320 
325  virtual void* GetCurlInterface(int nID = 0);
326 
328  virtual void AddBytesProcessed(int nBytesProcessed);
329 
331  virtual int GetBytesProcessed();
332  public:
333  // thread ptr.
334  Boost_Thread_ptr_type m_thread;
335 
336  // which queue to process.
337  int m_nQueueID;
338 
340  void* m_curl;
341 
342  // bytes processed
343  volatile int m_nBytesProcessed;
344  };
345 
347  struct DefaultWorkerThreadData : public ProcessorWorkerThread
348  {
349  public:
351  virtual void* GetCurlInterface(int nID = 0);
352  };
353 
355  std::vector< ProcessorWorkerThread* > m_workers;
356 
358  DefaultWorkerThreadData* m_default_processor_worker_data;
359 
361  ParaEngine::mutex m_default_processor_mutex;
362 
364  CServiceLogger_ptr g_asset_logger;
365 
367  AssetLogLevelEnum m_nLogLevel;
368 
369 #ifdef PARAENGINE_CLIENT
370 
371  LPD3DXFILE m_pXFileParser;
372 
374  CDirectXEngine* m_pEngine;
375 
377  CGDIEngine* m_pGDIEngine;
378 #endif
379  // only for pending request.
380  ParaEngine::mutex m_pending_request_mutex;
381  // for statistics of request
382  ParaEngine::mutex m_request_stats;
383 
384  // all pending url, this could prevent the same url to be request multiple times.
385  std::set <std::string> m_pending_requests;
386  };
387 }
bool CreateWorkerThreads(int nProcessorQueueID, int nMaxCount)
make sure that there are nMaxCount workers threads processing the queue at nProcessorQueueID.
Definition: AsyncLoader.cpp:271
an attribute class is a collection of attribute fields.
Definition: AttributeClass.h:10
void CleanUp()
clean up everything, exit all threads created.
Definition: AsyncLoader.cpp:232
void AddPendingRequest(const char *sURL)
add a string url to a set.
Definition: AsyncLoader.cpp:938
void Interrupt()
call this function to interrupt all threads.
Definition: AsyncLoader.cpp:253
void WaitForAllItems()
Wait for all work in the queues to finish.
Definition: AsyncLoader.cpp:396
this allows us to create another device, such as in a worker thread to perform some background work w...
Definition: GDIEngine.h:57
virtual int InstallFields(CAttributeClass *pClass, bool bOverride)
this class should be implemented if one wants to add new attribute.
Definition: AsyncLoader.cpp:988
different physics engine has different winding order.
Definition: EventBinding.h:32
int GetItemsLeft(int nItemType=-1)
Get the number of items left in the asynchronous content loader queue.
Definition: AsyncLoader.cpp:341
the message queue
Definition: IDataLoader.h:156
int FileIOThreadProc_HandleRequest(ResourceRequest_ptr &ResourceRequest)
this is usually called by FileIOThreadProc(), but may be called by other thread as well if IsDeviceOb...
Definition: AsyncLoader.cpp:553
void log(const string &msg)
write formated text to "asset.log".
Definition: AsyncLoader.cpp:258
void RemovePendingRequest(const char *sURL)
remove a string url from a set.
Definition: AsyncLoader.cpp:952
interface of processor worker data
Definition: IDataLoader.h:44
int Stop()
stop everything.
Definition: AsyncLoader.cpp:421
bool HasPendingRequest(const char *sURL)
whether the given request is already in the pending set.
Definition: AsyncLoader.cpp:977
bool interruption_requested()
this is a global interrupt signal.
Definition: AsyncLoader.h:210
Standard 2-dimensional vector.
Definition: ParaVector2.h:16
int Start(int nWorkerCount=5)
call this only once to start async loader
Definition: AsyncLoader.cpp:501
int GetBytesProcessed(int nItemType=-1)
get the total number of bytes processed.
Definition: AsyncLoader.cpp:362
static CAsyncLoader & GetSingleton()
get singleton instance.
Definition: AsyncLoader.cpp:248
IDataProcessor is an interface that the AsyncLoader class uses to process and copy data into locked r...
Definition: IDataLoader.h:76
HRESULT RunWorkItem(IDataLoader *pDataLoader, IDataProcessor *pDataProcessor, HRESULT *pHResult, void **ppDeviceObject)
this is same as AddWorkItem, except that it is a synchronous function.
Definition: AsyncLoader.cpp:749
int FileIOThreadProc()
This is the one IO threadproc.
Definition: AsyncLoader.cpp:640
void ProcessDeviceWorkItems(int CurrentNumResourcesToService=100, bool bRetryLoads=false)
ProcessDeviceWorkItems is called by the graphics thread.
Definition: AsyncLoader.cpp:927
A common interface for all classes implementing IAttributeFields By implementing this class&#39;s virtual...
Definition: IAttributeFields.h:59
IDataLoader is an interface that the AsyncLoader class uses to load data from disk.
Definition: IDataLoader.h:14
void ClearAllPendingRequests()
clear all pending request.
Definition: AsyncLoader.cpp:961
It uses architecture proposed by the content streaming sample in DirectX 9&10.
Definition: AsyncLoader.h:62
int ProcessingThreadProc(ProcessorWorkerThread *pThreadData)
This is the threadproc for the processing thread.
Definition: AsyncLoader.cpp:660
void SetProcessorQueueSize(int nProcessorQueueID, int nSize)
message queue size of a given processor id
Definition: AsyncLoader.cpp:321
ParaEngine resource request.
Definition: IDataLoader.h:124
int AddWorkItem(IDataLoader *pDataLoader, IDataProcessor *pDataProcessor, HRESULT *pHResult, void **ppDeviceObject, int nProcessorThreadID=0)
Add a work item to the queue of work items Only call this from graphics thread @ param nProcessorThre...
Definition: AsyncLoader.cpp:786
cross platform mutex
Definition: mutex.h:95
int GetEstimatedSizeInBytes()
get total estimated size in bytes.
Definition: AsyncLoader.cpp:335