ACE之ACE_Dev_Poll_Reactor
简介
ACE_Dev_Poll_Reactor使用的io复用是epoll或者dev poll
结构
会使用到事件处理器仓库(handler_rep_):使用epoll时,epoll_event.data中只保存了fd(epoll_event.data.fd),因此需要通过fd到仓库中查找对应的事件处理器及其状态
初始化
构造函数或者open
- 初始化信号处理器
signal_handler_
- 初始化定时器队列
timer_queue_
- 初始化唤醒reactor的中断器
notify_handler_
- 初始化epoll fd或者dev poll
- 打开(open)处理器仓库 handler_rep_
- 打开(open) notify_handler_,并将其注册到reactor中
int
ACE_Dev_Poll_Reactor::open (size_t size,bool restart,ACE_Sig_Handler *sh,ACE_Timer_Queue *tq,int disable_notify_pipe,ACE_Reactor_Notify *notify)
{ACE_TRACE ("ACE_Dev_Poll_Reactor::open");ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));// Can't initialize ourselves more than once.if (this->initialized_)return -1;#ifdef ACE_HAS_EVENT_POLLACE_OS::memset (&this->event_, 0, sizeof (this->event_));this->event_.data.fd = ACE_INVALID_HANDLE;
#endif /* ACE_HAS_EVENT_POLL */this->restart_ = restart;this->signal_handler_ = sh;this->timer_queue_ = tq;this->notify_handler_ = notify;int result = 0;// Allows the signal handler to be overridden.if (this->signal_handler_ == 0){ACE_NEW_RETURN (this->signal_handler_,ACE_Sig_Handler,-1);if (this->signal_handler_ == 0)result = -1;elsethis->delete_signal_handler_ = true;}// Allows the timer queue to be overridden.if (result != -1 && this->timer_queue_ == 0){ACE_NEW_RETURN (this->timer_queue_,ACE_Timer_Heap,-1);if (this->timer_queue_ == 0)result = -1;elsethis->delete_timer_queue_ = true;}// Allows the Notify_Handler to be overridden.if (result != -1 && this->notify_handler_ == 0){ACE_NEW_RETURN (this->notify_handler_,ACE_Dev_Poll_Reactor_Notify,-1);if (this->notify_handler_ == 0)result = -1;elsethis->delete_notify_handler_ = true;}#if defined (ACE_HAS_EVENT_POLL)// Initialize epoll:this->poll_fd_ = ::epoll_create (size);if (this->poll_fd_ == -1)result = -1;#else// Allocate the array before opening the device to avoid a potential// resource leak if allocation fails.ACE_NEW_RETURN (this->dp_fds_,pollfd[size],-1);// Open the `/dev/poll' character device.this->poll_fd_ = ACE_OS::open ("/dev/poll", O_RDWR);if (this->poll_fd_ == ACE_INVALID_HANDLE)result = -1;#endif /* ACE_HAS_EVENT_POLL */if (result != -1 && this->handler_rep_.open (size) == -1)result = -1;// Registration of the notification handler must be done after the// /dev/poll device has been fully initialized.else if (this->notify_handler_->open (this,0,disable_notify_pipe) == -1|| (disable_notify_pipe == 0&& this->register_handler_i (this->notify_handler_->notify_handle (),this->notify_handler_,ACE_Event_Handler::READ_MASK) == -1))result = -1;if (result != -1)// We're all set to go.this->initialized_ = true;else// This will close down all the allocated resources properly.(void) this->close ();return result;
}
事件注册
- 将事件处理器添加到 handler_rep_ 仓库中
- 对于非 notify_handler_ 的处理器(即网络io事件),使用epoll时会设置EPOLLONESHOT:事件只触发一次,需要在处理完后重置
/**
 * Register @a event_handler for @a handle with the given @a mask.
 *
 * If the handle is already in the repository, @a mask is added to its
 * existing mask through `mask_ops_i()`.  Otherwise the handler is bound
 * in the repository and, in the epoll build, added to the epoll set.
 * In the /dev/poll build the handle is additionally written to the
 * device in both cases.
 *
 * @param handle        Handle to register; must not be ACE_INVALID_HANDLE.
 * @param event_handler Handler to associate with @a handle.
 * @param mask          Events of interest; must not be NULL_MASK.
 *
 * @return 0 on success, -1 on failure.  `errno` is set to EINVAL when
 *         @a handle or @a mask is rejected up front.
 */
