博客
关于我
sparkstreaming消费kafka数据,如果发生消息积压,如何处理?
阅读量:634 次
发布时间:2019-03-14

本文共 2246 字,大约阅读时间需要 7 分钟。

为什么会出现消息积压?

原因分析

在默认配置下,Spark Streaming通过接收器(或者Direct方式)以生产者生成数据的速率接收数据。当批处理时间(batch processing time)超过批处理间隔(batch interval)的时候,即每个批次数据处理的时间都比Spark Streaming批处理间隔时间长;随着越来越多的数据被接收,而数据的处理速度无法跟上,系统可能会出现数据堆积,进一步可能导致Executor端内存溢出(OOM)的问题从而引发失败。

在Spark 1.5版本之前,为了解决这个问题,对于基于接收器的数据接收器,我们可以通过配置spark.streaming.receiver.maxRate参数来限制每个接收器每秒钟最大可以接收的记录数量;对于Direct Approach的数据接收,我们可以通过配置spark.streaming.kafka.maxRatePerPartition参数来限制每次作业中每个Kafka分区最少读取的记录条数。通过这种方式限制接收速率,能够合理配合处理能力。然而,这种方法还有几个问题:

  • 需要手动估算集群的处理速度以及消息数据的产生速度
  • 这两种方式都需要人工干预,修改好相关参数后,还需要手动重启Spark Streaming
  • 如果当前集群的处理能力高于我们配置的maxRate,而且生产者产生的数据量高于maxRate,这样会导致集群资源利用率低下,并且也会导致数据无法及时处理
  • 反压机制的介绍

    那么是否有办法不需要人工干预让Spark Streaming系统自动处理这些问题呢?是的!Spark 1.5版本引入了反压(Back Pressure)机制,其通过动态收集系统的一些数据来自动适配集群的数据处理能力。

    之前的Spark Streaming架构

    在Spark 1.5版本之前,Spark Streaming的架构如下:

  • 数据源源不断通过接收器接收,当数据被接收到后,它会被存储到Block Manager中;为了避免数据丢失,还会备份到其他的Block Manager中
  • Receiver Tracker接收到这些存储的Block IDs,并维护一个时间到这些Block IDs的关系
  • Job Generator每隔batchInterval的时间就生成一个JobSet
  • Job Scheduler运行上面生成的JobSet
  • Spark 1.5及以后的架构

    为了实现对数据传输速率的自动调节,在原有架构上新增了一个名为RateController的组件,它集成在StreamingListener中,负责监控所有作业的onBatchCompleted事件。基于processingDelay,samplingDelay、当前批次记录条数及处理完成事件,RateController可以估算出一个速率。这个速率会被用来更新流的每秒能处理的最大记录数。速率估算器(RateEstimator)有多种实现,但目前Spark 2.2只支持基于PID的速率估算器。

    具体来说,RateController内会存下计算好的最大速率,这个速率在处理完onBatchCompleted事件后会被推送到ReceiverSupervisorImpl。这样接收器就知道每秒能接收多少条数据。需要注意的是,当用户配置了spark.streaming.kafka.maxRatePerPartition时,最终接收的数据量取决于三个值中的最小值,即每个接收器或每个Kafka分区每秒能处理的数据不会超过spark.streaming.receiver.maxRate或spark.streaming.kafka.maxRatePerPartition的值。

    具体流程可以参考下图所示(图中描述了详细的数据流和反压机制的调节过程)。

    如何启用反压机制

    在Spark中启用反压机制非常简单,只需将spark.streaming.backpressure.enabled设置为true即可,默认值为false。

    在使用反压机制时,还需要注意一些相关参数:

    • spark.streaming.backpressure.initialRate:在启用反压机制时,每个接收器接收第一批数据时的最大初始速率,默认值未设置
    • spark.streaming.backpressure.rateEstimator:速率估算器类,默认值为pid,目前Spark只支持这个,用户可以根据需要实现
    • spark.streaming.backpressure.pid.proportional:用于响应错误的权重(最后批次和当前批次之间的变化),默认值为1,只能设置为非负数
    • spark.streaming.backpressure.pid.integral:用于对错误积累做出响应,有抑制作用,默认值为0.2
    • spark.streaming.backpressure.pid.derived:用于对错误趋势做出响应,可能会导致批次大小波动,但也能辅助快速增加或减少处理能力,默认值为0
    • spark.streaming.backpressure.pid.minRate:可以估算的最低费率,,默认值为100,设置为非负数

    通过以上设置,可以有效地利用反压机制来动态调整数据处理速率,最大限度地发挥集群的处理能力,同时避免因为消息堆积而引发的资源浪费或系统故障。

    转载地址:http://tfooz.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现1000 位斐波那契数算法(附完整源码)
    查看>>
    Objective-C实现2 个数字之间的算术几何平均值算法(附完整源码)
    查看>>
    Objective-C实现2d 表面渲染 3d 点算法(附完整源码)
    查看>>
    Objective-C实现2D变换算法(附完整源码)
    查看>>
    Objective-C实现3n+1猜想(附完整源码)
    查看>>
    Objective-C实现3n+1猜想(附完整源码)
    查看>>
    Objective-C实现9x9乘法表算法(附完整源码)
    查看>>
    Objective-C实现9×9二维数组数独算法(附完整源码)
    查看>>
    Objective-C实现A*(A-Star)算法(附完整源码)
    查看>>
    Objective-C实现A-Star算法(附完整源码)
    查看>>
    Objective-C实现abbreviation缩写算法(附完整源码)
    查看>>
    Objective-C实现ABC人工蜂群算法(附完整源码)
    查看>>
    Objective-C实现activity selection活动选择问题算法(附完整源码)
    查看>>
    Objective-C实现adaboost算法(附完整源码)
    查看>>
    Objective-C实现Adler32算法(附完整源码)
    查看>>
    Objective-C实现AES算法(附完整源码)
    查看>>
    Objective-C实现AffineCipher仿射密码算法(附完整源码)
    查看>>
    Objective-C实现aliquot sum等分求和算法(附完整源码)
    查看>>
    Objective-C实现all combinations所有组合算法(附完整源码)
    查看>>
    Objective-C实现all permutations所有排列算法(附完整源码)
    查看>>