Spark 串流 + Kinesis 整合

Amazon Kinesis 是一個完全受控的服務,用於大規模串流資料的即時處理。Kinesis 接收器使用 Amazon 在 Amazon 軟體授權 (ASL) 下提供的 Kinesis Client Library (KCL) 建立輸入 DStream。KCL 建立在 Apache 2.0 授權的 AWS Java SDK 之上,並透過工作人員、檢查點和分片租約的概念,提供負載平衡、容錯和檢查點。以下說明如何設定 Spark Streaming 以接收來自 Kinesis 的資料。

設定 Kinesis

Kinesis 串流可以設定在下列 指南 中的有效 Kinesis 端點之一,每個分片 1 個或更多。

設定 Spark Streaming 應用程式

  1. 連結:對於使用 SBT/Maven 專案定義的 Scala/Java 應用程式,將您的串流應用程式連結到下列人工製品(進一步資訊請參閱主要程式設計指南中的 連結區段)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.12
     version = 3.5.1
    

    對於 Python 應用程式,您必須在部署應用程式時新增此上層函式庫及其相依性。請參閱下方的 部署 子區段。請注意,透過連結到此函式庫,您會將 ASL 授權的程式碼包含在您的應用程式中。

  2. 程式設計:在串流應用程式程式碼中,匯入 KinesisInputDStream 並建立輸入 DStream 的位元組陣列,如下所示

         from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    
         kinesisStream = KinesisUtils.createStream(
             streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
             [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
    

    請參閱 API 文件範例。請參閱 執行範例 子區段,取得執行範例的說明。

         import org.apache.spark.storage.StorageLevel
         import org.apache.spark.streaming.kinesis.KinesisInputDStream
         import org.apache.spark.streaming.{Seconds, StreamingContext}
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions
    
         val kinesisStream = KinesisInputDStream.builder
             .streamingContext(streamingContext)
             .endpointUrl([endpoint URL])
             .regionName([region name])
             .streamName([streamName])
             .initialPosition([initial position])
             .checkpointAppName([Kinesis app name])
             .checkpointInterval([checkpoint interval])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build()
    

    請參閱 API 文件範例。請參閱 執行範例 子區段,取得執行範例的說明。

         import org.apache.spark.storage.StorageLevel;
         import org.apache.spark.streaming.kinesis.KinesisInputDStream;
         import org.apache.spark.streaming.Seconds;
         import org.apache.spark.streaming.StreamingContext;
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
    
         KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
             .streamingContext(streamingContext)
             .endpointUrl([endpoint URL])
             .regionName([region name])
             .streamName([streamName])
             .initialPosition([initial position])
             .checkpointAppName([Kinesis app name])
             .checkpointInterval([checkpoint interval])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build();
    

    請參閱 API 文件範例。請參閱 執行範例 子區段,取得執行範例的說明。

    您也可以提供下列設定。目前僅支援 Scala 和 Java。

    • 「訊息處理器函式」會取得 Kinesis 記錄並傳回一般物件 T,以防您想使用 記錄中包含的其他資料,例如分割鍵。
             import collection.JavaConverters._
             import org.apache.spark.storage.StorageLevel
             import org.apache.spark.streaming.kinesis.KinesisInputDStream
             import org.apache.spark.streaming.{Seconds, StreamingContext}
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
    
             val kinesisStream = KinesisInputDStream.builder
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
                 .buildWithMessageHandler([message handler])
    
             import org.apache.spark.storage.StorageLevel;
             import org.apache.spark.streaming.kinesis.KinesisInputDStream;
             import org.apache.spark.streaming.Seconds;
             import org.apache.spark.streaming.StreamingContext;
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
             import scala.collection.JavaConverters;
    
             KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
                 .buildWithMessageHandler([message handler]);
    
    • 串流內容:包含 Kinesis 使用的應用程式名稱的串流內容,用於將此 Kinesis 應用程式連結到 Kinesis 串流

    • [Kinesis 應用程式名稱]:將用於在 DynamoDB 表中檢查點 Kinesis 順序號碼的應用程式名稱。
      • 應用程式名稱必須針對特定帳戶和區域是唯一的。
      • 如果表格存在,但檢查點資訊不正確(針對不同的串流或舊的過期順序號碼),則可能會出現暫時錯誤。
    • [Kinesis 串流名稱]:此串流應用程式將從中擷取資料的 Kinesis 串流。

    • [端點 URL]:可以在 這裡 找到有效的 Kinesis 端點 URL。

    • [區域名稱]:可以在 這裡 找到有效的 Kinesis 區域名稱。

    • [檢查點間隔]:Kinesis Client Library 儲存其在串流中的位置的間隔(例如,Duration(2000) = 2 秒)。對於初學者,請將其設定為與串流應用程式的批次間隔相同。

    • [初始位置]:可以是 KinesisInitialPositions.TrimHorizonKinesisInitialPositions.LatestKinesisInitialPositions.AtTimestamp(請參閱 Kinesis 檢查點 區段和 Amazon Kinesis API 文件 以取得更多詳細資料)。

    • [訊息處理器]:取得 Kinesis 記錄並輸出一般 T 的函式。

    在其他版本的 API 中,您也可以直接指定 AWS 存取金鑰和秘密金鑰。

  3. 部署:與任何 Spark 應用程式一樣,spark-submit 用於啟動您的應用程式。但是,Scala/Java 應用程式和 Python 應用程式的詳細資料略有不同。

    對於 Scala 和 Java 應用程式,如果您使用 SBT 或 Maven 進行專案管理,請將封裝 spark-streaming-kinesis-asl_2.12 及其相依性封裝到應用程式 JAR 中。請務必將 spark-core_2.12spark-streaming_2.12 標記為 provided 相依性,因為這些相依性已存在於 Spark 安裝中。然後使用 spark-submit 啟動您的應用程式(請參閱主要程式設計指南中的 部署區段)。

    對於缺乏 SBT/Maven 專案管理的 Python 應用程式,spark-streaming-kinesis-asl_2.12 及其相依性可以使用 --packages 直接新增到 spark-submit(請參閱 應用程式提交指南)。也就是說,

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 ...
    

    或者,您也可以從 Maven 儲存庫 下載 Maven 製成品 spark-streaming-kinesis-asl-assembly 的 JAR,並使用 --jars 將其新增到 spark-submit

    Spark Streaming Kinesis Architecture

    執行階段的注意事項

    • Kinesis 資料處理會依分區排序,且每個訊息至少會處理一次。

    • 多個應用程式可以從同一個 Kinesis 串流讀取。Kinesis 會在 DynamoDB 中維護應用程式特定的分片和檢查點資訊。

    • 一次只會有一個輸入 DStream 處理單一 Kinesis 串流分片。

    • 單一 Kinesis 輸入 DStream 可以透過建立多個 KinesisRecordProcessor 執行緒,從 Kinesis 串流的多個分片讀取。

    • 在個別程序/執行個體中執行的多個輸入 DStream 可以從 Kinesis 串流讀取。

    • 您永遠不需要比 Kinesis 串流分片數量更多的 Kinesis 輸入 DStream,因為每個輸入 DStream 都會建立至少一個處理單一分片的 KinesisRecordProcessor 執行緒。

    • 透過新增/移除 Kinesis 輸入 DStream(在單一程序中或跨多個程序/執行個體)來達成橫向擴充,最多可達前一點所述的 Kinesis 串流分片總數。

    • Kinesis 輸入 DStream 會平衡所有 DStream 之間的負載,甚至跨程序/執行個體。

    • 由於負載變更,Kinesis 輸入 DStream 會在分片重新分組事件(合併和分割)期間平衡負載。

    • 建議您在可能的情況下透過過度配置來避免分片重新分組抖動,這是一種最佳實務。

    • 每個 Kinesis 輸入 DStream 都會維護自己的檢查點資訊。有關更多詳細資訊,請參閱 Kinesis 檢查點區段。

    • Kinesis 串流分片數目與輸入 DStream 處理期間在 Spark 群集建立的 RDD 分割/分片數目之間沒有關聯性。這兩個是獨立的分割配置。

執行範例

若要執行範例,

記錄去聚合

當使用 Kinesis 產生器程式庫 (KPL) 產生資料時,可能會聚合訊息以節省成本。Spark 串流將在使用期間自動去聚合記錄。

Kinesis 檢查點

Kinesis 重試設定