Spark 串流處理程式設計指南

注意

Spark Streaming 是 Spark 串流引擎的前一代。Spark Streaming 已不再更新,而且是舊專案。Spark 中有一個更新且更易於使用的串流引擎,稱為結構化串流。您應針對串流應用程式和管線使用 Spark 結構化串流。請參閱 結構化串流程式設計指南

概觀

Spark Streaming 是核心 Spark API 的延伸,可針對即時資料串流進行可擴充、高通量、容錯的串流處理。資料可以從許多來源擷取,例如 Kafka、Kinesis 或 TCP socket,並可以使用以高階函數表示的複雜演算法進行處理,例如 mapreducejoinwindow。最後,處理後的資料可以推送到檔案系統、資料庫和即時儀表板。事實上,您可以對資料串流套用 Spark 的 機器學習圖形處理 演算法。

Spark Streaming

在內部,其運作方式如下。Spark Streaming 接收即時輸入資料串流,並將資料分成批次,然後由 Spark 引擎處理這些批次,以批次方式產生最終的結果串流。

Spark Streaming

Spark Streaming 提供一個稱為離散化串流DStream的高階抽象,用於表示連續的資料串流。DStream 可以從來源(例如 Kafka 和 Kinesis)的輸入資料串流建立,也可以透過對其他 DStream 套用高階運算建立。在內部,DStream 表示為 RDD 的序列。

本指南說明如何開始使用 DStream 編寫 Spark Streaming 程式。您可以在 Scala、Java 或 Python(在 Spark 1.2 中引入)中編寫 Spark Streaming 程式,本指南中會介紹所有這些語言。您會在本指南中找到多個標籤,讓您可以在不同語言的程式碼片段之間進行選擇。

注意:有少數 API 在 Python 中不同或不可用。在整個指南中,您會看到標籤 Python API,用來強調這些差異。


快速範例

在我們深入探討如何撰寫自己的 Spark Streaming 程式之前,讓我們快速了解一下簡單的 Spark Streaming 程式是什麼樣子。假設我們要計算從監聽 TCP socket 的資料伺服器接收到的文字資料中的字數。您需要執行的步驟如下。

首先,我們匯入 StreamingContext,這是所有串流功能的主要進入點。我們使用兩個執行緒建立一個本機 StreamingContext,批次間隔為 1 秒。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

使用這個內容,我們可以建立一個 DStream,代表來自 TCP 來源的串流資料,指定為 hostname(例如 localhost)和 port(例如 9999)。

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

這個 lines DStream 代表將從資料伺服器接收到的資料串流。此 DStream 中的每筆記錄都是一行程文字。接下來,我們想要將這些行按空白字元分割成單字。

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

flatMap 是一個一對多 DStream 作業,透過從來源 DStream 中的每筆記錄產生多筆新記錄來建立新的 DStream。在此情況下,每行將分割成多個單字,單字串流則表示為 words DStream。接下來,我們想要計算這些單字。

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

words DStream 進一步對應(一對一轉換)到 (word, 1) 成對的 DStream,然後縮減以取得每批資料中單字的頻率。最後,wordCounts.pprint() 會列印每秒產生的一些計數。

請注意,當執行這些行時,Spark 串流只會設定它在啟動時執行的運算,而尚未開始實際處理。要在設定所有轉換後開始處理,我們最後呼叫

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

完整的程式碼可以在 Spark 串流範例 NetworkWordCount 中找到。

首先,我們匯入 Spark 串流類別的名稱和一些隱式轉換,從 StreamingContext 到我們的環境,以便將有用的方法新增到我們需要的其他類別(例如 DStream)。StreamingContext 是所有串流功能的主要進入點。我們使用兩個執行緒建立一個本機 StreamingContext,批次間隔為 1 秒。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

使用這個內容,我們可以建立一個 DStream,代表來自 TCP 來源的串流資料,指定為 hostname(例如 localhost)和 port(例如 9999)。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

這個 lines DStream 代表將從資料伺服器接收到的資料串流。此 DStream 中的每筆記錄都是一行程文字。接下來,我們想要將這些行按空白字元分割成單字。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap 是一個一對多 DStream 作業,透過從來源 DStream 中的每筆記錄產生多筆新記錄來建立新的 DStream。在此情況下,每行將分割成多個單字,單字串流則表示為 words DStream。接下來,我們想要計算這些單字。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

DStream words 進一步對應(一對一轉換)到 DStream 的 (word, 1) 對,然後縮減以取得每一批次資料中字詞的頻率。最後,wordCounts.print() 會列印每秒產生的一些計數。

請注意,當執行這些行時,Spark 串流只會設定它在啟動時執行的運算,而尚未開始實際處理。要在設定所有轉換後開始處理,我們最後呼叫

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完整的程式碼可以在 Spark 串流範例中找到 NetworkWordCount

首先,我們建立一個 JavaStreamingContext 物件,這是所有串流功能的主要進入點。我們建立一個具有兩個執行緒和 1 秒批次區間的本地串流內容。

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

使用這個內容,我們可以建立一個 DStream,代表來自 TCP 來源的串流資料,指定為 hostname(例如 localhost)和 port(例如 9999)。

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

這個 lines DStream 代表從資料伺服器接收到的資料串流。這個串流中的每一筆記錄都是一行程文字。然後,我們想要將這些行以空格分隔成字詞。

// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

flatMap 是 DStream 操作,透過從來源 DStream 中的每筆記錄產生多筆新記錄來建立新的 DStream。在這個情況下,每一行都會分隔成多個字詞,而字詞串流會表示為 words DStream。請注意,我們使用 FlatMapFunction 物件定義轉換。我們會在過程中發現,Java API 中有許多此類方便的類別,有助於定義 DStream 轉換。

接下來,我們想要計算這些字詞。

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

DStream words 進一步對應(一對一轉換)到 DStream 的 (word, 1) 對,使用 PairFunction 物件。然後,縮減以取得每一批次資料中字詞的頻率,使用 Function2 物件。最後,wordCounts.print() 會列印每秒產生的一些計數。

請注意,當執行這些行時,Spark 串流只會設定它在啟動後執行的運算,而尚未開始實際處理。要在設定所有轉換後啟動處理,我們最後呼叫 start 方法。

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate

完整的程式碼可以在 Spark 串流範例中找到 JavaNetworkWordCount

如果您已經下載建立Spark,您可以使用以下方式執行此範例。您首先需要使用以下方式執行 Netcat(在多數類 Unix 系統中找到的小型工具程式)作為資料伺服器

$ nc -lk 9999

然後,在不同的終端機中,您可以使用以下方式啟動範例

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999

然後,在執行 netcat 伺服器的終端機中輸入的任何行都會被計數,並每秒列印在螢幕上。它看起來會像以下內容。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world



