Spark 串流 + Kinesis 整合
Amazon Kinesis 是一個完全受控的服務,用於大規模串流資料的即時處理。Kinesis 接收器使用 Amazon 在 Amazon 軟體授權 (ASL) 下提供的 Kinesis Client Library (KCL) 建立輸入 DStream。KCL 建立在 Apache 2.0 授權的 AWS Java SDK 之上,並透過工作人員、檢查點和分片租約的概念,提供負載平衡、容錯和檢查點。以下說明如何設定 Spark Streaming 以接收來自 Kinesis 的資料。
設定 Kinesis
Kinesis 串流可以設定在下列 指南 中的有效 Kinesis 端點之一,每個分片 1 個或更多。
設定 Spark Streaming 應用程式
-
連結:對於使用 SBT/Maven 專案定義的 Scala/Java 應用程式,將您的串流應用程式連結到下列人工製品(進一步資訊請參閱主要程式設計指南中的 連結區段)。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.12 version = 3.5.1
對於 Python 應用程式,您必須在部署應用程式時新增此上層函式庫及其相依性。請參閱下方的 部署 子區段。請注意,透過連結到此函式庫,您會將 ASL 授權的程式碼包含在您的應用程式中。
-
程式設計:在串流應用程式程式碼中,匯入
KinesisInputDStream
並建立輸入 DStream 的位元組陣列,如下所示from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
請參閱 API 文件 和 範例。請參閱 執行範例 子區段,取得執行範例的說明。
- CloudWatch 指標層級和維度。請參閱 AWS 文件關於監控 KCL 的詳細資訊。預設為 MetricsLevel.DETAILED
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build();
您也可以提供下列設定。目前僅支援 Scala 和 Java。
- 「訊息處理器函式」會取得 Kinesis
記錄
並傳回一般物件T
,以防您想使用記錄
中包含的其他資料,例如分割鍵。
import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([message handler])
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import scala.collection.JavaConverters; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet()) .buildWithMessageHandler([message handler]);
-
串流內容
:包含 Kinesis 使用的應用程式名稱的串流內容,用於將此 Kinesis 應用程式連結到 Kinesis 串流 [Kinesis 應用程式名稱]
:將用於在 DynamoDB 表中檢查點 Kinesis 順序號碼的應用程式名稱。- 應用程式名稱必須針對特定帳戶和區域是唯一的。
- 如果表格存在,但檢查點資訊不正確(針對不同的串流或舊的過期順序號碼),則可能會出現暫時錯誤。
-
[Kinesis 串流名稱]
:此串流應用程式將從中擷取資料的 Kinesis 串流。 -
[端點 URL]
:可以在 這裡 找到有效的 Kinesis 端點 URL。 -
[區域名稱]
:可以在 這裡 找到有效的 Kinesis 區域名稱。 -
[檢查點間隔]
:Kinesis Client Library 儲存其在串流中的位置的間隔(例如,Duration(2000) = 2 秒)。對於初學者,請將其設定為與串流應用程式的批次間隔相同。 -
[初始位置]
:可以是KinesisInitialPositions.TrimHorizon
或KinesisInitialPositions.Latest
或KinesisInitialPositions.AtTimestamp
(請參閱Kinesis 檢查點
區段和Amazon Kinesis API 文件
以取得更多詳細資料)。 [訊息處理器]
:取得 Kinesis記錄
並輸出一般T
的函式。
在其他版本的 API 中,您也可以直接指定 AWS 存取金鑰和秘密金鑰。
-
部署:與任何 Spark 應用程式一樣,
spark-submit
用於啟動您的應用程式。但是,Scala/Java 應用程式和 Python 應用程式的詳細資料略有不同。對於 Scala 和 Java 應用程式,如果您使用 SBT 或 Maven 進行專案管理,請將封裝
spark-streaming-kinesis-asl_2.12
及其相依性封裝到應用程式 JAR 中。請務必將spark-core_2.12
和spark-streaming_2.12
標記為provided
相依性,因為這些相依性已存在於 Spark 安裝中。然後使用spark-submit
啟動您的應用程式(請參閱主要程式設計指南中的 部署區段)。對於缺乏 SBT/Maven 專案管理的 Python 應用程式,
spark-streaming-kinesis-asl_2.12
及其相依性可以使用--packages
直接新增到spark-submit
(請參閱 應用程式提交指南)。也就是說,./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 ...
或者,您也可以從 Maven 儲存庫 下載 Maven 製成品
spark-streaming-kinesis-asl-assembly
的 JAR,並使用--jars
將其新增到spark-submit
。執行階段的注意事項
-
Kinesis 資料處理會依分區排序,且每個訊息至少會處理一次。
-
多個應用程式可以從同一個 Kinesis 串流讀取。Kinesis 會在 DynamoDB 中維護應用程式特定的分片和檢查點資訊。
-
一次只會有一個輸入 DStream 處理單一 Kinesis 串流分片。
-
單一 Kinesis 輸入 DStream 可以透過建立多個 KinesisRecordProcessor 執行緒,從 Kinesis 串流的多個分片讀取。
-
在個別程序/執行個體中執行的多個輸入 DStream 可以從 Kinesis 串流讀取。
-
您永遠不需要比 Kinesis 串流分片數量更多的 Kinesis 輸入 DStream,因為每個輸入 DStream 都會建立至少一個處理單一分片的 KinesisRecordProcessor 執行緒。
-
透過新增/移除 Kinesis 輸入 DStream(在單一程序中或跨多個程序/執行個體)來達成橫向擴充,最多可達前一點所述的 Kinesis 串流分片總數。
-
Kinesis 輸入 DStream 會平衡所有 DStream 之間的負載,甚至跨程序/執行個體。
-
由於負載變更,Kinesis 輸入 DStream 會在分片重新分組事件(合併和分割)期間平衡負載。
-
建議您在可能的情況下透過過度配置來避免分片重新分組抖動,這是一種最佳實務。
-
每個 Kinesis 輸入 DStream 都會維護自己的檢查點資訊。有關更多詳細資訊,請參閱 Kinesis 檢查點區段。
-
Kinesis 串流分片數目與輸入 DStream 處理期間在 Spark 群集建立的 RDD 分割/分片數目之間沒有關聯性。這兩個是獨立的分割配置。
-
執行範例
若要執行範例,
-
從 下載網站 下載 Spark 二進位檔。
-
在 AWS 中設定 Kinesis 串流(請參閱前一區段)。請注意 Kinesis 串流名稱和對應於建立串流區域的端點網址。
-
使用您的 AWS 認證設定環境變數
AWS_ACCESS_KEY_ID
和AWS_SECRET_ACCESS_KEY
。 -
在 Spark 根目錄中,以以下方式執行範例
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
這將等待從 Kinesis 串流接收資料。
-
若要在另一個終端機中產生隨機字串資料以放入 Kinesis 串流,請執行相關的 Kinesis 資料產生器。
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
這會將每秒 1000 行的 10 個隨機數字推送到 Kinesis 串流。然後,執行中的範例應接收並處理這些資料。
記錄去聚合
當使用 Kinesis 產生器程式庫 (KPL) 產生資料時,可能會聚合訊息以節省成本。Spark 串流將在使用期間自動去聚合記錄。
Kinesis 檢查點
-
每個 Kinesis 輸入 DStream 會定期將串流的目前位置儲存在後端 DynamoDB 表格中。這讓系統可以從故障中復原,並從 DStream 中斷的地方繼續處理。
-
過於頻繁地進行檢查點會造成 AWS 檢查點儲存層負載過重,並可能導致 AWS 限制。提供的範例會使用隨機後退重試策略來處理這種限制。
-
如果輸入 DStream 啟動時沒有 Kinesis 檢查點資訊,它將從可用的最舊記錄 (
KinesisInitialPositions.TrimHorizon
) 或最新提示 (KinesisInitialPositions.Latest
) 開始,或(Python 除外)從由提供的 UTC 時間戳記 (KinesisInitialPositions.AtTimestamp(Date timestamp)
) 表示的位置開始。這是可設定的。KinesisInitialPositions.Latest
如果在沒有輸入 DStreams 執行(且沒有儲存檢查點資訊)的情況下將資料新增到串流,可能會導致遺漏記錄。KinesisInitialPositions.TrimHorizon
可能會導致重複處理記錄,其影響取決於檢查點頻率和處理冪等性。
Kinesis 重試設定
spark.streaming.kinesis.retry.waitTime
: 以持續時間字串表示 Kinesis 重試之間的等待時間。在從 Amazon Kinesis 讀取時,使用者可能會遇到ProvisionedThroughputExceededException
,原因是每秒消耗超過 5 個交易,或超過每秒 2 MiB 的最大讀取速率。當擷取失敗時,可以調整此設定來增加擷取之間的休眠時間,以減少這些例外情況。預設為「100ms」。spark.streaming.kinesis.retry.maxAttempts
: Kinesis 擷取的最大重試次數。此設定也可在上述情況中用於處理 KinesisProvisionedThroughputExceededException
。可以增加此設定以增加 Kinesis 讀取的重試次數。預設為 3。