国产精品色哟哟_男男激情3p互攻_色偷偷影院_和女同学厕所做了四次

當(dāng)前位置: 首頁 / 技術(shù)干貨 / 正文
Spark Streaming 反壓機制(Back Pressure)

2022-07-29

速率 spark streaming 處理

Spark Streaming 反壓機制(Back Pressure)

Spark Streaming 反壓機制是1.5版本推出的特性,用來解決處理速度比攝入速度慢的情況,簡單來講就是做流量控制。當(dāng)批處理時間(Batch Processing Time)大于批次間隔(Batch Interval,即 BatchDuration)時,說明處理數(shù)據(jù)的速度小于數(shù)據(jù)攝入的速度,持續(xù)時間過長或源頭數(shù)據(jù)暴增,容易造成數(shù)據(jù)在內(nèi)存中堆積,最終導(dǎo)致Executor OOM。反壓就是來解決這個問題的。

spark streaming的消費數(shù)據(jù)源方式有兩種:

若是基于Receiver的數(shù)據(jù)源,可以通過設(shè)置spark.streaming.receiver.maxRate來控制最大輸入速率;若是基于Direct的數(shù)據(jù)源(如Kafka Direct Stream),則可以通過設(shè)置spark.streaming.kafka.maxRatePerPartition來控制最大輸入速率。

當(dāng)然,在事先經(jīng)過壓測,且流量高峰不會超過預(yù)期的情況下,設(shè)置這些參數(shù)一般沒什么問題。但最大值,不代表是最優(yōu)值,最好還能根據(jù)每個批次處理情況來動態(tài)預(yù)估下個批次最優(yōu)速率。

在Spark 1.5.0以上,就可通過背壓機制來實現(xiàn)。開啟反壓機制,即設(shè)置spark.streaming.backpressure.enabled為true,Spark Streaming會自動根據(jù)處理能力來調(diào)整輸入速率,從而在流量高峰時仍能保證最大的吞吐和性能

Spark Streaming的反壓機制中,有以下幾個重要的組件:

RateController 組件是 JobScheduler 的監(jiān)聽器,主要監(jiān)聽集群所有作業(yè)的提交、運行、完成情況,并從 BatchInfo 實例中獲取以下信息,交給速率估算器(RateEstimator)做速率的估算。
  1. 當(dāng)前批次任務(wù)處理完成的時間戳 (processingEndTime)
  2. 該批次從第一個 job 到最后一個 job 的實際處理時長 (processingDelay)
  3. 該批次的調(diào)度時延,即從被提交到 JobScheduler 到第一個 job 開始處理的時長(schedulingDelay)
  4. 該批次輸入數(shù)據(jù)的總條數(shù)(numRecords)

 

Spark 2.x 只支持基于 PID 的速率估算器,這里只討論這種實現(xiàn)。基于 PID 的速率估算器簡單地說就是它把收集到的數(shù)據(jù)(當(dāng)前批次速率)和一個設(shè)定值(上一批次速率)進行比較,然后用它們之間的差計算新的輸入值,估算出一個合適的用于下一批次的流量閾值。這里估算出來的值就是流量的閾值,用于更新每秒能夠處理的最大記錄數(shù)以上這兩個組件都是在Driver端用于更新最大速度的,而RateLimiter是用于接收到Driver的更新通知之后更新Executor的最大處理速率的組件。RateLimiter是一個抽象類,它并不是Spark本身實現(xiàn)的,而是借助了第三方Google的GuavaRateLimiter來產(chǎn)生的。它實質(zhì)上是一個限流器,也可以叫做令牌,如果Executor中task每秒計算的速度大于該值則阻塞,如果小于該值則通過,將流數(shù)據(jù)加入緩存中進行計算。

* 反壓機制真正起作用時需要至少處理一個批:由于反壓機制需要根據(jù)當(dāng)前批的速率,預(yù)估新批的速率,所以反壓機制真正起作用前,應(yīng)至少保證處理一個批。

 

* 如何保證反壓機制真正起作用前應(yīng)用不會崩潰:要保證反壓機制真正起作用前應(yīng)用不會崩潰,需要控制每個批次最大攝入速率。若為Direct Stream,如Kafka Direct Stream,則可以通過spark.streaming.kafka.maxRatePerPartition參數(shù)來控制。此參數(shù)代表了 每秒每個分區(qū)最大攝入的數(shù)據(jù)條數(shù)。假設(shè)BatchDuration為10秒,spark.streaming.kafka.maxRatePerPartition為12條,kafka topic 分區(qū)數(shù)為3個,則一個批(Batch)最大讀取的數(shù)據(jù)條數(shù)為360條(3*12*10=360)。同時,需要注意,該參數(shù)也代表了整個應(yīng)用生命周期中的最大速率,即使是背壓調(diào)整的最大值也不會超過該參數(shù)。

反壓相關(guān)的參數(shù)

參數(shù)名稱

默認(rèn)值

說明

spark.streaming.backpressure.enabled

false

是否啟用反壓機制

spark.streaming.backpressure.initialRate

初始最大接收速率。只適用于Receiver Stream,不適用于Direct Stream。

spark.streaming.backpressure.rateEstimator

pid

速率控制器,Spark 默認(rèn)只支持此控制器,可自定義。

spark.streaming.backpressure.pid.proportional

1.0

只能為非負(fù)值。當(dāng)前速率與最后一批速率之間的差值對總控制信號貢獻的權(quán)重。用默認(rèn)值即可。

spark.streaming.backpressure.pid.integral

0.2

只能為非負(fù)值。比例誤差累積對總控制信號貢獻的權(quán)重。用默認(rèn)值即可

spark.streaming.backpressure.pid.derived

0

只能為非負(fù)值。比例誤差變化對總控制信號貢獻的權(quán)重。用默認(rèn)值即可

spark.streaming.backpressure.pid.minRate

100

只能為正數(shù),最小速率

 

 

好程序員公眾號

  • · 剖析行業(yè)發(fā)展趨勢
  • · 匯聚企業(yè)項目源碼

好程序員開班動態(tài)

More+
  • HTML5大前端 <高端班>

    開班時間:2021-04-12(深圳)

    開班盛況

    開班時間:2021-05-17(北京)

    開班盛況
  • 大數(shù)據(jù)+人工智能 <高端班>

    開班時間:2021-03-22(杭州)

    開班盛況

    開班時間:2021-04-26(北京)

    開班盛況
  • JavaEE分布式開發(fā) <高端班>

    開班時間:2021-05-10(北京)

    開班盛況

    開班時間:2021-02-22(北京)

    開班盛況
  • Python人工智能+數(shù)據(jù)分析 <高端班>

    開班時間:2021-07-12(北京)

    預(yù)約報名

    開班時間:2020-09-21(上海)

    開班盛況
  • 云計算開發(fā) <高端班>

    開班時間:2021-07-12(北京)

    預(yù)約報名

    開班時間:2019-07-22(北京)

    開班盛況
IT培訓(xùn)IT培訓(xùn)
在線咨詢
IT培訓(xùn)IT培訓(xùn)
試聽
IT培訓(xùn)IT培訓(xùn)
入學(xué)教程
IT培訓(xùn)IT培訓(xùn)
立即報名
IT培訓(xùn)

Copyright 2011-2023 北京千鋒互聯(lián)科技有限公司 .All Right 京ICP備12003911號-5 京公網(wǎng)安備 11010802035720號