...
# TERMINAL 2: RUNNING network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING JavaNetworkWordCount

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...


基本概念

接下來,我們將超越簡單的範例,並闡述 Spark Streaming 的基礎知識。

連結

與 Spark 類似,Spark Streaming 可透過 Maven Central 取得。若要撰寫您自己的 Spark Streaming 程式,您必須將以下相依性新增到您的 SBT 或 Maven 專案。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.5.1</version>
    <scope>provided</scope>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.5.1" % "provided"

對於從 Spark Streaming 核心 API 中不存在的來源(例如 Kafka 和 Kinesis)擷取資料,您必須將對應的成品 spark-streaming-xyz_2.12 新增到相依性。例如,一些常見的成品如下。

來源成品
Kafka spark-streaming-kafka-0-10_2.12
Kinesis
spark-streaming-kinesis-asl_2.12 [Amazon 軟體授權]

如需最新清單,請參閱Maven 存放庫,以取得支援來源和成品的完整清單。


初始化 StreamingContext

若要初始化 Spark Streaming 程式,必須建立一個 StreamingContext 物件,這是所有 Spark Streaming 功能的主要進入點。

可以從SparkContext 物件建立StreamingContext 物件。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)

appName 參數是應用程式的名稱,會顯示在叢集 UI 上。 masterSpark、Mesos 或 YARN 叢集 URL,或是在本機模式中執行的特殊 “local[*]” 字串。實際上,在叢集上執行時,您不會希望在程式中對 master 硬編碼,而是使用 spark-submit 啟動應用程式,並在那裡接收它。但是,對於本機測試和單元測試,您可以傳遞 “local[*]” 以在處理程序中執行 Spark Streaming(偵測本機系統中的核心數目)。

批次間隔必須根據應用程式的延遲需求和可用的叢集資源設定。請參閱效能調整部分,以取得更多詳細資料。

可以從 StreamingContext 物件建立 SparkConf 物件。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName 參數是應用程式的名稱,會顯示在叢集 UI 上。masterSpark、Mesos、Kubernetes 或 YARN 叢集 URL,或是在本地模式下執行的特殊字串 “local[*]”。實際上,在叢集上執行時,您不會希望在程式中硬編碼 master,而會使用 spark-submit 啟動應用程式,並在那裡接收它。不過,對於本地測試和單元測試,您可以傳遞 “local[*]” 來在處理程序中執行 Spark Streaming。請注意,這會在內部建立一個 SparkContext (所有 Spark 功能的起點),可以透過 ssc.sparkContext 存取。

批次間隔必須根據應用程式的延遲需求和可用的叢集資源設定。請參閱效能調整部分,以取得更多詳細資料。

也可以從現有的 SparkContext 物件建立 StreamingContext 物件。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

可以從 JavaStreamingContext 物件建立 SparkConf 物件。

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

appName 參數是應用程式的名稱,會顯示在叢集 UI 上。masterSpark、Mesos 或 YARN 叢集 URL,或是在本地模式下執行的特殊字串 “local[*]”。實際上,在叢集上執行時,您不會希望在程式中硬編碼 master,而會使用 spark-submit 啟動應用程式,並在那裡接收它。不過,對於本地測試和單元測試,您可以傳遞 “local[*]” 來在處理程序中執行 Spark Streaming。請注意,這會在內部建立一個 JavaSparkContext (所有 Spark 功能的起點),可以透過 ssc.sparkContext 存取。

批次間隔必須根據應用程式的延遲需求和可用的叢集資源設定。請參閱效能調整部分,以取得更多詳細資料。

也可以從現有的 JavaSparkContext 建立 JavaStreamingContext 物件。

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

定義內容後,您必須執行下列動作。

  1. 透過建立輸入 DStreams 來定義輸入來源。
  2. 透過將轉換和輸出操作套用至 DStreams 來定義串流運算。
  3. 使用 streamingContext.start() 開始接收資料並處理資料。
  4. 使用 streamingContext.awaitTermination() 等待處理停止(手動或因任何錯誤)。
  5. 可以使用 streamingContext.stop() 手動停止處理。
注意事項

離散化串流 (DStreams)

離散化串流DStream是 Spark Streaming 提供的基本抽象。它代表連續的資料串流,可能是從來源接收的輸入資料串流,或透過轉換輸入串流而產生的已處理資料串流。在內部,DStream 由連續的 RDD 系列表示,而 RDD 是 Spark 的不可變分散資料集抽象(有關更多詳細資訊,請參閱Spark 程式設計指南)。DStream 中的每個 RDD 都包含來自特定區間的資料,如下圖所示。

Spark Streaming

套用至 DStream 的任何操作都會轉換為對底層 RDD 的操作。例如,在將一串列轉換為字詞的先前範例中,flatMap 操作會套用至 lines DStream 中的每個 RDD,以產生 words DStream 的 RDD。這在下圖中顯示。

Spark Streaming

這些底層 RDD 轉換是由 Spark 引擎計算的。DStream 操作會隱藏大部分這些詳細資訊,並提供較高層級的 API 供開發人員使用以方便操作。這些操作會在後續章節中詳細說明。


輸入 DStreams 和接收器

輸入 DStreams 是表示從串流來源接收的輸入資料串流的 DStreams。在快速範例中,lines 是輸入 DStream,因為它表示從 netcat 伺服器接收的資料串流。每個輸入 DStream(除了檔案串流,會在本節稍後說明)都會與接收器Scala 文件Java 文件)物件關聯,該物件會從來源接收資料,並將其儲存在 Spark 的記憶體中以進行處理。

Spark Streaming 提供兩類內建串流來源。

我們將在稍後的本區段中討論每個類別中的一些來源。

請注意,如果您想要在串流應用程式中並行接收多個資料串流,您可以建立多個輸入 DStreams(在 效能調整 區段中進一步討論)。這將建立多個接收器,它們將同時接收多個資料串流。但請注意,Spark 工作器/執行器是長時間執行的任務,因此它會佔用分配給 Spark 串流應用程式的其中一個核心。因此,重要的是要記住,Spark 串流應用程式需要分配足夠的核心(或執行緒,如果在本地執行)來處理接收的資料,以及執行接收器。

注意事項

基本來源

我們已經在 快速範例 中看過 ssc.socketTextStream(...),它會從透過 TCP Socket 連線接收的文字資料建立 DStream。除了 Socket 之外,StreamingContext API 還提供從檔案建立 DStreams 的方法,作為輸入來源。

檔案串流

若要從與 HDFS API 相容的任何檔案系統(即 HDFS、S3、NFS 等)上的檔案讀取資料,可以使用 StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass] 建立 DStream。

檔案串流不需要執行接收器,因此不需要配置任何核心來接收檔案資料。

