pollf.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. #include <unistd.h>
  2. #include <string.h>
  3. #include <fcntl.h>
  4. #include <signal.h>
  5. #include <errno.h>
  6. #include "pollf.h"
  7. #include "defines.h"
  8. /////////////////////////////////////////////////////////////////////////////
  9. static CPollF g_pof;
  10. /////////////////////////////////////////////////////////////////////////////
  11. CPollF::CPollF(void) : m_bThreadExecuting(false), m_nNextJobID(0), m_mtx(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
  12. {
  13. }
  14. CPollF::~CPollF(void)
  15. {
  16. if(m_bThreadExecuting)
  17. {
  18. }
  19. ::pthread_mutex_destroy(&m_mtx);
  20. }
  21. /////////////////////////////////////////////////////////////////////////////
  22. pjob_t CPollF::AddJob(const char *pszFilepath, int nEvents, PFN_POLLF_CALLBACK pfnCb, void *pParam)
  23. {
  24. return g_pof.CreateJob(pszFilepath, nEvents, pfnCb, pParam);
  25. }
  26. int CPollF::CancelJob(pjob_t nJobID)
  27. {
  28. return g_pof.ReleaseJob(nJobID);
  29. }
  30. /////////////////////////////////////////////////////////////////////////////
  31. pjob_t CPollF::CreateJob(const char *pszFilepath, int nEvents, PFN_POLLF_CALLBACK pfnCb, void *pParam)
  32. {
  33. if(!pszFilepath || !*pszFilepath || !pfnCb)
  34. {
  35. errno = EINVAL;
  36. return -1;
  37. }
  38. int fd = open(pszFilepath, O_RDONLY, 0);
  39. if(fd < 0)
  40. return -1;
  41. int nJobID = ++m_nNextJobID;
  42. m_nNextJobID &= 0x7FFFFFFF;
  43. LPPOLLFJOB pJob = new POLLFJOB;
  44. pJob->nJobID = nJobID;
  45. pJob->fd = fd;
  46. pJob->events = nEvents;
  47. pJob->pfnCb = pfnCb;
  48. pJob->pParam = pParam;
  49. ::pthread_mutex_lock(&m_mtx);
  50. m_jobs.push_back(pJob);
  51. ::pthread_mutex_unlock(&m_mtx);
  52. if(!m_bThreadExecuting)
  53. {
  54. if(m_thread.Create(&CPollF::PollThreadRoutine, reinterpret_cast<void*>(this)) < 0)
  55. {
  56. close(fd);
  57. delete pJob;
  58. return -1;
  59. }
  60. ::pthread_mutex_lock(&m_mtx);
  61. m_bThreadExecuting = true;
  62. ::pthread_mutex_unlock(&m_mtx);
  63. }
  64. else
  65. {
  66. }
  67. return nJobID;
  68. }
  69. int CPollF::ReleaseJob(pjob_t nJobID)
  70. {
  71. UNUSED(nJobID);
  72. ::pthread_mutex_lock(&m_mtx);
  73. ::pthread_mutex_unlock(&m_mtx);
  74. return 0;
  75. }
  76. /////////////////////////////////////////////////////////////////////////////
  77. LPPOLLFJOB CPollF::FindByFileDescriptor(int fd)
  78. {
  79. for(auto it = m_jobs.begin(); it != m_jobs.end(); ++it)
  80. {
  81. LPPOLLFJOB pjob = *it;
  82. if(pjob->fd == fd)
  83. return pjob;
  84. }
  85. return NULL;
  86. }
  87. LPPOLLFJOB CPollF::FindByJobID(pjob_t jid)
  88. {
  89. for(auto it = m_jobs.begin(); it != m_jobs.end(); ++it)
  90. {
  91. LPPOLLFJOB pjob = *it;
  92. if(pjob->nJobID == jid)
  93. return pjob;
  94. }
  95. return NULL;
  96. }
  97. /////////////////////////////////////////////////////////////////////////////
  98. void* CPollF::PollThreadRoutine(void *pParam)
  99. {
  100. int ret;
  101. bool bRun = true;
  102. CPollF *pThis = reinterpret_cast<CPollF*>(pParam);
  103. while(bRun)
  104. {
  105. struct pollfd *pfd = NULL;
  106. ::pthread_mutex_lock(&pThis->m_mtx);
  107. auto nSize = pThis->m_jobs.size();
  108. pfd = new struct pollfd[nSize];
  109. memset(pfd, 0, sizeof(struct pollfd) * nSize);
  110. for(size_t i = 0; i < nSize; ++i)
  111. {
  112. const POLLFJOB &job = *pThis->m_jobs[i];
  113. pfd[i].fd = job.fd;
  114. pfd[i].events = (short)job.events;
  115. }
  116. ::pthread_mutex_unlock(&pThis->m_mtx);
  117. if((ret = ::poll(pfd, nSize, -1)) >= 0)
  118. {
  119. for(size_t i = 0; i < nSize; ++i)
  120. {
  121. int evt;
  122. LPCPOLLFJOB pJob;
  123. const struct pollfd &rpfd = pfd[i];
  124. if(!rpfd.revents)
  125. continue;
  126. ::pthread_mutex_lock(&pThis->m_mtx);
  127. if((pJob = pThis->FindByFileDescriptor(rpfd.fd)))
  128. {
  129. if(rpfd.revents & (POLLERR | POLLHUP | POLLNVAL))
  130. {
  131. pThis->ReleaseJob(pJob->nJobID);
  132. (*pJob->pfnCb)(pJob->nJobID, -1, rpfd.revents, pJob->pParam);
  133. }
  134. else if((evt = rpfd.revents & (short)pJob->events))
  135. {
  136. (*pJob->pfnCb)(pJob->nJobID, pJob->fd, evt, pJob->pParam);
  137. }
  138. }
  139. ::pthread_mutex_unlock(&pThis->m_mtx);
  140. if(!--ret)
  141. break;
  142. }
  143. }
  144. else
  145. {
  146. if(errno == EINTR)
  147. {
  148. }
  149. }
  150. delete pfd;
  151. }
  152. return NULL;
  153. }