当前位置: 首页 > news >正文

ACE之ACE_Dev_Poll_Reactor

简介

ACE_Dev_Poll_Reactor使用的io复用是epoll或者dev poll

结构

ACE_Dev_Poll_Reactor
ACE_Dev_Poll_Reactor_Notify
ACE_Sig_Handler
Handler_Repository
ACE_Timer_Queue

会使用到事件处理器仓库Handler_Repository:在使用epoll时,epoll_event.data中只保存了fd(epoll_event.data.fd),事件触发后需要通过fd到仓库中查找对应的事件处理器及其状态信息

初始化

构造函数或者open

  • 初始化信号处理器signal_handler_
  • 初始化定时器队列timer_queue_
  • 初始化唤醒reactor的中断器notify_handler_
  • 初始化epoll fd或者dev poll
  • 处理器仓库handler_rep_初始化open
  • notify_handler_初始化open,注册到reactor中
// Initializes the reactor: stores (or allocates) the signal handler, timer
// queue and notify handler, creates the epoll / `/dev/poll' descriptor,
// opens the handler repository and finally opens and registers the notify
// handler.
//
// size                 Passed to epoll_create() (or used as the pollfd
//                      array length) and to handler_rep_.open().
// restart              Stored in restart_.
// sh, tq, notify       Optional overrides; when 0 a default instance is
//                      allocated and the matching delete_*_ flag is set.
// disable_notify_pipe  Forwarded to the notify handler's open(); when it is
//                      0 the notify handle is also registered for READ_MASK.
//
// Returns 0 on success.  Returns -1 if the reactor is already initialized,
// the token cannot be acquired, an allocation fails, or any setup step
// fails; in the latter case close() is called before returning.
int
ACE_Dev_Poll_Reactor::open (size_t size,
                            bool restart,
                            ACE_Sig_Handler *sh,
                            ACE_Timer_Queue *tq,
                            int disable_notify_pipe,
                            ACE_Reactor_Notify *notify)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::open");

  ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));

  // Can't initialize ourselves more than once.
  if (this->initialized_)
    return -1;

#ifdef ACE_HAS_EVENT_POLL
  // Start with a cleared event slot that refers to no handle.
  ACE_OS::memset (&this->event_, 0, sizeof (this->event_));
  this->event_.data.fd = ACE_INVALID_HANDLE;
#endif /* ACE_HAS_EVENT_POLL */

  this->restart_ = restart;
  this->signal_handler_ = sh;
  this->timer_queue_ = tq;
  this->notify_handler_ = notify;

  int result = 0;

  // Allows the signal handler to be overridden.
  if (this->signal_handler_ == 0)
    {
      ACE_NEW_RETURN (this->signal_handler_,
                      ACE_Sig_Handler,
                      -1);

      if (this->signal_handler_ == 0)
        result = -1;
      else
        this->delete_signal_handler_ = true;
    }

  // Allows the timer queue to be overridden.
  if (result != -1 && this->timer_queue_ == 0)
    {
      ACE_NEW_RETURN (this->timer_queue_,
                      ACE_Timer_Heap,
                      -1);

      if (this->timer_queue_ == 0)
        result = -1;
      else
        this->delete_timer_queue_ = true;
    }

  // Allows the Notify_Handler to be overridden.
  if (result != -1 && this->notify_handler_ == 0)
    {
      ACE_NEW_RETURN (this->notify_handler_,
                      ACE_Dev_Poll_Reactor_Notify,
                      -1);

      if (this->notify_handler_ == 0)
        result = -1;
      else
        this->delete_notify_handler_ = true;
    }

#if defined (ACE_HAS_EVENT_POLL)
  // Initialize epoll:
  this->poll_fd_ = ::epoll_create (size);
  if (this->poll_fd_ == -1)
    result = -1;
#else
  // Allocate the array before opening the device to avoid a potential
  // resource leak if allocation fails.
  ACE_NEW_RETURN (this->dp_fds_,
                  pollfd[size],
                  -1);

  // Open the `/dev/poll' character device.
  this->poll_fd_ = ACE_OS::open ("/dev/poll", O_RDWR);
  if (this->poll_fd_ == ACE_INVALID_HANDLE)
    result = -1;
#endif  /* ACE_HAS_EVENT_POLL */

  if (result != -1 && this->handler_rep_.open (size) == -1)
    result = -1;

  // Registration of the notification handler must be done after the
  // /dev/poll device has been fully initialized.  It is skipped entirely
  // when an earlier step has already failed, so that the notify handler is
  // never opened or registered against an invalid poll descriptor or an
  // unopened handler repository.
  if (result != -1
      && (this->notify_handler_->open (this,
                                       0,
                                       disable_notify_pipe) == -1
          || (disable_notify_pipe == 0
              && this->register_handler_i (
                   this->notify_handler_->notify_handle (),
                   this->notify_handler_,
                   ACE_Event_Handler::READ_MASK) == -1)))
    result = -1;

  if (result != -1)
    // We're all set to go.
    this->initialized_ = true;
  else
    // This will close down all the allocated resources properly.
    (void) this->close ();

  return result;
}