int
ACE_Dev_Poll_Reactor::register_handler_i (ACE_HANDLE handle,
                                          ACE_Event_Handler *event_handler,
                                          ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler_i");

  // Reject arguments that can never be registered.
  if (handle == ACE_INVALID_HANDLE
      || mask == ACE_Event_Handler::NULL_MASK)
    {
      errno = EINVAL;
      return -1;
    }

  if (this->handler_rep_.find (handle) != 0)
    {
      // The handle is already in the repository: register it again,
      // possibly for a different event, by adding the new mask to the
      // current one.
      if (this->mask_ops_i (handle, mask, ACE_Reactor::ADD_MASK) == -1)
        {
          ACELIB_ERROR_RETURN ((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("mask_ops_i")),
                               -1);
        }
    }
  else
    {
      // First registration for this handle: bind it in the repository.
      if (this->handler_rep_.bind (handle, event_handler, mask) != 0)
        return -1;

#if defined (ACE_HAS_EVENT_POLL)

      Event_Tuple *entry = this->handler_rep_.find (handle);

      struct epoll_event ev;
      ACE_OS::memset (&ev, 0, sizeof (ev));

      // Only the fd is stored in the epoll user data.
      ev.data.fd = handle;
      ev.events = this->reactor_mask_to_poll_event (mask);

      // All but the notify handler get registered with oneshot to
      // facilitate auto suspend before the upcall.  See
      // dispatch_io_event for more information.
      if (event_handler != this->notify_handler_)
        ev.events |= EPOLLONESHOT;

      if (::epoll_ctl (this->poll_fd_, EPOLL_CTL_ADD, handle, &ev) == -1)
        {
          ACELIB_ERROR ((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("epoll_ctl")));
          // Undo the repository binding made above.
          (void) this->handler_rep_.unbind (handle);
          return -1;
        }

      entry->controlled = true;

#endif /* ACE_HAS_EVENT_POLL */
    }

#ifdef ACE_HAS_DEV_POLL

  struct pollfd interest;
  interest.fd = handle;
  interest.events = this->reactor_mask_to_poll_event (mask);
  interest.revents = 0;

  // Add file descriptor to the "interest set."
  if (ACE_OS::write (this->poll_fd_, &interest, sizeof (interest))
      != sizeof (interest))
    {
      (void) this->handler_rep_.unbind (handle);
      return -1;
    }

#endif /*ACE_HAS_DEV_POLL*/

  // Note the fact that we've changed the state of the wait_set_,
  // which is used by the dispatching loop to determine whether it can
  // keep going or if it needs to reconsult select ().
  // this->state_changed_ = 1;

  return 0;
}
事件处理
- 使用epoll时,每次epoll_wait只取回一个就绪事件(maxevents为1)
- 当等待被信号中断(errno为EINTR)时,若sig_pending不为0,表示该信号已由ACE_Sig_Handler分发处理;此时将该标志复位为0并返回1(计为已处理一个事件),io、定时器、通知事件等待下一轮事件循环分发
- 先处理定时器事件
- 然后处理网络io事件,对于epoll,事件处理器不是通知事件处理器时,将仓库中对应信息的 suspended 修改为true,调用完事件处理器后,resume_handler_i 重置事件,等待下一次事件再次触发,suspended 修改为false,controlled 修改为true
// Wait for an event.int const nfds = ::epoll_wait (this->poll_fd_,&this->event_,1,static_cast<int> (timeout));
/**
 * Wait for pending work and dispatch a single event.
 *
 * @param max_wait_time Forwarded to `work_pending_i()`.
 * @param guard         Token guard forwarded to `dispatch()`.
 *
 * @return 0 when `work_pending_i()` reported nothing to do or failed with
 *         ETIME; 1 when the wait was interrupted and a pending signal was
 *         recorded by ACE_Sig_Handler; -1 on any other error; otherwise
 *         the result of `dispatch()`.
 */