對於簡單的文字檔案,最簡單的方法是 StreamingContext.textFileStream(dataDirectory)

fileStream 在 Python API 中不可用;只有 textFileStream 可用。

streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

對於文字檔案

streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

對於文字檔案

streamingContext.textFileStream(dataDirectory);
如何監控目錄

Spark 串流會監控目錄 dataDirectory,並處理在該目錄中建立的任何檔案。

使用物件儲存作為資料來源

類似於 HDFS 的「完整」檔案系統,傾向於在建立輸出串流時,設定檔案的修改時間。當開啟檔案時,即使資料尚未完全寫入,也可能會包含在 DStream 中,之後在同一個視窗中更新檔案將會被忽略。也就是說:變更可能會遺漏,資料也會從串流中省略。

若要保證變更會在視窗中被接收,請將檔案寫入未監控的目錄,然後在輸出串流關閉後立即將其重新命名為目標目錄。只要重新命名的檔案在建立的視窗中出現在已掃描的目標目錄中,新的資料就會被接收。

相反地,像是 Amazon S3 和 Azure Storage 等物件儲存體通常有較慢的重新命名作業,因為資料實際上會被複製。此外,重新命名的物件可能會將 rename() 作業的時間作為其修改時間,因此可能不會被視為原始建立時間暗示的視窗的一部分。

需要針對目標物件儲存體進行仔細的測試,以驗證儲存體的時間戳記行為是否與 Spark Streaming 預期的一致。直接寫入目標目錄可能是透過所選物件儲存體串流資料的適當策略。

有關此主題的更多詳細資訊,請參閱 Hadoop 檔案系統規格

基於自訂接收器的串流

DStreams 可以使用透過自訂接收器接收的資料串流來建立。有關更多詳細資訊,請參閱 自訂接收器指南

RDD 的佇列作為串流

若要使用測試資料測試 Spark Streaming 應用程式,也可以使用 streamingContext.queueStream(queueOfRDDs),根據 RDD 的佇列建立 DStream。推入佇列的每個 RDD 都會被視為 DStream 中的一批資料,並像串流一樣進行處理。

有關從 socket 和檔案串流的更多詳細資訊,請參閱 Scala 中 StreamingContext、Java 中 JavaStreamingContext,以及 Python 中 StreamingContext 的相關函式的 API 文件。

進階來源

Python API 從 Spark 3.5.1 開始,在這些來源中,Kafka 和 Kinesis 可在 Python API 中使用。

此類別的來源需要與外部非 Spark 函式庫進行介面,其中一些具有複雜的相依性(例如 Kafka)。因此,為將與相依性版本衝突相關的問題降至最低,從這些來源建立 DStreams 的功能已移至可於必要時明確連結的獨立函式庫。

請注意,這些進階來源在 Spark shell 中不可用,因此無法在 shell 中測試基於這些進階來源的應用程式。如果您真的想在 Spark shell 中使用它們,您必須下載對應的 Maven 人工製品的 JAR 以及其相依性,並將其新增至類別路徑。

以下列出一些進階來源。

自訂來源

Python API Python 中尚未支援此功能。

輸入 DStreams 也可以從自訂資料來源建立。您只需實作一個使用者定義的接收器(請參閱下一部分以了解其定義)即可從自訂來源接收資料並將其推送到 Spark。請參閱自訂接收器指南以取得詳細資訊。

接收器可靠性

根據可靠性,資料來源可以分為兩種。來源(例如 Kafka)允許確認傳輸的資料。如果從這些可靠來源接收資料的系統正確確認已接收的資料,則可以確保不會因任何類型的故障而遺失資料。這會產生兩種接收器

  1. 可靠接收器 - 當資料已接收並儲存在 Spark 中且具有複本時,可靠接收器會正確地向可靠來源傳送確認。
  2. 不可靠接收器 - 不可靠接收器不會向來源傳送確認。這可用於不支援確認的來源,甚至可用於可靠來源,當您不想或不需要深入了解確認的複雜性時。

如何撰寫可靠接收器的詳細資訊已在自訂接收器指南中說明。


DStreams 的轉換

與 RDD 類似,轉換允許修改輸入 DStream 的資料。DStreams 支援許多適用於一般 Spark RDD 的轉換。以下是其中一些常見的轉換。

轉換意義
map(func) 透過將來源 DStream 的每個元素傳遞給函數 func 來傳回新的 DStream。
flatMap(func) 類似於 map,但每個輸入項目可以對應到 0 個或多個輸出項目。
filter(func) 透過僅選取來源 DStream 中 func 傳回 true 的記錄來傳回新的 DStream。
repartition(numPartitions) 透過建立更多或更少的分割區,來變更此 DStream 的平行處理層級。
union(otherStream) 傳回新的 DStream,其中包含來源 DStream 和 otherDStream 中元素的聯集。
count() 透過計算來源 DStream 中每個 RDD 的元素數量,來傳回單一元素 RDD 的新 DStream。
reduce(func) 透過使用函數 func (需要兩個引數並傳回一個引數) 來彙總來源 DStream 中每個 RDD 的元素,傳回單一元素 RDD 的新 DStream。此函數應具備結合律和交換律,才能平行運算。
countByValue() 在 K 類型的元素 DStream 上呼叫時,傳回新的 DStream,其中包含 (K, Long) 配對,而每個金鑰的值都是來源 DStream 中每個 RDD 中的頻率。
reduceByKey(func, [numTasks]) 在 (K, V) 配對的 DStream 上呼叫時,傳回新的 DStream,其中每個金鑰的值會使用指定的縮減函數進行彙總。注意:預設情況下,這會使用 Spark 預設的平行任務數量(本機模式為 2,而叢集模式的數量則由設定屬性 spark.default.parallelism 決定)來進行分組。您可以傳遞選用的 numTasks 引數來設定不同的任務數量。
join(otherStream, [numTasks]) 在兩個 (K, V) 和 (K, W) 配對的 DStream 上呼叫時,傳回新的 DStream,其中包含每個金鑰的所有元素配對的 (K, (V, W)) 配對。
cogroup(otherStream, [numTasks]) 在 (K, V) 和 (K, W) 配對的 DStream 上呼叫時,傳回新的 DStream,其中包含 (K, Seq[V], Seq[W]) 元組。
transform(func) 透過對來源 DStream 的每個 RDD 套用 RDD 轉 RDD 函數,來傳回新的 DStream。這可以用於對 DStream 執行任意的 RDD 操作。
updateStateByKey(func) 傳回一個新的「狀態」DStream,其中每個金鑰的狀態會透過對金鑰的先前狀態和金鑰的新值套用指定函式來更新。這可以用來維護每個金鑰的任意狀態資料。

其中一些轉換值得更詳細地討論。

UpdateStateByKey 操作

