本文共 2246 字,大约阅读时间需要 7 分钟。
在默认配置下,Spark Streaming通过接收器(或者Direct方式)以生产者生成数据的速率接收数据。当批处理时间(batch processing time)超过批处理间隔(batch interval)的时候,即每个批次数据处理的时间都比Spark Streaming批处理间隔时间长;随着越来越多的数据被接收,而数据的处理速度无法跟上,系统可能会出现数据堆积,进一步可能导致Executor端内存溢出(OOM)的问题从而引发失败。
在Spark 1.5版本之前,为了解决这个问题,对于基于接收器的数据接收器,我们可以通过配置spark.streaming.receiver.maxRate参数来限制每个接收器每秒钟最大可以接收的记录数量;对于Direct Approach的数据接收,我们可以通过配置spark.streaming.kafka.maxRatePerPartition参数来限制每次作业中每个Kafka分区最少读取的记录条数。通过这种方式限制接收速率,能够合理配合处理能力。然而,这种方法还有几个问题:
那么是否有办法不需要人工干预让Spark Streaming系统自动处理这些问题呢?是的!Spark 1.5版本引入了反压(Back Pressure)机制,其通过动态收集系统的一些数据来自动适配集群的数据处理能力。
之前的Spark Streaming架构
在Spark 1.5版本之前,Spark Streaming的架构如下:
Spark 1.5及以后的架构
为了实现对数据传输速率的自动调节,在原有架构上新增了一个名为RateController的组件,它集成在StreamingListener中,负责监控所有作业的onBatchCompleted事件。基于processingDelay,samplingDelay、当前批次记录条数及处理完成事件,RateController可以估算出一个速率。这个速率会被用来更新流的每秒能处理的最大记录数。速率估算器(RateEstimator)有多种实现,但目前Spark 2.2只支持基于PID的速率估算器。
具体来说,RateController内会存下计算好的最大速率,这个速率在处理完onBatchCompleted事件后会被推送到ReceiverSupervisorImpl。这样接收器就知道每秒能接收多少条数据。需要注意的是,当用户配置了spark.streaming.kafka.maxRatePerPartition时,最终接收的数据量取决于三个值中的最小值,即每个接收器或每个Kafka分区每秒能处理的数据不会超过spark.streaming.receiver.maxRate或spark.streaming.kafka.maxRatePerPartition的值。
具体流程可以参考下图所示(图中描述了详细的数据流和反压机制的调节过程)。
在Spark中启用反压机制非常简单,只需将spark.streaming.backpressure.enabled设置为true即可,默认值为false。
在使用反压机制时,还需要注意一些相关参数:
通过以上设置,可以有效地利用反压机制来动态调整数据处理速率,最大限度地发挥集群的处理能力,同时避免因为消息堆积而引发的资源浪费或系统故障。
转载地址:http://tfooz.baihongyu.com/