博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkSQL数据DataFrame向ElasticSearch写入的优化,亲测提高数倍
阅读量:2179 次
发布时间:2019-05-01

本文共 2907 字,大约阅读时间需要 9 分钟。

  1. 前言

    最近sparksql写入elasticsearch数据量逐渐增大,所以需要优化写入的速度.
    先说一下集群情况.
    es集群:elasticsearch-6.2.4, 机器配置:5台4C*16G阿里云机器.
    spark: 2.1.3.

  2. 优化方向

    从spark参数和es索引两个方向进行优化

  3. spark参数

    es提供了一套完整的hadoop生态支持.可以完整的支持spark读写es.
    在spark的config可以设置两个参数,es.batch.size.bytes 以及es.batch.size.entries,这两个参数表示写入时es bulk操作的batch大小和条数.这些设置对应到每个task中.hadoop/spark 相关配置信息见链接:
    这两个参数默认1mb和1000条,在调节的时候也不是越大越好,需要根据自己的数据不断的去测试.

  4. index参数

    1. mapping设置
      精细设置每个字段的type类型,不需要分词的字段尽量设置为keyword.所有的字段都精细设置.分词不仅会占用额外空间,还很影响写入速度.
      如果没有场景用到_all字段,那么禁用_all字段.
    2. setting设置
      1. number_of_replicas:在创建索引的时候(或者数据写入的时候),设置副本数为0,number_of_replicas:0,在大量数据写入的时候,数据从主分片往副本同步数据,也是很消耗资源的.在数据写入完成后,可以恢复副本,副本的主要作用是容灾和负载均衡,可以参考elasticsearch权威指南:
      2. refresh_interval:在实时性要求没那么高的时候,可以调大索引refresh_interval参数.因为我的是离线项目,所以我这里设置为-1. 默认情况下写入到es的数据并不是马上就刷到磁盘,先放在 in-memory buffer,但客户端是读取不到in-memory buffer中的数据,为了实时查询,需要定期(默认1s)将该数据刷写到介于es和磁盘之间的filesystem cache 即refresh,该操作轻量级的 。写入到filesystem cache相当于创建新的segment 是可以被客户端读取到的, 默认属性(阈值是1s)由于快速的刷数据导致很多小量的filesystem cache,同时写入到filesystem cache仍然有一些性能消耗,所以根据应用的使用场景,如果是关注写入速度并不关注实时查询,可以适当调整默认的阈值的,该属性是在创建索引(属性值为:index.refresh_interval)的时候设置的.
      3. translog.durability: 设置translog.durability为async.translog.durability 默认值是request,该属性类似于hbase 中的WAL,是为了防止数据写入后 ,还没有落盘之前 出现宕机导致数据丢失。client提交request后,除了在in-memory buffer 添加新的文档/操作,还需要在translog 追加新的文档/操作 ,以保证数据的可靠性 。 其实translog.durability 属性可以设置为async,当然async并不能保证新的文档/操作一定写入到trans-log,如果写到translog 没有成功 刚好这时也出现宕机,重启后从translog恢复时就会有数据丢失,所以我们需要在做好数据可靠性和写入效率之间做好权衡后再设置 。translog在 满足下面其中一个条件时 会执行commit 请求即filesystem cache 刷写到磁盘 condition1:缓存数据达到512M(default) condition2 : 缓存时间达到5s(default),commit成功后translog会删除. 这个参数也是在创建索引的时候设置的.
        4.translog.sync_interval, translog.flush_threshold_size 设置translog.sync_interval, translog.flush_threshold_size,这两个参数的作用已经在上面说明了.可以再创建索引的时候把这两个参数设大一些.
  5. 修改elasticsearch的yml文件

    网上很多博客都说修改yml的部分参数,也可以优化写入速度,但是我没有测.还是写出来供大家参考.

    1. indices.memory.index_buffer_size,indices.memory.min_index_buffer_size,indexing buffer在为 doc 建立索引时使用,当缓冲满时会刷入磁盘,生成一个新的 segment, 这是除refresh_interval外另外一个刷新索引,生成新 segment 的机会. 每个 shard 有自己的 indexing buffer,下面的关于这个 buffer 大小的配置需要除以这个节点上所有的 shard 数量
    2. 设置index、merge、bulk、search的线程数和队列数
      # Search poolthread_pool.search.size: 5thread_pool.search.queue_size: 100# 这个参数慎用!强制修改cpu核数,以突破写线程数限制# processors: 16# Bulk poolthread_pool.bulk.size: 16thread_pool.bulk.queue_size: 300# Index poolthread_pool.index.size: 16thread_pool.index.queue_size: 300
      1. 设置节点之间的故障检测配置,
        大数量写入的场景,会占用大量的网络带宽,很可能使节点之间的心跳超时。并且默认的心跳间隔也相对过于频繁(1s检测一次)
      discovery.zen.fd.ping_timeout: 120sdiscovery.zen.fd.ping_retries: 6discovery.zen.fd.ping_interval: 30s
  6. 其他调整

    1. 使用自动生成doc id, 网上很多博客都说使用自动生成文档id能比指定文档id块很多, 可能是我水平有限.我测下来使用自动生成稳定id是比指定文档id快一些,但是没有快到那么多.
    2. 磁盘,SSD磁盘比机械磁盘速度能快上不少,亲测,推荐使用SSD磁盘.
    3. 合理的分片数,建索引的时候设置合适的分片数,也可以提高数据写入的速度.这个参数需要根据自己的集群规模以及机器配置去测.
    4. 合适的mapping字段数.这个亲测,同样数据量的两个Dataframe,一个字段数在60左右,一个在280左右,建立对应字段数的索引,写入速度大概差3~4倍左右.
  7. 最后, 我这里只搭配调整了3,4,6里的大部分设置,写入速度大概提升了4~5倍左右. 这里只是记录了实际对写入有提升的一些配置,还没有深入研究有扩展继续填坑.

    参考:

转载地址:http://ykjkb.baihongyu.com/

你可能感兴趣的文章
【学习方法】如何分析源代码
查看>>
【LEETCODE】61- Rotate List [Python]
查看>>
【LEETCODE】143- Reorder List [Python]
查看>>
【LEETCODE】82- Remove Duplicates from Sorted List II [Python]
查看>>
【LEETCODE】86- Partition List [Python]
查看>>
【LEETCODE】147- Insertion Sort List [Python]
查看>>
【算法】- 动态规划的编织艺术
查看>>
用 TensorFlow 让你的机器人唱首原创给你听
查看>>
对比学习用 Keras 搭建 CNN RNN 等常用神经网络
查看>>
深度学习的主要应用举例
查看>>
word2vec 模型思想和代码实现
查看>>
怎样做情感分析
查看>>
用深度神经网络处理NER命名实体识别问题
查看>>
用 RNN 训练语言模型生成文本
查看>>
RNN与机器翻译
查看>>
用 Recursive Neural Networks 得到分析树
查看>>
RNN的高级应用
查看>>
TensorFlow-7-TensorBoard Embedding可视化
查看>>
轻松看懂机器学习十大常用算法
查看>>
一个框架解决几乎所有机器学习问题
查看>>