updateStateByKey 操作允許您維護任意狀態,同時持續使用新資訊更新它。若要使用此功能,您必須執行兩個步驟。

  1. 定義狀態 - 狀態可以是任意資料類型。
  2. 定義狀態更新函式 - 使用函式指定如何使用先前狀態和輸入串流中的新值來更新狀態。

在每個批次中,Spark 會對所有現有金鑰套用狀態更新函式,無論它們在批次中是否有新資料。如果更新函式傳回 None,則會消除金鑰值配對。

讓我們用一個範例來說明這一點。假設您想維護在文字資料串流中看到的每個字詞的執行計數。在此,執行計數是狀態,它是一個整數。我們將更新函式定義為

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

這套用在包含字詞的 DStream 上(例如,在 先前範例 中包含 (word, 1) 配對的 pairs DStream)。

runningCounts = pairs.updateStateByKey(updateFunction)

將對每個字詞呼叫更新函式,其中 newValues 有一個 1 的序列(來自 (word, 1) 配對),而 runningCount 則有先前的計數。有關完整的 Python 程式碼,請參閱範例 stateful_network_wordcount.py

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

這套用在包含字詞的 DStream 上(例如,在 先前範例 中包含 (word, 1) 配對的 pairs DStream)。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

將對每個字詞呼叫更新函式,其中 newValues 有一個 1 的序列(來自 (word, 1) 配對),而 runningCount 則有先前的計數。

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  (values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
  };

這套用在包含字詞的 DStream 上(例如,在 快速範例 中包含 (word, 1) 配對的 pairs DStream)。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

將對每個字詞呼叫更新函式,其中 newValues 有一個 1 的序列(來自 (word, 1) 配對),而 runningCount 則有先前的計數。有關完整的 Java 程式碼,請參閱範例 JavaStatefulNetworkWordCount.java

請注意,使用 updateStateByKey 需要設定檢查點目錄,這會在 檢查點 區段中詳細說明。

轉換操作

transform 操作(以及其變化,例如 transformWith)允許在 DStream 上套用任意的 RDD-to-RDD 函數。它可用於套用 DStream API 中未公開的任何 RDD 操作。例如,將資料串流中的每個批次與另一個資料集結合的功能並未直接公開在 DStream API 中。不過,你可以輕鬆使用 transform 來執行此操作。這能實現非常強大的可能性。例如,可以透過將輸入資料串流與預先計算的垃圾訊息資訊(可能也使用 Spark 產生)結合,然後根據它進行篩選,來進行即時資料清理。

spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
  rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
  ...
});

請注意,提供的函數會在每個批次區間中呼叫。這允許你執行時變 RDD 操作,也就是 RDD 操作、分割區數量、廣播變數等可以在批次之間變更。

視窗操作

Spark 串流也提供視窗運算,允許你在資料的滑動視窗上套用轉換。下圖說明了這個滑動視窗。

Spark Streaming

如圖所示,每次視窗在來源 DStream 上滑動時,會將落在視窗內的來源 RDD 結合並進行操作,以產生視窗化 DStream 的 RDD。在此特定情況下,操作會套用在資料的最後 3 個時間單位上,並以 2 個時間單位滑動。這表示任何視窗操作都需要指定兩個參數。

這兩個參數必須是來源 DStream 的批次區間的倍數(圖中的 1)。

讓我們用一個範例來說明視窗操作。假設你想要擴充 先前的範例,每 10 秒產生過去 30 秒資料的字詞計數。為此,我們必須在過去 30 秒資料的 pairs DStream 的 (word, 1) 配對上套用 reduceByKey 操作。這是使用 reduceByKeyAndWindow 操作來完成的。

# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

以下列出一些常見的視窗操作。這些操作都會使用兩個指定的參數 - windowLengthslideInterval

轉換意義
window(windowLength, slideInterval) 傳回一個新的 DStream,它是根據來源 DStream 的視窗批次計算而得。
countByWindow(windowLength, slideInterval) 傳回串流中元素的滑動視窗計數。
reduceByWindow(func, windowLength, slideInterval) 傳回一個新的單一元素串流,它是使用 func 在滑動區間內聚合串流中的元素而建立的。該函數應具有關聯性和交換性,才能在平行運算中正確計算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 當呼叫 (K, V) 成對的 DStream 時,傳回一個新的 (K, V) 成對的 DStream,其中每個金鑰的值會使用指定的縮減函數 func 在滑動視窗中的批次中進行聚合。注意:預設情況下,這會使用 Spark 的預設平行任務數量(區域模式為 2,群集模式的數量則由設定屬性 spark.default.parallelism 決定)來進行分組。您可以傳遞一個選用的 numTasks 參數來設定不同的任務數量。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

上述 reduceByKeyAndWindow() 的更高效率版本,其中每個視窗的縮減值會使用前一個視窗的縮減值遞增計算。這是透過縮減進入滑動視窗的新資料,並「反向縮減」離開視窗的舊資料來完成的。一個範例是當視窗滑動時「新增」和「減去」金鑰的計數。但是,這只適用於「可反向縮減函數」,也就是那些具有對應「反向縮減函數」(取為參數 invFunc)的縮減函數。就像在 reduceByKeyAndWindow 中,縮減任務的數量可透過選用參數進行設定。請注意,必須啟用 檢查點 才能使用這個操作。

countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當呼叫 (K, V) 成對的 DStream 時,傳回一個新的 (K, Long) 成對的 DStream,其中每個金鑰的值是其在滑動視窗中的頻率。就像在 reduceByKeyAndWindow 中,縮減任務的數量可透過選用參數進行設定。

加入操作

最後,值得強調的是,您可以在 Spark Streaming 中輕鬆執行不同類型的聯結。

串流串流聯結

串流可以很容易地與其他串流聯結。

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

在這裡,在每個批次區間中,由 stream1 生成的 RDD 將與由 stream2 生成的 RDD 聯結。您還可以執行 leftOuterJoinrightOuterJoinfullOuterJoin。此外,在串流的視窗上執行聯結通常非常有用。這也很容易。

windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
串流資料集聯結

這已在先前說明 DStream.transform 操作時顯示。以下是將視窗串流與資料集聯結的另一個範例。

dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

事實上,您還可以動態變更要聯結的資料集。提供給 transform 的函數會在每個批次區間評估,因此會使用 dataset 參照點指向的目前資料集。

DStream 轉換的完整清單可在 API 文件中取得。對於 Scala API,請參閱 DStreamPairDStreamFunctions。對於 Java API,請參閱 JavaDStreamJavaPairDStream。對於 Python API,請參閱 DStream


DStreams 的輸出操作

輸出操作允許將 DStream 的資料推送到外部系統,例如資料庫或檔案系統。由於輸出操作實際上允許外部系統使用轉換後的資料,因此它們會觸發所有 DStream 轉換的實際執行(類似於 RDD 的動作)。目前,已定義下列輸出操作