事件注册

  • 将事件处理添加到handler_rep_仓库中
  • 对于非notify_handler_ 即网络io事件添加时,使用epoll时会设置EPOLLONESHOT事件只触发一次,需要在处理完后重置
// Registers event_handler for the events in mask on handle.
//
// A handle that is not yet in the repository is bound and (with epoll)
// added to the epoll set; a handle that is already bound only has the new
// mask added via mask_ops_i().  With /dev/poll the handle is additionally
// written to the device in both cases.
//
// Returns 0 on success, -1 on failure.  errno is set to EINVAL when handle
// is ACE_INVALID_HANDLE or mask is NULL_MASK.
int
ACE_Dev_Poll_Reactor::register_handler_i (ACE_HANDLE handle,
                                          ACE_Event_Handler *event_handler,
                                          ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler_i");

  // Reject arguments that cannot describe a registration.
  bool const bad_args =
    handle == ACE_INVALID_HANDLE
    || mask == ACE_Event_Handler::NULL_MASK;
  if (bad_args)
    {
      errno = EINVAL;
      return -1;
    }

  bool const already_bound = this->handler_rep_.find (handle) != 0;

  if (already_bound)
    {
      // The handle is known already: merge the requested mask into the
      // one currently registered.
      if (this->mask_ops_i (handle, mask, ACE_Reactor::ADD_MASK) == -1)
        {
          ACELIB_ERROR_RETURN ((LM_ERROR,
                                ACE_TEXT("%p\n"),
                                ACE_TEXT("mask_ops_i")),
                               -1);
        }
    }
  else
    {
      // First registration for this handle: create the repository entry.
      if (this->handler_rep_.bind (handle, event_handler, mask) != 0)
        return -1;

#if defined (ACE_HAS_EVENT_POLL)
      Event_Tuple *entry = this->handler_rep_.find (handle);

      struct epoll_event ev;
      ACE_OS::memset (&ev, 0, sizeof (ev));
      ev.data.fd = handle;
      ev.events = this->reactor_mask_to_poll_event (mask);

      // Every handler except the notify handler is armed as one-shot so
      // that it is automatically suspended before the upcall (see
      // dispatch_io_event).
      if (event_handler != this->notify_handler_)
        ev.events |= EPOLLONESHOT;

      if (::epoll_ctl (this->poll_fd_, EPOLL_CTL_ADD, handle, &ev) == -1)
        {
          ACELIB_ERROR ((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("epoll_ctl")));
          // Roll back the repository entry created above.
          (void) this->handler_rep_.unbind (handle);
          return -1;
        }

      entry->controlled = true;
#endif /* ACE_HAS_EVENT_POLL */
    }

#ifdef  ACE_HAS_DEV_POLL
  struct pollfd request;
  request.fd      = handle;
  request.events  = this->reactor_mask_to_poll_event (mask);
  request.revents = 0;

  // Writing the pollfd to the device adds the descriptor to the
  // "interest set."
  if (ACE_OS::write (this->poll_fd_, &request, sizeof (request))
      != sizeof (request))
    {
      (void) this->handler_rep_.unbind (handle);
      return -1;
    }
#endif /*ACE_HAS_DEV_POLL*/

  // No wait-set "state changed" flag is raised here.
  return 0;
}

事件处理

  • 使用epoll时,每次epoll_wait只取回一个就绪事件(maxevents为1)
  • 当等待被信号中断(errno为EINTR)时,若sig_pending_不为0,表示该信号已经由ACE_Sig_Handler分发处理,此时将标志复位为0并返回1(计为已处理一个事件),等待下一轮事件循环再分发io、定时器、通知事件
  • 先处理定时器事件
  • 然后处理网络io事件,对于epoll,事件处理器不是通知事件处理器时,将仓库中对应的信息suspended修改为true,调用完事件处理器后,resume_handler_i 重置事件,等待下一次事件再次触发,suspended修改为false,controlled修改为true
// Wait for an event.
int const nfds = ::epoll_wait (this->poll_fd_, &this->event_, 1, static_cast<int> (timeout));
// Waits for pending work and dispatches at most one event.
//
// Returns 0 when nothing was pending or the wait timed out (ETIME), 1 when
// the wait was interrupted by a signal that was flagged as pending via
// ACE_Sig_Handler, -1 on any other error, and otherwise whatever
// dispatch() returns.
int
ACE_Dev_Poll_Reactor::handle_events_i (ACE_Time_Value *max_wait_time,
                                       Token_Guard &guard)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events_i");

  int status = 0;

  // Poll for events.  An interrupted wait (-1 with errno == EINTR) is
  // retried only when restart_ is set; every other failure is logged.
  do
    {
      status = this->work_pending_i (max_wait_time);

      if (status == -1 && !(this->restart_ != 0 && errno == EINTR))
        ACELIB_ERROR ((LM_ERROR, ACE_TEXT("%t: %p\n"), ACE_TEXT("work_pending_i")));
    }
  while (status == -1 && this->restart_ != 0 && errno == EINTR);

  // Nothing to do.
  if (status == 0)
    return 0;

  // Something is pending: dispatch an event.
  if (status != -1)
    return this->dispatch (guard);

  // From here on the wait failed (status == -1).
  if (errno == ETIME)
    return 0;

  if (errno != EINTR)
    return -1;

  // The poll was interrupted.  If that was due to a signal registered
  // through our ACE_Sig_Handler it has been dispatched already, so count
  // it as one handled event instead of reporting an error.
  if (ACE_Sig_Handler::sig_pending () == 0)
    return -1;

  ACE_Sig_Handler::sig_pending (0);
  return 1;
}

