es与netty

es使用netty来通信,实现分布式的功能,但在某些场景会oom。

1:netty概述
IO 多路复用
NioEventLoop.run select由jdk实现在win下使用select,在linux下使用epoll。
linux下也可以使用netty实现的epoll:EpollEventLoop。

线程模型
parent NioEventLoop[listen] [1]---- [n] children NioEventLoop[established] [1]----[n] channel [1]---- [1] channelPipeline[channelInboud/Outboundhandler list]。
1:每个NioEventLoop绑定一个线程。
2:parent NioEventLoop负责监听连接请求,并将建立连接请求轮询分配给childrenNioEventLoop。
3:一个child NioEventLoop通过select/epoll处理所负责的N个channels上的read,write事件。且一个channel上的read,write事件只能在该NioEventLoop线程上处理,并发安全。
4:具体处理时,由该channel对应的channelPipeline中的各个channelHandler顺序处理。

2:es对netty的使用
TCP/IP
1:一个request/response可能会因为分包(最大64k),对应产生多次channelRead/write。
2:一个数据包channelRead也可能因为tcp的延迟发送(粘包),而包括多个request,或者一个request的后半部分和下一个request的前半部分。
3:所以在channelPipeline中处理channelRead的数据包时,需要累积多个数据包数据组成一个完整的request,或者从一个数据包中拆分出多个request。

引起oom
当多个size很大的request,使用不同的channel,发送到不同的NioEventLoop处理时。

由于需要累积完整的数据包,才能交给下一个handler去反序列化成对象,进而处理请求。所以,多个channel中累计部分的request的数据时,可能会引起oom。

相关代码:
ByteToMessageDecoder
Netty4SizeHeaderFrameDecoder
Netty4MessageChannelHandler
Netty4Transport
TcpTransport

3:限流实现避免oom
增加总的计数器(breaker),读取request的第一个数据包,从中header中读取request的size,如果超过limit限制,则舍弃该request的本次,及后续的所有数据包。
注意
1:request的第一个数据包可能在一个新包的开头。也可能因为延迟发送,和其他request在一个数据包中。
2:累加器如果成功加上,则要读取完整size的request后,交给es处理请求,并保证处理后减去size。累加器如果超过limit,则需要从多个数据包中,舍弃size长度的数据,并交给es处理shardFailure。
3:可以设置request的status,指定只对某些大的结果(如:聚合)做限流。

原文地址:https://www.cnblogs.com/vsop/p/12703618.html

时间: 04-15

es与netty的相关文章

记一次netty版本冲突,报java.lang.NoSuchMethodError: io.netty.util.internal.ObjectUtil.checkPositive的问题

elasticsearch 5.6中使用TransportClient初始化抛异常 在引入elasticsearch5.6的transportclient包中,会引入netty进行通信. <!-- transport客户端 --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version

springboot2.0整合es的异常总结

异常: availableProcessors is already set to [4], rejecting [4] 在启动类中加入 System.setProperty("es.set.netty.runtime.available.processors", "false"); 异常:org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes

springboot整合es availableProcessors is already set to [2], rejecting [2]

https://www.jianshu.com/p/4d6bedded895 启动类主方法里面加一句 System.setProperty("es.set.netty.runtime.available.processors", "false"); 原文地址:https://www.cnblogs.com/Uzai/p/11328392.html

8、2 es数据库的使用

1.注意问题.es和redis同时使用会报错 解决: package com.bw; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; //import lombok.extern.slf4j.Slf4j; //@Slf4j @Component public class ElasticSearchConfiguration imp

elasticsearch 数据丢失的分析

大家关注分布式系统可靠性的话,也许读过Call me maybe这个博客系列.博主Kyle Kingsbury用自己开发的Jepsen测试过多种分布式系统(NoSQL.Message Queue等),其中包括我们常用的Elasticsearch(简称ES),先后针对ES 1.1.0和ES 1.5.0测过两轮.得到相同的悲观结论:因为可靠性原因,ES不能作为主存储.具体来说,ES节点在录入数据时,如果进程由于软硬件原因崩溃,即使重新启动,一段时间内录入的数据也会丢失无法恢复.所以不适合要求数据10

java使用elasticsearch分组进行聚合查询(group by)

java连接elasticsearch 进行聚合查询进行相应操作 一:对单个字段进行分组求和 1.表结构图片: 根据任务id分组,分别统计出每个任务id下有多少个文字标题 1.SQL:select id, count(*) as sum from task group by taskid;   java ES连接工具类 public class ESClientConnectionUtil { public static TransportClient client=null; public f

java使用elasticsearch进行模糊查询之must使用

java使用elasticsearch进行多个条件模糊查询 文章说明: 1.本篇文章,本人会从java连接elasticsearch到查询结果生成并映射到具体实体类(涵盖分页功能) 2.代码背景:elasticsearch版本为:5.2.0; 3.本人以下代码是分别从两个索引中查询数据,再将两个数据进行整合,如果大家只需要分组查询,那么则选取文章中的分组查询部分代码 4.本人的实体类主要是按照layUI分页框架进行设计:实体大家可以根据自己的具体需求进行设计 一.java连接elasticsea

【转】netty-transport版本冲突

Springboot整合Elasticsearch报错 今天使用SpringBoot整合Elasticsearch时候,相关的配置完成后,启动项目就报错了. nested exception is java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4] 我网上查询了一下,有人是是因为整合了Redis的原因.但是我把Redis相关的配置去掉后,问题还是没有解决,最后有人说是因

SpringBoot电商项目实战 — ElasticSearch接入实现

如今在一些中大型网站中,搜索引擎已是必不可少的内容了.首先我们看看搜索引擎到底是什么呢?搜索引擎,就是根据用户需求与一定算法,运用特定策略从互联网检索出制定信息反馈给用户的一门检索技术.搜索引擎依托于多种技术,如网络爬虫技术.检索排序技术.网页处理技术.大数据处理技术.自然语言处理技术等,为信息检索用户提供快速.高相关性的信息服务.搜索引擎技术的核心模块一般包括爬虫.索引.检索和排序等,同时可添加其他一系列辅助模块,以为用户创造更好的网络使用环境. image 基于Java的搜索引擎框架,目前市

电商订单ElasticSearch同步解决方案--使用logstash

一.使用logstash同步订单数据(订单表和订单项表)到ElasticSearch: 1.到官网下载logstash:https://www.elastic.co/cn/downloads/logstash 2.安装logstash前,确保需要先安装java的jdk环境 3.下载后,解压:之后千万别到bin环境点击logstash.bat这个命令启动,这样会报错的 4.接下来,在logstash安装目录找到config文件夹,在那里新增一个文件夹,我新建的为shop文件夹,然后在里面添加如下文