輸出操作意義
列印() 列印執行串流應用程式的驅動程式節點中 DStream 中每個資料批次的頭十個元素。這對於開發和除錯很有用。
Python API Python API 中稱為 pprint()
saveAsTextFiles(prefix, [suffix]) 將此 DStream 的內容儲存為文字檔。每個批次區間的檔案名稱根據 prefixsuffix 產生:"prefix-TIME_IN_MS[.suffix]"
saveAsObjectFiles(prefix, [suffix]) 將此 DStream 的內容儲存為序列化 Java 物件的 SequenceFiles。每個批次區間的檔案名稱根據 prefixsuffix 產生:"prefix-TIME_IN_MS[.suffix]"
Python API Python API 中沒有此功能。
saveAsHadoopFiles(prefix, [suffix]) 將此 DStream 的內容儲存為 Hadoop 檔案。每個批次區間的檔案名稱根據 prefixsuffix 產生:"prefix-TIME_IN_MS[.suffix]"
Python API Python API 中沒有此功能。
foreachRDD(func) 最通用的輸出運算子,將函式 func 套用至串流產生的每個 RDD。此函式應將每個 RDD 中的資料推播至外部系統,例如將 RDD 儲存至檔案,或透過網路寫入資料庫。請注意,函式 func 在執行串流應用程式的驅動程式處理序中執行,通常會包含 RDD 動作,以強制計算串流 RDD。

使用 foreachRDD 的設計模式

dstream.foreachRDD 是強大的原始元件,允許將資料傳送至外部系統。然而,了解如何正確且有效率地使用此原始元件非常重要。以下列出一些常見的錯誤,應避免發生。

通常,將資料寫入外部系統需要建立連線物件(例如與遠端伺服器的 TCP 連線),並使用它將資料傳送至遠端系統。為此,開發人員可能會無意間嘗試在 Spark 驅動程式中建立連線物件,然後嘗試在 Spark 工作器中使用它來儲存 RDD 中的記錄。例如(在 Scala 中),

def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
dstream.foreachRDD(rdd -> {
  Connection connection = createNewConnection(); // executed at the driver
  rdd.foreach(record -> {
    connection.send(record); // executed at the worker
  });
});

這是錯誤的,因為這需要將連線物件序列化並從驅動程式傳送至工作器。此類連線物件很少可以在機器之間傳輸。此錯誤可能會顯示為序列化錯誤(連線物件無法序列化)、初始化錯誤(連線物件需要在工作器中初始化)等。正確的解決方案是在工作器中建立連線物件。

然而,這可能會導致另一個常見的錯誤 - 為每筆記錄建立新的連線。例如,

def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    Connection connection = createNewConnection();
    connection.send(record);
    connection.close();
  });
});

通常,建立連線物件需要時間和資源開銷。因此,為每筆記錄建立和銷毀連線物件可能會造成不必要的過高開銷,並大幅降低系統的整體處理量。更好的解決方案是使用 rdd.foreachPartition - 建立單一連線物件,並使用該連線傳送 RDD 分割區中的所有記錄。

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

這會將連線建立的開銷攤提至多筆記錄。

最後,這可透過在多個 RDD/批次中重複使用連線物件進一步最佳化。當多個批次的 RDD 推送到外部系統時,可以維護一個連線物件的靜態池,然後重複使用,進而進一步降低開銷。

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    // ConnectionPool is a static, lazily initialized pool of connections
    Connection connection = ConnectionPool.getConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
  });
});

請注意,池中的連線應根據需要延遲建立,如果一段時間未使用,應予以逾時。這可達成將資料傳送到外部系統的最高效率。

其他注意事項

資料框和 SQL 操作

您可以輕鬆對串流資料使用 資料框和 SQL 操作。您必須使用 StreamingContext 所使用的 SparkContext 建立 SparkSession。此外,必須這樣做才能在驅動程式發生故障時重新啟動。這可透過建立 SparkSession 的延遲實例化單例執行個體來完成。以下範例中顯示此方法。它修改先前的 字數計算範例,以使用資料框和 SQL 產生字數。每個 RDD 會轉換成資料框,註冊為暫時資料表,然後使用 SQL 查詢。

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)

請參閱完整的 原始碼

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

請參閱完整的 原始碼

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ...

words.foreachRDD((rdd, time) -> {
  // Get the singleton instance of SparkSession
  SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();

  // Convert RDD[String] to RDD[case class] to DataFrame
  JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
    JavaRow record = new JavaRow();
    record.setWord(word);
    return record;
  });
  DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);

  // Creates a temporary view using the DataFrame
  wordsDataFrame.createOrReplaceTempView("words");

  // Do word count on table using SQL and print it
  DataFrame wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word");
  wordCountsDataFrame.show();
});

請參閱完整的 原始碼

您也可以從不同的執行緒(即非同步執行 StreamingContext)對串流資料上定義的資料表執行 SQL 查詢。只要確保將 StreamingContext 設定為記住足夠的串流資料,讓查詢可以執行即可。否則,StreamingContext 不會知道任何非同步 SQL 查詢,會在查詢完成前刪除舊的串流資料。例如,如果您想要查詢最後一個批次,但查詢需要 5 分鐘才能執行,請呼叫 streamingContext.remember(Minutes(5))(在 Scala 中,或其他語言中的等效呼叫)。

請參閱 資料框和 SQL 指南,以進一步了解資料框。


MLlib 運算

您也可以輕鬆使用 MLlib 提供的機器學習演算法。首先,有串流機器學習演算法(例如 串流線性回歸串流 KMeans 等),它可以同時從串流資料學習,並將模型套用於串流資料。除此之外,對於更多類型的機器學習演算法,您可以在離線(即使用歷史資料)學習一個學習模型,然後在線上將模型套用於串流資料。請參閱 MLlib 指南以取得更多詳細資訊。


快取/持久性

與 RDD 類似,DStream 也允許開發人員將串流資料保留在記憶體中。也就是說,在 DStream 上使用 persist() 方法會自動將該 DStream 的每個 RDD 保存在記憶體中。如果 DStream 中的資料會被計算多次(例如,對同一資料執行多個作業),這會很有用。對於像 reduceByWindowreduceByKeyAndWindow 等基於視窗的作業,以及像 updateStateByKey 等基於狀態的作業,這是隱含為真的。因此,由基於視窗的作業產生的 DStream 會自動保留在記憶體中,而開發人員不必呼叫 persist()

對於透過網路接收資料的輸入串流(例如 Kafka、socket 等),預設保留層級設定為將資料複製到兩個節點以確保容錯。

