#include #include #include #include #include #include "pollf.h" #include "defines.h" ///////////////////////////////////////////////////////////////////////////// static CPollF g_pof; ///////////////////////////////////////////////////////////////////////////// CPollF::CPollF(void) : m_bThreadExecuting(false), m_nNextJobID(0), m_mtx(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP) { } CPollF::~CPollF(void) { if(m_bThreadExecuting) { } ::pthread_mutex_destroy(&m_mtx); } ///////////////////////////////////////////////////////////////////////////// pjob_t CPollF::AddJob(const char *pszFilepath, int nEvents, PFN_POLLF_CALLBACK pfnCb, void *pParam) { return g_pof.CreateJob(pszFilepath, nEvents, pfnCb, pParam); } int CPollF::CancelJob(pjob_t nJobID) { return g_pof.ReleaseJob(nJobID); } ///////////////////////////////////////////////////////////////////////////// pjob_t CPollF::CreateJob(const char *pszFilepath, int nEvents, PFN_POLLF_CALLBACK pfnCb, void *pParam) { if(!pszFilepath || !*pszFilepath || !pfnCb) { errno = EINVAL; return -1; } int fd = open(pszFilepath, O_RDONLY, 0); if(fd < 0) return -1; int nJobID = ++m_nNextJobID; m_nNextJobID &= 0x7FFFFFFF; LPPOLLFJOB pJob = new POLLFJOB; pJob->nJobID = nJobID; pJob->fd = fd; pJob->events = nEvents; pJob->pfnCb = pfnCb; pJob->pParam = pParam; ::pthread_mutex_lock(&m_mtx); m_jobs.push_back(pJob); ::pthread_mutex_unlock(&m_mtx); if(!m_bThreadExecuting) { if(m_thread.Create(&CPollF::PollThreadRoutine, reinterpret_cast(this)) < 0) { close(fd); delete pJob; return -1; } ::pthread_mutex_lock(&m_mtx); m_bThreadExecuting = true; ::pthread_mutex_unlock(&m_mtx); } else { } return nJobID; } int CPollF::ReleaseJob(pjob_t nJobID) { UNUSED(nJobID); ::pthread_mutex_lock(&m_mtx); ::pthread_mutex_unlock(&m_mtx); return 0; } ///////////////////////////////////////////////////////////////////////////// LPPOLLFJOB CPollF::FindByFileDescriptor(int fd) { for(auto it = m_jobs.begin(); it != m_jobs.end(); ++it) { LPPOLLFJOB pjob = *it; if(pjob->fd == fd) return pjob; } return NULL; } LPPOLLFJOB CPollF::FindByJobID(pjob_t jid) { for(auto it = m_jobs.begin(); it != m_jobs.end(); ++it) { LPPOLLFJOB pjob = *it; if(pjob->nJobID == jid) return pjob; } return NULL; } ///////////////////////////////////////////////////////////////////////////// void* CPollF::PollThreadRoutine(void *pParam) { int ret; bool bRun = true; CPollF *pThis = reinterpret_cast(pParam); while(bRun) { struct pollfd *pfd = NULL; ::pthread_mutex_lock(&pThis->m_mtx); auto nSize = pThis->m_jobs.size(); pfd = new struct pollfd[nSize]; memset(pfd, 0, sizeof(struct pollfd) * nSize); for(size_t i = 0; i < nSize; ++i) { const POLLFJOB &job = *pThis->m_jobs[i]; pfd[i].fd = job.fd; pfd[i].events = (short)job.events; } ::pthread_mutex_unlock(&pThis->m_mtx); if((ret = ::poll(pfd, nSize, -1)) >= 0) { for(size_t i = 0; i < nSize; ++i) { int evt; LPCPOLLFJOB pJob; const struct pollfd &rpfd = pfd[i]; if(!rpfd.revents) continue; ::pthread_mutex_lock(&pThis->m_mtx); if((pJob = pThis->FindByFileDescriptor(rpfd.fd))) { if(rpfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { pThis->ReleaseJob(pJob->nJobID); (*pJob->pfnCb)(pJob->nJobID, -1, rpfd.revents, pJob->pParam); } else if((evt = rpfd.revents & (short)pJob->events)) { (*pJob->pfnCb)(pJob->nJobID, pJob->fd, evt, pJob->pParam); } } ::pthread_mutex_unlock(&pThis->m_mtx); if(!--ret) break; } } else { if(errno == EINTR) { } } delete pfd; } return NULL; }