博客
关于我
sparkstreaming消费kafka数据,如果发生消息积压,如何处理?
阅读量:634 次
发布时间:2019-03-14

本文共 2246 字,大约阅读时间需要 7 分钟。

为什么会出现消息积压?

原因分析

在默认配置下,Spark Streaming通过接收器(或者Direct方式)以生产者生成数据的速率接收数据。当批处理时间(batch processing time)超过批处理间隔(batch interval)的时候,即每个批次数据处理的时间都比Spark Streaming批处理间隔时间长;随着越来越多的数据被接收,而数据的处理速度无法跟上,系统可能会出现数据堆积,进一步可能导致Executor端内存溢出(OOM)的问题从而引发失败。

在Spark 1.5版本之前,为了解决这个问题,对于基于接收器的数据接收器,我们可以通过配置spark.streaming.receiver.maxRate参数来限制每个接收器每秒钟最大可以接收的记录数量;对于Direct Approach的数据接收,我们可以通过配置spark.streaming.kafka.maxRatePerPartition参数来限制每次作业中每个Kafka分区最少读取的记录条数。通过这种方式限制接收速率,能够合理配合处理能力。然而,这种方法还有几个问题:

  • 需要手动估算集群的处理速度以及消息数据的产生速度
  • 这两种方式都需要人工干预,修改好相关参数后,还需要手动重启Spark Streaming
  • 如果当前集群的处理能力高于我们配置的maxRate,而且生产者产生的数据量高于maxRate,这样会导致集群资源利用率低下,并且也会导致数据无法及时处理
  • 反压机制的介绍

    那么是否有办法不需要人工干预让Spark Streaming系统自动处理这些问题呢?是的!Spark 1.5版本引入了反压(Back Pressure)机制,其通过动态收集系统的一些数据来自动适配集群的数据处理能力。

    之前的Spark Streaming架构

    在Spark 1.5版本之前,Spark Streaming的架构如下:

  • 数据源源不断通过接收器接收,当数据被接收到后,它会被存储到Block Manager中;为了避免数据丢失,还会备份到其他的Block Manager中
  • Receiver Tracker接收到这些存储的Block IDs,并维护一个时间到这些Block IDs的关系
  • Job Generator每隔batchInterval的时间就生成一个JobSet
  • Job Scheduler运行上面生成的JobSet
  • Spark 1.5及以后的架构

    为了实现对数据传输速率的自动调节,在原有架构上新增了一个名为RateController的组件,它集成在StreamingListener中,负责监控所有作业的onBatchCompleted事件。基于processingDelay,samplingDelay、当前批次记录条数及处理完成事件,RateController可以估算出一个速率。这个速率会被用来更新流的每秒能处理的最大记录数。速率估算器(RateEstimator)有多种实现,但目前Spark 2.2只支持基于PID的速率估算器。

    具体来说,RateController内会存下计算好的最大速率,这个速率在处理完onBatchCompleted事件后会被推送到ReceiverSupervisorImpl。这样接收器就知道每秒能接收多少条数据。需要注意的是,当用户配置了spark.streaming.kafka.maxRatePerPartition时,最终接收的数据量取决于三个值中的最小值,即每个接收器或每个Kafka分区每秒能处理的数据不会超过spark.streaming.receiver.maxRate或spark.streaming.kafka.maxRatePerPartition的值。

    具体流程可以参考下图所示(图中描述了详细的数据流和反压机制的调节过程)。

    如何启用反压机制

    在Spark中启用反压机制非常简单,只需将spark.streaming.backpressure.enabled设置为true即可,默认值为false。

    在使用反压机制时,还需要注意一些相关参数:

    • spark.streaming.backpressure.initialRate:在启用反压机制时,每个接收器接收第一批数据时的最大初始速率,默认值未设置
    • spark.streaming.backpressure.rateEstimator:速率估算器类,默认值为pid,目前Spark只支持这个,用户可以根据需要实现
    • spark.streaming.backpressure.pid.proportional:用于响应错误的权重(最后批次和当前批次之间的变化),默认值为1,只能设置为非负数
    • spark.streaming.backpressure.pid.integral:用于对错误积累做出响应,有抑制作用,默认值为0.2
    • spark.streaming.backpressure.pid.derived:用于对错误趋势做出响应,可能会导致批次大小波动,但也能辅助快速增加或减少处理能力,默认值为0
    • spark.streaming.backpressure.pid.minRate:可以估算的最低费率,,默认值为100,设置为非负数

    通过以上设置,可以有效地利用反压机制来动态调整数据处理速率,最大限度地发挥集群的处理能力,同时避免因为消息堆积而引发的资源浪费或系统故障。

    转载地址:http://tfooz.baihongyu.com/

    你可能感兴趣的文章
    pandas :检测一个DF和另一个DF之间缺失的列
    查看>>
    Pandas-从具有嵌套列表列表的现有列创建动态列时出错
    查看>>
    Pandas-通过对列和索引的值求和来合并两个数据框
    查看>>
    pandas.columns、get_dummies等用法
    查看>>
    pandas.DataFrame.copy(deep=True) 实际上并不创建深拷贝
    查看>>
    pandas.read_csv()的详解-ChatGPT4o作答
    查看>>
    PANDAS.READ_EXCEL()输出‘;溢出错误:日期值超出范围‘;而不存在日期列
    查看>>
    pandas100个骚操作:再见 for 循环!速度提升315倍!
    查看>>
    Pandas:对给定列求和 DataFrame 行
    查看>>
    Pandas、Matplotlib、Pyecharts数据分析实践
    查看>>
    Pandas中文官档~基础用法2
    查看>>
    Pandas中文官档~基础用法5
    查看>>
    Pandas中文官档~基础用法6
    查看>>
    Pandas中的GROUP BY AND SUM不丢失列
    查看>>
    pandas交换两列
    查看>>
    pandas介绍-ChatGPT4o作答
    查看>>
    pandas去除Nan值
    查看>>
    pandas实战:电商平台用户分析
    查看>>
    Pandas库常用方法、函数集合
    查看>>
    pandas打乱数据的顺序
    查看>>