請注意,與 RDD 不同,DStream 的預設保留層級會將資料序列化保留在記憶體中。這在 效能調整 部分中有進一步討論。您可以在 Spark 程式設計指南 中找到更多有關不同保留層級的資訊。


檢查點

串流應用程式必須全天候執行,因此必須能承受與應用程式邏輯無關的故障(例如系統故障、JVM 崩潰等)。為了做到這一點,Spark Streaming 需要將足夠的資訊檢查點到容錯儲存系統,以便它可以從故障中復原。有兩種資料會被檢查點。

總之,元資料檢查點主要用於從驅動程式故障中復原,而資料或 RDD 檢查點則在使用有狀態轉換時,甚至對於基本功能也是必要的。

何時啟用檢查點

對於具有下列任何需求的應用程式,必須啟用檢查點

請注意,沒有上述有狀態轉換的簡單串流應用程式可以在不啟用檢查點的情況下執行。在這種情況下,從驅動程式故障中復原也會是部分的(一些已接收但未處理的資料可能會遺失)。這通常是可以接受的,而且許多人以這種方式執行 Spark 串流應用程式。預計未來對非 Hadoop 環境的支援會有所改善。

如何設定檢查點

可以在容錯、可靠的檔案系統(例如 HDFS、S3 等)中設定目錄來啟用檢查點,檢查點資訊將儲存至該目錄。這是透過使用 streamingContext.checkpoint(checkpointDirectory) 來完成的。這將允許您使用上述有狀態轉換。此外,如果您想讓應用程式從驅動程式故障中復原,您應該重新撰寫您的串流應用程式,使其具有下列行為。

透過使用 StreamingContext.getOrCreate,可以簡化此行為。其使用方式如下。

# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()

如果 checkpointDirectory 存在,則會從檢查點資料重新建立內容。如果目錄不存在(即第一次執行),則會呼叫函數 functionToCreateContext 來建立新的內容並設定 DStreams。請參閱 Python 範例 recoverable_network_wordcount.py。此範例會將網路資料的字詞計數附加到檔案中。

您也可以從檢查點資料明確建立 StreamingContext,並使用 StreamingContext.getOrCreate(checkpointDirectory, None) 啟動運算。

透過使用 StreamingContext.getOrCreate,可以簡化此行為。其使用方式如下。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果 checkpointDirectory 存在,則會從檢查點資料重新建立內容。如果目錄不存在(即第一次執行),則會呼叫函式 functionToCreateContext 來建立新的內容並設定 DStreams。請參閱 Scala 範例 RecoverableNetworkWordCount。此範例會將網路資料的字數計數附加到檔案中。

使用 JavaStreamingContext.getOrCreate 可以簡化此行為。使用方法如下。

// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

如果 checkpointDirectory 存在,則會從檢查點資料重新建立內容。如果目錄不存在(即第一次執行),則會呼叫函式 contextFactory 來建立新的內容並設定 DStreams。請參閱 Java 範例 JavaRecoverableNetworkWordCount。此範例會將網路資料的字數計數附加到檔案中。

除了使用 getOrCreate 之外,還需要確保驅動程式處理序在發生故障時會自動重新啟動。這只能由用於執行應用程式的部署基礎架構來完成。這會在 部署 區段中進一步說明。

請注意,RDD 的檢查點會產生儲存到可靠儲存體的成本。這可能會增加 RDD 進行檢查點的那些批次的處理時間。因此,需要仔細設定檢查點的間隔。在批次大小較小(例如 1 秒)時,每批次進行檢查點可能會顯著降低作業處理量。反之,檢查點的頻率太低會導致譜系和任務大小增加,這可能會產生不利影響。對於需要 RDD 檢查點的狀態轉換,預設間隔是批次間隔的倍數,至少為 10 秒。可以使用 dstream.checkpoint(checkpointInterval) 進行設定。通常,DStream 的 5 - 10 個滑動間隔的檢查點間隔是值得嘗試的良好設定。


累加器、廣播變數和檢查點

Spark Streaming 無法從檢查點中復原 累加器廣播變數。如果您啟用檢查點並使用 累加器廣播變數,則必須為 累加器廣播變數 建立延遲實例化的單例實例,以便在驅動程式因故障重新啟動後可以重新實例化它們。以下範例顯示此做法。

def getWordExcludeList(sparkContext):
    if ("wordExcludeList" not in globals()):
        globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])
    return globals()["wordExcludeList"]

def getDroppedWordsCounter(sparkContext):
    if ("droppedWordsCounter" not in globals()):
        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
    return globals()["droppedWordsCounter"]