// Dispatches the first located event: a timer if one is dispatched,
// otherwise an I/O event.  Only one event is dispatched per call so that
// the token can be released and another waiter can look for the next event
// concurrently.
int
ACE_Dev_Poll_Reactor::dispatch (Token_Guard &guard)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::dispatch");

  // Timers go first since they may have tighter latency constraints than
  // I/O handlers.
  int const timer_status = this->dispatch_timer_handler (guard);
  if (timer_status != 0)
    return timer_status;

  // No timer was dispatched, so look for an I/O event.
  return this->dispatch_io_event (guard);
}

// Puts a suspended handle back into the "interest set" so that its events
// are polled again.
//
// Returns -1 if the handle is not in the repository or re-arming fails,
// 0 otherwise (including when the handle was not suspended).
int
ACE_Dev_Poll_Reactor::resume_handler_i (ACE_HANDLE handle)
{
  ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler_i");

  Event_Tuple *entry = this->handler_rep_.find (handle);
  if (entry == 0)
    return -1;

  // Nothing to resume.
  if (!entry->suspended)
    return 0;

  ACE_Reactor_Mask wanted = entry->mask;
  if (wanted == ACE_Event_Handler::NULL_MASK)
    {
      // No events are registered; just clear the flag.
      entry->suspended = false;
      return 0;
    }

#if defined (ACE_HAS_EVENT_POLL)
  struct epoll_event ev;
  ACE_OS::memset (&ev, 0, sizeof (ev));
  ev.events  = this->reactor_mask_to_poll_event (wanted) | EPOLLONESHOT;
  ev.data.fd = handle;

  // Modify the existing epoll entry when there is one, otherwise add it.
  int const op = entry->controlled ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
  if (::epoll_ctl (this->poll_fd_, op, handle, &ev) == -1)
    return -1;

  entry->controlled = true;
#else
  struct pollfd request;
  request.fd      = handle;
  request.events  = this->reactor_mask_to_poll_event (wanted);
  request.revents = 0;

  if (ACE_OS::write (this->poll_fd_, &request, sizeof (request))
      != sizeof (request))
    return -1;
#endif  /* ACE_HAS_EVENT_POLL */

  entry->suspended = false;
  return 0;
}
http://www.lqws.cn/news/564571.html

相关文章:

  • 高性能 List 转 Map 解决方案(10,000 元素)
  • 阿里云-接入SLS日志
  • HarmonyOS NEXT仓颉开发语言实战案例:健身App
  • HarmonyOS NEXT仓颉开发语言实战案例:小而美的旅行App
  • [分布式并行] 流水线并行 PP(NaivePP/GPipe/F-then-B/PipeDream/1F1B)
  • MCPA2APPT 智能化演示文稿系统:A2A、MCP、ADK 三大架构全流程自动化
  • 区块链技术: 稳定币USDC的工作原理
  • 【八股消消乐】消息队列优化—消息丢失
  • python pyecharts 数据分析及可视化(2)
  • 基于Pandas和FineBI的昆明职位数据分析与可视化实现(三)- 职位数据统计分析
  • MAC 地址在 TCP 网络中的全面解析:从基础概念到高级应用
  • 【Redis原理】Redis事务与线程模型
  • StarRocks 3.5 新特性解读:Snapshot 快照恢复、大导入性能全面升级、分区管理更智能
  • opensuse/debian grub启动界面太模糊?
  • Wpf布局之WrapPanel面板!
  • 3.1.1、CAN总线单个设备环回测试
  • Git常见使用
  • WPF学习笔记(11)数据模板DataTemplate与数据模板选择器DataTemplateSelector
  • Mybatis学习总结
  • 鸿蒙5:自定义构建函数
  • 力扣第84题-柱状图中最大的矩形
  • Leetcode 3600. Maximize Spanning Tree Stability with Upgrades
  • Docker安装的gitlab配置ssl证书
  • 协作机器人优化自动化工作流程,提升工作效率
  • vue3报错No known conditions for “./lib/locale/lang/zh-cn”
  • HTML响应式Web设计
  • 链表题解——环形链表 II【LeetCode】
  • RK3588集群服务器性能优化案例:电网巡检集群、云手机集群、工业质检集群
  • Qwen2.5-7B-Instruct模型推理速度与量化对比分析
  • 【数据集】中国2016-2022年 城市土地利用数据集 CULU