// File: lzham_task_pool_pthreads.cpp // // Copyright (c) 2009-2010 Richard Geldreich, Jr. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. #include "lzham_core.h" #include "lzham_pthreads_threading.h" #include "lzham_timer.h" #ifdef WIN32 #include #endif #if defined(__GNUC__) #include #endif #if LZHAM_USE_PTHREADS_API #ifdef WIN32 #pragma comment(lib, "../ext/libpthread/lib/pthreadVC2.lib") #endif namespace lzham { task_pool::task_pool() : m_num_threads(0), m_tasks_available(0, 32767), m_num_outstanding_tasks(0), m_exit_flag(false) { utils::zero_object(m_threads); } task_pool::task_pool(uint num_threads) : m_num_threads(0), m_tasks_available(0, 32767), m_num_outstanding_tasks(0), m_exit_flag(false) { utils::zero_object(m_threads); bool status = init(num_threads); LZHAM_VERIFY(status); } task_pool::~task_pool() { deinit(); } bool task_pool::init(uint num_threads) { LZHAM_ASSERT(num_threads <= cMaxThreads); num_threads = math::minimum(num_threads, cMaxThreads); deinit(); bool succeeded = true; m_num_threads = 0; while (m_num_threads < num_threads) { int status = pthread_create(&m_threads[m_num_threads], NULL, thread_func, this); if (status) { succeeded = false; break; } m_num_threads++; } if (!succeeded) { deinit(); return false; } return true; } void task_pool::deinit() { if (m_num_threads) { join(); atomic_exchange32(&m_exit_flag, true); m_tasks_available.release(m_num_threads); for (uint i = 0; i < m_num_threads; i++) pthread_join(m_threads[i], NULL); m_num_threads = 0; atomic_exchange32(&m_exit_flag, false); } m_task_stack.clear(); m_num_outstanding_tasks = 0; } bool task_pool::queue_task(task_callback_func pFunc, uint64 data, void* pData_ptr) { LZHAM_ASSERT(m_num_threads); LZHAM_ASSERT(pFunc); task tsk; tsk.m_callback = pFunc; tsk.m_data = data; tsk.m_pData_ptr = pData_ptr; tsk.m_flags = 0; if (!m_task_stack.try_push(tsk)) return false; atomic_increment32(&m_num_outstanding_tasks); m_tasks_available.release(1); return true; } // It's the object's responsibility to delete pObj within the execute_task() method, if needed! bool task_pool::queue_task(executable_task* pObj, uint64 data, void* pData_ptr) { LZHAM_ASSERT(m_num_threads); LZHAM_ASSERT(pObj); task tsk; tsk.m_pObj = pObj; tsk.m_data = data; tsk.m_pData_ptr = pData_ptr; tsk.m_flags = cTaskFlagObject; if (!m_task_stack.try_push(tsk)) return false; atomic_increment32(&m_num_outstanding_tasks); m_tasks_available.release(1); return true; } void task_pool::process_task(task& tsk) { if (tsk.m_flags & cTaskFlagObject) tsk.m_pObj->execute_task(tsk.m_data, tsk.m_pData_ptr); else tsk.m_callback(tsk.m_data, tsk.m_pData_ptr); atomic_decrement32(&m_num_outstanding_tasks); } void task_pool::join() { task tsk; while (atomic_add32(&m_num_outstanding_tasks, 0) > 0) { if (m_task_stack.pop(tsk)) { process_task(tsk); } else { lzham_sleep(1); } } } void * task_pool::thread_func(void *pContext) { task_pool* pPool = static_cast(pContext); task tsk; for ( ; ; ) { if (!pPool->m_tasks_available.wait()) break; if (pPool->m_exit_flag) break; if (pPool->m_task_stack.pop(tsk)) { pPool->process_task(tsk); } } return NULL; } uint lzham_get_max_helper_threads() { #if defined(__GNUC__) uint num_procs = get_nprocs(); return num_procs ? (num_procs - 1) : 0; #else printf("TODO: lzham_get_max_helper_threads(): Implement system specific func to determine the max # of helper threads\n"); // Just assume a dual-core machine. return 1; #endif } } // namespace lzham #endif // LZHAM_USE_PTHREADS_API