def echo(time, rdd):
    # Get or register the excludeList Broadcast
    excludeList = getWordExcludeList(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use excludeList to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in excludeList.value:
            droppedWordsCounter.add(wordCount[1])
            False
        else:
            True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

請參閱完整的 原始碼

object WordExcludeList {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordExcludeList = Seq("a", "b", "c")
          instance = sc.broadcast(wordExcludeList)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("DroppedWordsCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the excludeList Broadcast
  val excludeList = WordExcludeList.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use excludeList to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (excludeList.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

請參閱完整的 原始碼

class JavaWordExcludeList {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordExcludeList.class) {
        if (instance == null) {
          List<String> wordExcludeList = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordExcludeList);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("DroppedWordsCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the excludeList Broadcast
  Broadcast<List<String>> excludeList = JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use excludeList to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (excludeList.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
}

請參閱完整的 原始碼


部署應用程式

本節討論部署 Spark Streaming 應用程式的步驟。

需求

若要執行 Spark Streaming 應用程式,您需要具備下列條件。

升級應用程式程式碼

如果正在執行的 Spark 串流應用程式需要使用新的應用程式程式碼升級,則有兩種可能的機制。


監控應用程式

除了 Spark 的 監控功能 之外,還有特定於 Spark 串流的其他功能。當使用 StreamingContext 時,Spark 網頁 UI 會顯示一個額外的 串流 標籤,其中會顯示有關正在執行的接收器(接收器是否活躍、接收的記錄數、接收器錯誤等)和已完成批次(批次處理時間、排隊延遲等)的統計資料。這可用於監控串流應用程式的進度。

網頁 UI 中的以下兩個指標特別重要

如果批次處理時間持續超過批次間隔和/或排隊延遲持續增加,則表示系統無法以產生批次的速度處理批次,並且正在落後。在這種情況下,請考慮 減少 批次處理時間。

Spark Streaming 程式的進度也可以使用 StreamingListener 介面監控,它允許您取得接收器狀態和處理時間。請注意,這是一個開發人員 API,未來可能會改進(即報告更多資訊)。



效能調整

要在叢集上發揮 Spark Streaming 應用程式的最佳效能,需要進行一些調整。本節說明許多可調整的參數和組態,以改善應用程式的效能。在高層級,您需要考慮兩件事

  1. 透過有效使用叢集資源來減少每一批次資料的處理時間。

  2. 設定正確的批次大小,以便可以快速處理資料批次,就像它們被接收一樣(即資料處理趕得上資料擷取)。

縮短批次處理時間

可以在 Spark 中進行許多最佳化,以將每一批次的處理時間降到最低。這些已在 調整指南 中詳細討論。本節重點說明一些最重要的部分。

資料接收中的並行層級

透過網路接收資料(例如 Kafka、socket 等)需要將資料反序列化並儲存在 Spark 中。如果資料接收成為系統中的瓶頸,請考慮將資料接收並行化。請注意,每個輸入 DStream 會建立一個接收器(在工作機上執行),接收單一資料串流。因此,可以透過建立多個輸入 DStream 並將它們組態為從來源接收資料串流的不同分割區,來接收多個資料串流。例如,接收兩個資料主題的單一 Kafka 輸入 DStream 可以分割成兩個 Kafka 輸入串流,每個串流只接收一個主題。這將執行兩個接收器,允許並行接收資料,進而增加整體吞吐量。這些多個 DStream 可以合併在一起,以建立單一 DStream。然後,套用在單一輸入 DStream 上的轉換可以套用在統一的串流上。這會以下列方式進行。

numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

另一個應該考慮的參數是接收器的區塊區間,它由 組態參數 spark.streaming.blockInterval 決定。對於大多數接收器,接收到的資料會合併成資料區塊,然後再儲存在 Spark 的記憶體中。每個批次中的區塊數目會決定將用於處理 map 類似轉換中接收資料的任務數目。每個接收器在每個批次中的任務數目大約會是(批次區間/區塊區間)。例如,200 毫秒的區塊區間會在 2 秒批次中建立 10 個任務。如果任務數目太少(即少於每部電腦的核心數目),那麼它會很沒效率,因為所有可用的核心不會用於處理資料。若要增加特定批次區間的任務數目,請減少區塊區間。不過,建議的區塊區間最小值約為 50 毫秒,低於此值時,任務啟動的開銷可能會成為問題。

接收資料的另一種方式是使用多個輸入串流/接收器,或明確重新分割輸入資料串流(使用 inputStream.repartition(<分割區數目>))。這會在進一步處理之前,將接收到的資料批次分佈到叢集中的指定電腦數目。

對於直接串流,請參閱 Spark 串流 + Kafka 整合指南

資料處理中的平行度

如果在運算的任何階段中使用的平行任務數不夠高,則叢集資源可能會使用不足。例如,對於 reduceByKeyreduceByKeyAndWindow 等分散式縮減運算,平行任務的預設數量由 spark.default.parallelism 組態屬性 控制。您可以傳遞平行度作為引數(請參閱 PairDStreamFunctions 文件),或設定 spark.default.parallelism 組態屬性 以變更預設值。

資料序列化

透過調整序列化格式,可以減少資料序列化的開銷。在串流的情況下,有兩種正在序列化的資料類型。

在這兩種情況下,使用 Kryo 序列化都可以減少 CPU 和記憶體開銷。有關更多詳細資訊,請參閱 Spark 調整指南。對於 Kryo,請考慮註冊自訂類別,並停用物件參考追蹤(請參閱 組態指南 中與 Kryo 相關的組態)。

在特定情況下,如果串流應用程式需要保留的資料量不大,則可以將資料(兩種類型)持續存在為取消序列化的物件,而不會產生過多的 GC 開銷。例如,如果您使用數秒的批次區間且沒有視窗運算,則可以透過明確設定儲存層級來停用持續資料中的序列化。這將減少因序列化而產生的 CPU 開銷,在沒有太多 GC 開銷的情況下潛在提升效能。

工作啟動開銷

如果每秒啟動的工作數量很高(例如每秒 50 個以上),則將工作傳送給執行器的開銷可能會很大,而且會讓難以達到次秒延遲。可以透過下列變更來減少開銷

這些變更可能會將批次處理時間減少數百毫秒,因此讓次秒批次大小可行。


設定正確的批次間隔

對於在叢集上執行的 Spark 串流應用程式要穩定,系統應能夠以接收資料的速度處理資料。換句話說,資料批次應以其產生速度處理。應用程式是否如此,可以透過 監控 串流網頁 UI 中的處理時間來得知,其中批次處理時間應小於批次間隔。

根據串流運算的性質,所使用的批次間隔可能會對應用程式在固定叢集資源組上可以維持的資料速率產生重大影響。例如,讓我們考慮先前的 WordCountNetwork 範例。對於特定的資料速率,系統可能可以跟上每 2 秒報告一次字數(即 2 秒的批次間隔),但無法每 500 毫秒報告一次。因此,需要設定批次間隔,以便可以維持預期的生產資料速率。

找出應用程式正確批次大小的良好方法是使用保守的批次間隔(例如 5-10 秒)和低資料速率來測試它。若要驗證系統是否能夠跟上資料速率,您可以查看每個已處理批次所經歷的端對端延遲值(在 Spark 驅動程式 log4j 記錄中尋找「總延遲」,或使用 StreamingListener 介面)。如果延遲維持在與批次大小相當的範圍內,則系統是穩定的。否則,如果延遲持續增加,表示系統無法跟上,因此不穩定。一旦您對穩定組態有所概念,就可以嘗試增加資料速率和/或減少批次大小。請注意,由於暫時資料速率增加而導致延遲暫時增加可能是可以的,只要延遲會降低回低值(即小於批次大小)即可。


記憶體調整

調整指南 中已經詳細討論調整 Spark 應用程式的記憶體使用量和 GC 行為。強烈建議您閱讀該指南。在本節中,我們將特別針對 Spark 串流應用程式討論一些調整參數。

Spark 串流應用程式所需的叢集記憶體量在很大程度上取決於所使用的轉換類型。例如,如果您想對過去 10 分鐘的資料使用視窗運算,則您的叢集應有足夠的記憶體來儲存 10 分鐘的資料。或者,如果您想對大量金鑰使用 updateStateByKey,則所需的記憶體會很高。相反地,如果您想執行簡單的 map-filter-store 運算,則所需的記憶體會很低。

一般來說,由於透過接收器接收的資料會儲存在 StorageLevel.MEMORY_AND_DISK_SER_2 中,因此不符合記憶體大小的資料會溢位到磁碟。這可能會降低串流應用程式的效能,因此建議提供串流應用程式所需的足夠記憶體。最好嘗試並查看小規模的記憶體使用量,並據此估計。

記憶體調整的另一個面向是垃圾回收。對於需要低延遲的串流應用程式,不希望有 JVM 垃圾回收造成的長時間暫停。

有幾個參數可以幫助您調整記憶體使用量和 GC 負擔


要記住的重要事項


容錯語意

在本節中,我們將討論 Spark 串流應用程式在發生故障時的行為。

背景

為了了解 Spark 串流提供的語意,讓我們記住 Spark RDD 的基本容錯語意。

  1. RDD 是不可變的、可確定地重新計算的、分散式資料集。每個 RDD 都會記住用於在容錯輸入資料集上建立它的確定性運算的譜系。
  2. 如果 RDD 的任何分割區因工作節點故障而遺失,則可以使用運算譜系從原始容錯資料集重新計算該分割區。
  3. 假設所有 RDD 轉換都是確定性的,最終轉換後的 RDD 中的資料將始終相同,與 Spark 集群中的故障無關。

Spark 在容錯檔案系統(例如 HDFS 或 S3)中對資料進行操作。因此,從容錯資料產生的所有 RDD 也是容錯的。然而,對於 Spark Streaming 來說並非如此,因為在大多數情況下,資料都是透過網路接收的(除非使用 fileStream)。為了對所有產生的 RDD 達到相同的容錯屬性,接收到的資料會在叢集中的工作節點中的多個 Spark executor 之間複製(預設複製係數為 2)。這會導致系統中出現兩種資料,在發生故障時需要復原

  1. 已接收並複製的資料 - 由於其中一個副本存在於其他節點上,因此這種資料可以在單一工作節點發生故障時存活下來。
  2. 已接收但緩衝以進行複製的資料 - 由於這類資料未複製,因此復原這種資料的唯一方法是從來源再次取得。

此外,我們應該關注兩種故障

  1. 工作節點故障 - 執行 executor 的任何工作節點都可能發生故障,這些節點上的所有記憶體中資料都將遺失。如果任何接收器在發生故障的節點上執行,則其緩衝資料將遺失。
  2. 驅動程式節點故障 - 如果執行 Spark Streaming 應用程式的驅動程式節點發生故障,則 SparkContext 顯然會遺失,並且所有 executor 及其記憶體中資料都會遺失。

有了這些基本知識,讓我們瞭解 Spark Streaming 的容錯語意。

定義

串流系統的語意通常以系統可以處理每個記錄的次數來描述。系統可以在所有可能的作業條件下(儘管發生故障等)提供三種類型的保證

  1. 最多一次:每個記錄將被處理一次或根本不處理。
  2. 至少一次:每個記錄將被處理一次或多次。這比最多一次更強,因為它確保不會遺失任何資料。但可能會出現重複。
  3. 完全一次:每個記錄將被處理完全一次 - 沒有資料會遺失,也沒有資料會被處理多次。這顯然是三者中最強的保證。

基本語意

在任何串流處理系統中,廣義來說,處理資料有三個步驟。

  1. 接收資料:資料使用接收器或其他方式從來源接收。

  2. 轉換資料:使用 DStream 和 RDD 轉換轉換接收到的資料。

  3. 傳送資料:最終轉換後的資料會傳送到外部系統,例如檔案系統、資料庫、儀表板等。

如果串流應用程式必須實現端對端的完全一次保證,則每個步驟都必須提供完全一次保證。也就是說,每個記錄都必須接收一次、轉換一次,並傳送到下游系統一次。讓我們瞭解 Spark Streaming 背景下這些步驟的語意。

  1. 接收資料:不同的輸入來源提供不同的保證。這將在下一個小節中詳細討論。

  2. 資料轉換:所有已接收的資料將會被處理一次且僅一次,這要歸功於 RDD 提供的保證。即使發生故障,只要接收到的輸入資料可以存取,最後轉換的 RDD 內容將始終相同。

  3. 輸出資料:輸出操作預設會確保至少一次的語意,因為它取決於輸出操作的類型(冪等或非冪等)以及下游系統的語意(是否支援交易)。但使用者可以實作自己的交易機制來達成一次且僅一次的語意。本節稍後將會更詳細地討論。

接收資料的語意

不同的輸入來源提供不同的保證,從至少一次一次且僅一次。請閱讀以取得更多詳細資料。

使用檔案

如果所有輸入資料已經存在於容錯檔案系統(例如 HDFS)中,Spark 串流處理可以從任何故障中復原並處理所有資料。這會提供一次且僅一次的語意,表示所有資料將會處理一次且僅一次,不論發生什麼故障。

使用基於接收器的來源

對於基於接收器的輸入來源,容錯語意取決於故障情境和接收器的類型。正如我們先前討論的,有兩種接收器

  1. 可靠接收器 - 這些接收器僅在確保已接收的資料已複製後才會確認可靠來源。如果此類接收器發生故障,來源將不會收到已緩衝(未複製)資料的確認。因此,如果接收器重新啟動,來源將會重新傳送資料,而且不會因為故障而遺失任何資料。
  2. 不可靠接收器 - 此類接收器不會傳送確認,因此可能會在因工作節點或驅動程式故障而發生故障時遺失資料。

我們會根據使用哪種類型的接收器來達成以下語意。如果工作節點發生故障,則使用可靠接收器不會遺失資料。使用不可靠接收器時,已接收但未複製的資料可能會遺失。如果驅動程式節點發生故障,則除了這些遺失的資料之外,所有過去已接收並複製到記憶體中的資料都將會遺失。這會影響有狀態轉換的結果。

為了避免遺失過去已接收的資料,Spark 1.2 引入了寫入前記錄,將已接收的資料儲存到容錯儲存體中。在啟用寫入前記錄並使用可靠接收器的情況下,不會遺失任何資料。在語意的部分,它提供至少一次的保證。

下表摘要說明故障下的語意

部署情境 工作器故障 驅動程式故障
Spark 1.1 或更早版本,
Spark 1.2 或更新版本,但沒有寫入前記錄
使用不可靠接收器時,會遺失緩衝資料
使用可靠接收器時,不會遺失資料
至少一次語意
使用不可靠接收器時,會遺失緩衝資料
使用所有接收器時,會遺失過去的資料
未定義語意
Spark 1.2 或更新版本,且有寫入前記錄 使用可靠接收器時,不會遺失資料
至少一次語意
使用可靠接收器和檔案時,不會遺失資料
至少一次語意

使用 Kafka Direct API

在 Spark 1.3 中,我們引進了新的 Kafka Direct API,可確保 Spark Streaming 僅接收一次所有 Kafka 資料。此外,如果您實作一次性輸出作業,就能達成端對端的一次性保證。此方法在 Kafka 整合指南 中有更進一步的說明。

輸出作業的語意

輸出作業(例如 foreachRDD)具有至少一次語意,也就是說,在工作器故障時,轉換後的資料可能會寫入外部實體多次。雖然這對於使用 saveAs***Files 作業儲存到檔案系統是可以接受的(因為檔案會被相同資料覆寫),但可能需要額外的努力才能達成一次性語意。有兩種方法。



後續步驟