int
ACE_Dev_Poll_Reactor::handle_events_i (ACE_Time_Value *max_wait_time,
                                       Token_Guard &guard)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events_i");

  int status = 0;

  // Poll for events.
  //
  // If the underlying event wait call was interrupted via the interrupt
  // signal (i.e. returned -1 with errno == EINTR) then the wait is
  // restarted if so desired.
  for (;;)
    {
      status = this->work_pending_i (max_wait_time);
      if (status != -1)
        break;

      // Report every failure that is not going to be retried.
      if (this->restart_ == 0 || errno != EINTR)
        ACELIB_ERROR ((LM_ERROR, ACE_TEXT("%t: %p\n"), ACE_TEXT("work_pending_i")));

      // errno is deliberately re-read here, after the logging call above.
      if (!(this->restart_ != 0 && errno == EINTR))
        break;
    }

  if (status == 0)
    return 0;

  if (status != -1)
    // Dispatch an event.
    return this->dispatch (guard);

  // From here on the wait failed (status == -1).
  if (errno == ETIME)
    return 0;

  if (errno != EINTR)
    return -1;

  // Bail out -- we got here since the poll was interrupted.
  // If it was due to a signal registered through our ACE_Sig_Handler,
  // then it was dispatched, so we count it in the number of events
  // handled rather than cause an error return.
  if (ACE_Sig_Handler::sig_pending () == 0)
    return -1;

  ACE_Sig_Handler::sig_pending (0);
  return 1;
}

/**
 * Dispatch the first located event: a timer if one is dispatched,
 * otherwise an I/O event.
 *
 * @param guard Token guard forwarded to the dispatch helpers.
 *
 * @return The non-zero result of `dispatch_timer_handler()` if there is
 *         one, otherwise the result of `dispatch_io_event()`.
 */
int
ACE_Dev_Poll_Reactor::dispatch (Token_Guard &guard)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::dispatch");

  // Perform the Template Method for dispatching the first located event.
  // We dispatch only one to effectively dispatch events concurrently.
  // As soon as an event is located, the token is released, allowing the
  // next waiter to begin getting an event while we dispatch one here.

  // Handle timers early since they may have higher latency
  // constraints than I/O handlers.  Ideally, the order of
  // dispatching should be a strategy...
  int const timer_status = this->dispatch_timer_handler (guard);
  if (timer_status != 0)
    return timer_status;

  // If no timer dispatched, check for an I/O event.
  return this->dispatch_io_event (guard);
}

/**
 * Resume a suspended handle so that its events are polled again.
 *
 * @param handle Handle to resume.
 *
 * @return -1 if @a handle is not in the repository or re-arming it with
 *         the poll mechanism fails; 0 otherwise (including when the
 *         handle was not suspended).
 */
int
ACE_Dev_Poll_Reactor::resume_handler_i (ACE_HANDLE handle)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler_i");

  Event_Tuple *entry = this->handler_rep_.find (handle);
  if (entry == 0)
    return -1;

  // Nothing to do for a handle that is not suspended.
  if (!entry->suspended)
    return 0;

  ACE_Reactor_Mask interest = entry->mask;
  if (interest == ACE_Event_Handler::NULL_MASK)
    {
      // No events of interest: just clear the flag without touching the
      // poll mechanism.
      entry->suspended = false;
      return 0;
    }

  // Place the handle back in to the "interest set."
  //
  // Events for the given handle will once again be polled.

#if defined (ACE_HAS_EVENT_POLL)

  struct epoll_event ev;
  ACE_OS::memset (&ev, 0, sizeof (ev));

  // Modify the existing epoll entry when the handle is flagged as
  // controlled; otherwise add it.
  int const ctl_op = entry->controlled ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;

  ev.events = this->reactor_mask_to_poll_event (interest) | EPOLLONESHOT;
  ev.data.fd = handle;

  if (::epoll_ctl (this->poll_fd_, ctl_op, handle, &ev) == -1)
    return -1;

  entry->controlled = true;

#else

  struct pollfd interest_fd;
  interest_fd.fd = handle;
  interest_fd.events = this->reactor_mask_to_poll_event (interest);
  interest_fd.revents = 0;

  if (ACE_OS::write (this->poll_fd_, &interest_fd, sizeof (interest_fd))
      != sizeof (interest_fd))
    return -1;

#endif /* ACE_HAS_EVENT_POLL */

  entry->suspended = false;

  return 0;
}