【Netty源码分析】04 服务端读流程
读流程
客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel
的事件轮询、事件处理是在NioEventLoop
的run
方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()
。
从processSelectedKeys()
一直追踪下去,可以看到OP_READ
处理逻辑分支:
(资料图)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}
可能你会比较奇怪:为什么OP_READ
和OP_ACCEPT
都会走这个分支?
OP_ACCEPT
是NioServerSocketChannel
处理的事件,而OP_READ
是NioSocketChannel
处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe
类型也不一样,一个是:NioMessageUnsafe
,另一个是:NioSocketChannelUnsafe
。NioServerSocketChannel
负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。这里我们是处理client channle
的OP_READ
,所以,unsafe
是NioSocketChannelUnsafe
类型实例。
AbstractNioByteChannel.NioByteUnsafe#read
方法代码如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申请ByteBuf对象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):将数据读取到ByteBuf中 //lastBytesRead()将读取的字节数设置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判断是否继续读取 allocHandle.readComplete(); //触发pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:
allocHandle.lastBytesRead(doReadBytes(byteBuf))
:调用java api
,从channel
中读取字节数据到ByteBuf
缓存中;pipeline.fireChannelRead(byteBuf)
:触发pipeline
的channelRead
事件,并将带有读入数据的ByteBuf
通过参数传入;pipeline.fireChannelReadComplete()
:触发pipeline
的channelReadComplete
事件;事件传播
调用pipeline
的fireChannelRead()
就可触发channelRead
事件在handler
之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。
关键点就在于HandlerContext
中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
,第一个是在哪个handler
上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler
上触发channelRead
事件。由于pipeline
中的handler
是被包装成HandlerContext
放入的,所以,可以通过handler()
方法找到真正的handler
对象进行触发。
比如pipeline
的fireChannelRead()
就是触发head
的channelRead
事件,如果处理完成需要把事件继续传播给下一个handler
,就需要调用ctx.fireChannelRead(msg)
方法即可,该方法中通过next
属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)
这个方法就可以将事件传播到下一个节点上。
pipeline.fireChannelRead(byteBuf)
运行完成后会调用pipeline.fireChannelReadComplete()
方法,触发channelReadComplete
事件,执行机制和channelRead
事件一样,就不再赘述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()
和ctx.pipeline().fireChannelRead()
之间的区别了,避免误用。
Pipeline线程模型
上面分析的都是常规模式,没有给handler
指定额外线程情况下channelRead
和channelReadComplete
传播机制,大致如下图:
先触发channelRead
事件,按照pipeline
中顺序依次触发,当所有handler
都触发完后,再触发channelReadComplete
事件,按照pipeline
中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel
注册的NioEventLoop
。
我们来看下static void invokeChannelRead()
这个方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
在执行next.invokeChannelRead(m)
方法前有个executor.inEventLoop()
判断,判断当前执行线程是不是就是handler
执行所需的线程。执行handler
方法是不能随便线程都可以去执行的,必须使用handler
内部指定的executor
线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor
线程执行器中执行,如果当前线程和handler
执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor
的任务队列中让其执行。
executor
线程执行器是通过next.executor()
方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContext
中executor
有值则直接返回;否则返回channel
注册的NioEventLoop
作为线程执行器。
在添加handler
时可以指定一个EventGroup
:pipeline.addLast( bizGroup, "handler2", new OtherTest02());
,这样,再把handler
包装成HandlerContext
过程中会从这个EventGroup
根据chooser
选取策略获得一个EventLoop
赋值给executor
。
所以,从上面分析,默认情况下handler
都是在channel
注册的NioEventLoop
线程中执行的,除非在addLast
添加handloer
时特别指定。
下面我们通过一个案例分析下pipeline
线程模型,如下,给handler02
添加一个额外的线程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}
这时,channelRead
和channelReadComplete
事件触发流程见下图:
channelRead
事件执行流程说明:
handler01
的channelRead
事件,本身当前线程和handler01
是同一个线程,所以,直接调用handler#channelRead()
方法;handler01#channelRead()
方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()
方法,但是handler02
执行线程并不是默认的channel
的注册线程,而是额外设置的biz
线程,需要将调用包装成一个任务提交到biz
线程的任务队列taskQueue
中,然后直接返回;biz线程执行器内部线程会一直循环从taskQueue
中获取任务执行,这样就完成了线程切换效果;当handler02#channelRead()
方法执行完成后,需要执行handler03#channelRead()
,它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()
的调用封装成一个任务提交到register eventLoop的taskQueue
中,待其内部线程提取执行;下面再来看下channelReadComplete
事件执行流程:
a1
将任务提交给taskQueue
任务队列后直接返回了,而不是等其执行完成再返回;a1
返回后,从源码分析来看,会立即触发channelReadComplete
事件,涉及到线程切换,同理b1
这里也是将handler02#channelReadComplete()
调用封装成任务放入到biz eventLoop
的taskQueue
中的,然后也直接返回了;这样,biz eventLoop
线程执行器taskQueue
中就有两个任务,会按照顺序依次执行:先执行channelRead()
调用,再执行channelReadComplete()
调用;执行a3、b3
时同理;总结
从上面可以看出,Pipeline
中handler
可以在不同线程间切换得到关键是:taskQueue
。还要一点非常重要:handler
线程池执行器默认使用的channel
注册的NioEventLoop
这个,NioEventLoop
采用的是单线程工作模式,同时还需要处理selector.select()
事件轮询,所以,handler
里肯定不能有耗时、特别是IO
阻塞等操作,不然卡在handler
中,selector#select()
执行不到,无法及时接收到客户端传送过来的数据。
标签:
- 宿舍因床位过多通道仅留1人宽 难道没有安全隐患吗?
- 环球快报:三星电子明年将为开发者推出XR设备 特别工作组正在研发
- 世界速讯:汉嘉设计:近期公司作为联合体成员之一中标了钱潮嘉苑共有产权房项目EPC工程总承包工程以及北京师范大学丽水实验学校项目工程总承包(EPC)
- 重庆布局实施167个市级重大制造业项目 前7月投资进度71.6%
- 券商第一梯队海通证券遭遇多事之秋:业绩与规模“双降”
- 纸浆期货是否有效对冲废黄板纸现货价格波动风险?
- 理文、山鹰发布停机函 包装纸市场涨价100-200元/吨
- 全部合格!广东珠海抽查5批次油墨产品
- 浆系纸种再掀新一轮提价 涨幅达200-1500元/吨
- 保定满城区开展纸制品行业专项检查 规范纸制品企业生产
- 2022年3月14日全国各地区纸厂废纸价格信息
- 包装材料、人工费等成本上升 台湾生活用纸涨价
- 上周木浆系纸品价格均有提涨 箱板纸价小幅下跌
- 景兴纸业2021年营收同比增27.70% 净利同比增41.51%
- 1-2月全国快递业务收入1574.3亿 同比增长13.8%
- 原料成本压力持续上升 浙江多家包装厂产品价格上涨3%
- 3月7日-13日生活用纸主要区域市场周度价格情况
- 安徽出台“十四五”大气污染防治规划
- 原材料/燃料价格上涨 日本卫生纸、纸尿裤提价超10%
- 新加坡超市将对塑料袋收费 至少5分新币/个
- 电子商务兴起 印度纸类包装行业发展趋势
- 2022年1-2月芬兰木材交易同比下滑20%
- 山东造纸行业深入实施“链长制”工作推进机制
- 包装原料价格波动再成热点 揭秘2021造纸上市企业业绩
- 国家统计局:1-2月规上工业增加值同比实际增长7.5%
- 2022年3月18日各地区各大纸厂废纸价格信息
- 江苏开展精准造林绿化 深入推进国土绿化和全民义务植树
- 正隆纸业员工返岗率超95% 预计今年营收同比增10%
- 芬林芬宝劳马新锯材厂将启用自动装载生产线
- 山东一小镇发展纸箱包装生产企业近百家 年产值11亿元
- 打破性别“玻璃天花板” 95岁女院士是“她力量”最佳代言
- 河北辛集市暂停举办体育活动 关闭景区文娱场所
- 红色文物·党史故事 “推出胜利”的小推车
- 侵华日军南京大屠杀遇难同胞纪念馆闭馆
- 核酸采样:一位“点长”的50小时冲刺
- 跑道结冰 哈尔滨机场关闭至9日12时
- 北京地铁全面开启车内加热装置
- 黑河市多举措保障疫情期间残疾人等特殊群体生活稳定
- 北京丰台海淀两处管控区域解封 社区工作者收到“暖心礼物”
- 吉林四平一旅游项目违占耕地两千多亩 投资达10亿元
- 湖南双峰27名非法滞留缅北人员被惩戒:小孩回原籍入学
- 江西新增本土“1+6” 上饶增一中风险地区
- 江西上饶一地调整为中风险地区 实行封闭管理措施
- 快递旺季遭遇雨雪天气 国家邮政局呼吁理解快递小哥
- 高压、孤独,胆大、心细:手执焊枪的水下“蛙人”
- 掏粪掏了36年,他还在琢磨“新门道”
- 内蒙古:二连浩特市新增1例本土确诊病例 额济纳旗累计治愈出院本土确诊病例76例
- 坚守在海拔4300多米的“天路保健医生”
- 38年后,他终于知道了家在哪儿……
- 受降雪影响 辽宁鞍山一农贸市场发生坍塌
-
中国舞蹈家协会顶尖教师巡回课堂(重庆站)举办
中新网北京11月8日电 (记者 高凯)由中国舞蹈家协会主办,中国文联舞蹈艺术中心、重庆市舞蹈家协会...
-
边城战“疫”:夜晚七点的暂停键
11月4日晚上7点,是中俄边境城市黑河一个再平凡不过的抗疫时刻。 如果在这一刻按下时间的暂停键...
-
风雪高原战“疫”长卷 寒潮下的西宁疫情防控观察
大风7级,大雪纷扬,最高气温只有-5℃! 这是青海省西宁市开启全城全员首轮核酸检测的天气。 ...
-
拟音师:“雕刻”声音的人【三百六十行】
三百六十行 拟音师:“雕刻”声音的人 闭上眼,90后赵洪泽有时甚至可以通过走路的声音,来判...
-
“双减”之后 中小学教师资格考试为何依然火爆
聚焦 “双减”之后,中小学教师资格考试为何依然火爆 近日,2021年下半年中小学教师资格考试(...
-
大数据助力贫困生成长
探索 大数据助力贫困生成长大数据画像能为贫困生成长带来什么 今年9月,云南省楚雄彝族自治州...
-
“大漠明珠”驶上发展快车道 塔里木盆地做足生态大文章
塔里木盆地做足生态大文章 “大漠明珠”驶上发展快车道 从塔里木盆地的西北角到西南角,和田...
-
职校生可报考事业单位 搬走职业教育的一块绊脚石
职校生可报考事业单位 搬走职业教育的一块绊脚石 “职业院校毕业生也可以报考事业单位了。”...
-
打算“双十一”买买买的姐妹 看完这篇再“剁手”
打算“双十一”买买买的姐妹 看完这篇再“剁手” 女性对于保养的热衷超乎想象,不少人只要是听...
-
完美“飞天”仰仗全宇宙最酷飞船试驾员
完美“飞天”仰仗全宇宙最酷飞船试驾员 11月7日,航天员翟志刚、航天员王亚平开展神舟十三号航天...
-
冠状病毒中损伤血管的蛋白首次确定
冠状病毒中损伤血管的蛋白首次确定 国际战“疫”行动 科技日报北京11月4日电 (记者刘霞)不少...
-
新电池结构让飞行汽车成为可能 相关技术将亮相北京冬奥
新电池结构让飞行汽车成为可能 相关技术将亮相北京冬奥会 科技冬奥进行时 搭载全气候电池...
-
H5N8病毒肆虐全球,我国家禽为何“独善其身”
H5N8病毒肆虐全球,我国家禽为何“独善其身” 科技日报哈尔滨11月7日电 (记者李丽云)记者11月7...
-
重庆奉节一民警因公殉职 年仅28岁
中新网重庆11月9日电 (记者 刘相琳)记者9日从重庆市公安局获悉,重庆奉节县公安局民警袁华押解一...
-
哈尔滨市新增本土新冠肺炎确诊病例1例
中新网哈尔滨11月9日电 (记者 刘锡菊)9日,哈尔滨市卫健委发布哈尔滨市11月8日0-24时疫情通报:11...
-
成都本地累计在管密接2757人、次密9097人
(抗击新冠肺炎)成都本地累计在管密接2757人、次密9097人 中新网成都11月9日电 (记者 贺劭清 ...
-
成都累计报告确诊病例23例 出现1传13特殊案例
(抗击新冠肺炎)成都累计报告确诊病例23例 出现1传13特殊案例 中新社成都11月9日电 (记者 贺劭...
-
呼和浩特一学校宿管员扇打学生致双耳鼓膜穿孔 分管校长被免
中新网呼和浩特11月9日电 (记者 张林虎)9日,针对“宿管员扇打学生致其双耳鼓膜穿孔”一事,呼和...
-
郑州通报8例确诊病例和无症状感染者活动轨迹
中新网11月9日电 据郑州市委宣传部官方微信消息,11月8日0至24时,郑州市新增阳性感染者3例,均为...
-
新疆阿克苏果农:我们的生活像苹果一样甜
中新社新疆阿克苏11月9日电 题:新疆阿克苏果农:我们的生活像苹果一样甜 作者 苟继鹏 “我...
-
河北辛集开展大规模消毒消杀工作
今天(9日)上午,河北省辛集市召开疫情防控新闻发布会。会上,辛集市科学技术局局长辛彦卜介绍,新冠...
-
河北辛集新增本土确诊11例 已转运定点医院诊治
今天(9日)上午,河北省辛集市召开疫情防控新闻发布会,辛集市副市长刘士民介绍,2021年11月8日0时至...
-
石家庄深泽县第五轮全员核酸检测结果全部为阴性
11月9日,石家庄市召开第12场新冠肺炎疫情防控工作新闻发布会。发布会上,石家庄市深泽县县长郝英鹏...
-
海口市1例治愈后的境外输入病例复阳 已转至定点医院隔离医学观察
中新网海口11月8日电 (记者 张茜翼)海口市新型冠状病毒感染肺炎疫情防控工作指挥部8日通报称,11...
-
四川新增本土确诊病例4例
中新网11月8日电 据四川省卫健委网站消息,11月7日0-24时,四川新增新型冠状病毒肺炎确诊病例5例(...
-
黑龙江省新增新冠肺炎本土确诊病例6例
中新网哈尔滨11月8日电 (程岩 记者 史轶夫)黑龙江省卫健委8日发布消息,7日0-24时,黑龙江省黑河...
-
河南新增本土确诊病例18例 其中郑州市16例周口市2例
中新网11月8日电 据河南省卫健委官方微博消息,11月7日0—24时,河南省新增本土确诊病例18例(郑州...
-
河北新增确诊病例8例 新增无症状感染者1例
中新网11月8日电 据河北省卫健委网站消息,2021年11月7日0—24时,河北省新增新型冠状病毒肺炎确诊...
-
寒潮持续发威!南方气温纷纷触底 强降雪中心转移至东北
中国天气网讯 今天(11月8日),寒潮继续南下,持续发威,南方大部最高气温将纷纷触底。强降雪中心将...
-
雪后寒!今日北京晴天回归北风劲吹 最高气温5℃上下
中国天气网讯 今天(11月8日)北京晴天回归,但在风寒效应下,“冷”仍然是天气的主题。气温方面,今...
-
黑龙江新增本土确诊病例6例 均在黑河市爱辉区
中新网11月8日电 据黑龙江省卫健委网站消息,2021年11月7日0-24时,黑龙江省新增新冠肺炎本土确诊...
-
寒潮继续影响华东华南等地 东北地区等地有强降雪
中新网11月8日电 据中央气象台网站消息,受寒潮影响,预计11月8日08时至9日08时,黄淮东部、江淮东...
-
辽宁新增本土确诊病例20例 新增本土无症状感染者12例
中新网11月8日电 据辽宁省卫健委网站消息,11月7日0时至24时,辽宁省新增20例本土新冠肺炎确诊病例...
-
寒潮影响“加码”:吉林力保电力供应 停课停运范围加大
中新网长春11月9日电 (记者 郭佳 张瑶)连日来,一轮寒潮引发的强降雪席卷中国北方。位于东北地区...
-
常州连续一周无新增病例 10日全市各类学校将错峰复学
中新网常州11月9日电 (记者 唐娟)11月9日,常州疫情防控指挥部学校防控组对外发布,自11月10起,...
-
哈尔滨机场开放恢复运行 计划航班45架次
中新网哈尔滨11月9日电 (仇建 记者 史轶夫)9日12时22分,随着哈尔滨经阜阳飞往三亚的FU6685航班...
-
山西警方抓获6名“摸金校尉” 缴获“虎枕”等大量文物
中新网长治11月9日电 (记者 李庭耀)记者9日从山西省长治市公安局上党分局获悉,上党警方侦破系列...
-
西藏基层第一书记话产业发展推进乡村振兴
中新网日喀则11月9日电(记者 赵朗)近日,由西藏自治区网信办主办的第一书记话小康活动先后走进山南...
-
内蒙古通辽:强降雪致8个旗县区受灾
中新网通辽11月9日电 (记者 张林虎)9日,记者从内蒙古自治区通辽市应急管理局获悉,自11月5日起,...
-
成都金堂:医护人取消婚礼坚守岗位 手捧花被送到了战“疫”一线
中新网成都11月9日电 (邹立杨)连日来,华西医院金堂县第一人民医院实验医学科的主检验师易维佳都在...
-
江西铅山新一轮核酸检测结果均为阴性
(抗击新冠肺炎)江西铅山新一轮核酸检测结果均为阴性 中新网南昌11月9日电 (记者 吴鹏泉)江西省...
-
辽宁大连幼儿园和中小学学生即日起暂缓入校
中新网11月9日电 据辽宁省大连市人民政府新闻办公室官方微博消息,大连市新冠肺炎疫情防控总指挥部...
-
2021年北京市重点碳排放单位:涉及多家印刷包装企业
3月15日,北京市生态环境局、北京市统计局发布了《关于公布2021年度北京市重点碳排放单位 及一般报告单...
-
北京新增1例本土确诊病例
中新网11月8日电 据北京卫健委官方微博消息,11月7日0时至24时,北京新增1例本土确诊病例,无新增...
-
河北石家庄深泽县7日新增1例无症状感染者 为8岁男童
中新网11月8日电 据石家庄卫健委官方微信消息,石家庄深泽县应对新冠肺炎疫情工作领导小组办公室8...
-
高速封闭、机场关闭、学校停课 辽宁多部门发应急预案应对极端天气
中新网沈阳11月8日电 (李晛 王景巍)7日在寒潮影响下,东北地区局地降大雪。辽宁省气象部门当日连...
-
云南新增本土确诊病例3例 新增本土无症状感染者3例
中新网11月8日电 据云南省卫健委网站消息,11月7日0—24时,云南省新增确诊病例9例,其中境外输入...
-
努力让每个人都有出彩机会
努力让每个人都有出彩机会 “孩子明年要参加中考,成绩一直提不上去,送他读职高,也是一种选择...
-
参与和见证中国水电发展
参与和见证中国水电发展 余吉安的童年是在马来西亚加里曼丹岛的沙捞越州古晋市度过的。家门口的...
-
中国航天:为实现中国梦提供战略支撑
中国航天:为实现中国梦提供战略支撑(科技名家笔谈) 今年是中国共产党成立100周年,也是中国航...
X 关闭
X 关闭