結構化串流 + Kafka 整合指南 (Kafka 代理版本 0.10.0 或更高)

Kafka 0.10 的結構化串流整合,用於從 Kafka 讀取資料和寫入資料到 Kafka。

連結

對於使用 SBT/Maven 專案定義的 Scala/Java 應用程式,請將您的應用程式連結至下列人工製品

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.5.1

請注意,要使用標頭功能,您的 Kafka 客戶端版本應為 0.11.0.0 或以上版本。

對於 Python 應用程式,您需要在部署應用程式時加入這個上方的程式庫及其相依性。請參閱下方的 部署 小節。

對於在 spark-shell 上進行實驗,您需要在呼叫 spark-shell 時,加入這個上方的程式庫及其相依性。此外,請參閱下方的 部署 小節。

從 Kafka 讀取資料

為串流查詢建立 Kafka 來源

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");

// Subscribe to multiple topics
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

為批次查詢建立 Kafka 來源

如果您有更適合批次處理的用例,您可以為已定義範圍的偏移建立 Dataset/DataFrame。

# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

來源中的每列都有下列結構

欄位類型
key 二進位
value 二進位
topic 字串
partition int
offset long
timestamp timestamp
timestampType int
headers (選用) 陣列

必須為批次和串流查詢的 Kafka 來源設定下列選項。

選項value意義
assign json 字串 {"topicA":[0,1],"topicB":[2,4]} 要使用的特定 TopicPartitions。只能為 Kafka 來源指定「assign」、「subscribe」或「subscribePattern」選項之一。
subscribe 逗號分隔的 topic 清單 要訂閱的 topic 清單。只能為 Kafka 來源指定「assign」、「subscribe」或「subscribePattern」選項之一。
subscribePattern Java 正規表示式字串 用於訂閱 topic 的模式。只能為 Kafka 來源指定「assign」、「subscribe」或「subscribePattern」選項之一。
kafka.bootstrap.servers 逗號分隔的 host:port 清單 Kafka 的「bootstrap.servers」設定。

下列設定為選用

選項value預設查詢類型意義
startingTimestamp 時間戳字串,例如「1000」 無 (下一個優先順序為 startingOffsetsByTimestamp) 串流和批次 查詢開始時的時間戳起點,一個字串用於指定訂閱主題中所有分區的起始時間戳。請參閱下方時間戳偏移選項的詳細資料。如果 Kafka 沒有傳回相符的偏移,行為將遵循選項 startingOffsetsByTimestampStrategy 的值

註 1:startingTimestamp 優先於 startingOffsetsByTimestampstartingOffsets

註 2:對於串流查詢,這僅適用於啟動新查詢時,而繼續進行時將始終從查詢中斷處開始。查詢期間新發現的分區將從最早開始。

startingOffsetsByTimestamp json 字串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ 無 (下一個優先順序為 startingOffsets) 串流和批次 查詢開始時的時間戳起點,一個 json 字串用於指定每個 TopicPartition 的起始時間戳。請參閱下方時間戳偏移選項的詳細資料。如果 Kafka 沒有傳回相符的偏移,行為將遵循選項 startingOffsetsByTimestampStrategy 的值

註 1:startingOffsetsByTimestamp 優先於 startingOffsets

註 2:對於串流查詢,這僅適用於啟動新查詢時,而繼續進行時將始終從查詢中斷處開始。查詢期間新發現的分區將從最早開始。

startingOffsets "earliest"、"latest" (僅限串流) 或 json 字串 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ "latest" 適用於串流,"earliest" 適用於批次 串流和批次 查詢開始時的起點,可以是 "earliest"(從最早的偏移開始),"latest"(僅從最新的偏移開始),或一個 json 字串用於指定每個 TopicPartition 的起始偏移。在 json 中,-2 作為偏移可以表示最早,-1 表示最新。注意:對於批次查詢,不允許 latest(隱含或在 json 中使用 -1)。對於串流查詢,這僅適用於啟動新查詢時,而繼續進行時將始終從查詢中斷處開始。查詢期間新發現的分區將從最早開始。
endingTimestamp 時間戳字串,例如「1000」 無 (下一個優先順序為 endingOffsetsByTimestamp) 批次查詢 批次查詢結束時的終點,一個 json 字串用於指定訂閱主題中所有分區的結束時間戳。請參閱下方時間戳偏移選項的詳細資料。如果 Kafka 沒有傳回相符的偏移,偏移將設定為 latest。

註:endingTimestamp 優先於 endingOffsetsByTimestampendingOffsets

endingOffsetsByTimestamp json 字串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ 無 (下一個優先選項為 endingOffsets) 批次查詢 批次查詢結束時的終點,一個 json 字串,指定每個 TopicPartition 的結束時間戳記。請參閱下方時間戳記偏移選項的詳細資料。如果 Kafka 沒有傳回匹配的偏移,偏移將設定為最新。

注意:endingOffsetsByTimestamp 優先於 endingOffsets

endingOffsets 最新或 json 字串 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} 最新 批次查詢 批次查詢結束時的終點,可能是「最新」,表示最新,或是一個 json 字串,指定每個 TopicPartition 的結束偏移。在 json 中,-1 作為偏移可以用來表示最新,-2 (最早) 作為偏移是不允許的。
failOnDataLoss true 或 false true 串流和批次 當資料可能遺失 (例如,主題已刪除或偏移超出範圍) 時,是否讓查詢失敗。這可能是一場虛驚。當它沒有如預期般運作時,您可以停用它。
kafkaConsumer.pollTimeoutMs long 120000 串流和批次 在執行器中輪詢 Kafka 資料的逾時時間 (毫秒)。未定義時,它會回退到 spark.network.timeout
fetchOffset.numRetries int 3 串流和批次 放棄擷取 Kafka 偏移之前的重試次數。
fetchOffset.retryIntervalMs long 10 串流和批次 在重試擷取 Kafka 偏移之前要等待的毫秒數
maxOffsetsPerTrigger long 串流查詢 每個觸發間隔處理的最大偏移數量的速率限制。指定的偏移總數將按比例分配到不同容量的 topicPartitions。
minOffsetsPerTrigger long 串流查詢 每個觸發間隔要處理的最小偏移數量。指定的偏移總數將按比例分配到不同容量的 topicPartitions。請注意,如果超過 maxTriggerDelay,即使可用的偏移數量未達到 minOffsetsPerTrigger,也會觸發觸發器。
maxTriggerDelay 帶有單位的時間 15m 串流查詢 觸發器可以在兩個觸發器之間延遲的最大時間量,前提是來源有一些資料可用。此選項僅適用於設定 minOffsetsPerTrigger 的情況。
minPartitions int 串流和批次 希望從 Kafka 讀取的最小分區數。預設情況下,Spark 會將 topicPartitions 與從 Kafka 使用的 Spark 分區進行 1 對 1 的對應。如果您將此選項設定為大於 topicPartitions 的值,Spark 會將大型 Kafka 分區分割成較小的部分。請注意,此組態就像一個 提示:Spark 任務的數量將大約minPartitions。它可能會更少或更多,具體取決於捨入誤差或未收到任何新資料的 Kafka 分區。
groupIdPrefix 字串 spark-kafka-source 串流和批次 結構化串流查詢所產生的消費者群組識別碼 (group.id) 字首。如果設定「kafka.group.id」,則會忽略此選項。
kafka.group.id 字串 串流和批次 從 Kafka 讀取時,在 Kafka 消費者中使用的 Kafka 群組識別碼。請謹慎使用。預設情況下,每個查詢會產生一個唯一的群組識別碼來讀取資料。這可確保每個 Kafka 來源都有自己的消費者群組,不受任何其他消費者的干擾,因此可以讀取其訂閱主題的所有分割區。在某些情況下(例如,基於 Kafka 群組的授權),您可能希望使用特定的授權群組識別碼來讀取資料。您可以選擇設定群組識別碼。但是,請極為謹慎地執行此操作,因為它可能會導致意外行為。同時執行的查詢(批次和串流)或具有相同群組識別碼的來源可能會互相干擾,導致每個查詢只讀取部分資料。當查詢快速連續啟動/重新啟動時,也可能會發生這種情況。若要將此類問題降至最低,請將 Kafka 消費者會話逾時(透過設定選項「kafka.session.timeout.ms」)設定為非常短。設定此選項後,將會忽略選項「groupIdPrefix」。
includeHeaders 布林值 false 串流和批次 是否將 Kafka 標頭包含在列中。
startingOffsetsByTimestampStrategy 「error」或「latest」 「error」 串流和批次 當透過時間戳指定起始偏移量(全域或每個分割區)與 Kafka 傳回的偏移量不符時,將使用此策略。以下是策略名稱和對應說明

「error」:查詢失敗,且最終使用者必須處理需要手動步驟的解決方案。

「latest」:指派這些分割區的最新偏移量,以便 Spark 可以從這些分割區在後續微批次中讀取較新的記錄。

時間戳偏移量選項的詳細資料

傳回的每個分割區偏移量是時間戳大於或等於對應分割區中指定時間戳的最早偏移量。如果 Kafka 未傳回相符的偏移量,則行為會因選項而異 - 請查看每個選項的說明。

Spark 僅將時間戳記資訊傳遞給 KafkaConsumer.offsetsForTimes,而不會詮釋或推論該值。有關 KafkaConsumer.offsetsForTimes 的更多詳細資料,請參閱 javadoc 以取得詳細資料。此外,此處 timestamp 的意義會根據 Kafka 組態 (log.message.timestamp.type) 而有所不同:請參閱 Kafka 文件 以取得進一步的詳細資料。

時間戳記偏移選項需要 Kafka 0.10.1.0 或更高版本。

偏移擷取

在 Spark 3.0 及之前版本,Spark 使用 KafkaConsumer 進行偏移擷取,這可能會導致驅動程式無限等待。在 Spark 3.1 中,新增了一個組態選項 spark.sql.streaming.kafka.useDeprecatedOffsetFetching (預設值:false),讓 Spark 可以使用新的偏移擷取機制,並使用 AdminClient。(將此設定為 true 以使用舊的偏移擷取方式,並使用 KafkaConsumer。)

當使用新機制時,會套用下列內容。

首先,新方法支援 Kafka 仲介 0.11.0.0+

在 Spark 3.0 及以下版本中,安全的 Kafka 處理需要從驅動程式的角度使用下列 ACL:

自 Spark 3.1 起,可以使用 AdminClient 而不是 KafkaConsumer 來取得偏移,並且需要從驅動程式的角度使用下列 ACL:

由於驅動程式中的 AdminClient 沒有連線到消費者群組,因此基於 group.id 的授權將不再運作 (執行器從未執行基於群組的授權)。值得一提的是,執行器端會像以前一樣運作 (群組前綴和覆寫會運作)。

消費者快取

初始化 Kafka 消費者很耗時,尤其是在串流場景中,處理時間是一個關鍵因素。因此,Spark 會透過利用 Apache Commons Pool 在執行器上集中 Kafka 消費者。

快取金鑰是由下列資訊組成

下列屬性可供設定消費者集中

屬性名稱預設值意義版本
spark.kafka.consumer.cache.capacity 64 快取的消費者最大數量。請注意,這是軟性限制。 3.0.0
spark.kafka.consumer.cache.timeout 5m (5 分鐘) 消費者在池中閒置的最小時間,然後才符合驅逐程式驅逐的資格。 3.0.0
spark.kafka.consumer.cache.evictorThreadRunInterval 1m (1 分鐘) 消費者池閒置驅逐執行緒執行間隔。當非正值時,不會執行任何閒置驅逐執行緒。 3.0.0
spark.kafka.consumer.cache.jmx.enable false 針對使用此組態實例建立的池啟用或停用 JMX。池的統計資料可透過 JMX 實例取得。JMX 名稱的前綴設定為「kafka010-cached-simple-kafka-consumer-pool」。 3.0.0

池的大小受 spark.kafka.consumer.cache.capacity 限制,但它作為「軟性限制」以不阻擋 Spark 工作。

閒置驅逐執行緒會定期移除未超過指定逾時時間未使用的消費者。當借用時達到此臨界值,它會嘗試移除目前未使用的最少使用項目。

如果無法移除,則池會持續擴增。最糟的情況是,池會擴增到執行器中可執行的並發工作最大數量(也就是工作時段數量)。

如果工作因任何原因失敗,則會使用新建立的 Kafka 消費者執行新工作,以確保安全性。同時,我們會使池中具有相同快取金鑰的所有消費者失效,以移除在執行失敗時使用的消費者。任何其他工作正在使用的消費者不會關閉,但會在歸還到池中時失效。

除了消費者之外,Spark 會分別快取從 Kafka 擷取的記錄,讓 Kafka 消費者在 Spark 的觀點中成為無狀態,並最大化快取效率。它利用與 Kafka 消費者池相同的快取金鑰。請注意,由於特性的差異,它不使用 Apache Commons Pool。

下列屬性可供組態擷取的資料池

屬性名稱預設值意義版本
spark.kafka.consumer.fetchedData.cache.timeout 5m (5 分鐘) 擷取的資料在池中閒置的最小時間,然後才符合驅逐程式驅逐的資格。 3.0.0
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval 1m (1 分鐘) 擷取的資料池閒置驅逐執行緒執行間隔。當非正值時,不會執行任何閒置驅逐執行緒。 3.0.0

將資料寫入 Kafka

在此,我們說明支援撰寫串流查詢和批次查詢至 Apache Kafka。請注意,Apache Kafka 僅支援至少一次寫入語意。因此,當寫入(串流查詢或批次查詢)至 Kafka 時,某些記錄可能會重複;例如,如果 Kafka 需要重試未經仲介確認的訊息,即使該仲介已接收並寫入訊息記錄,也可能會發生這種情況。由於這些 Kafka 寫入語意,結構化串流無法防止此類重複發生。但是,如果查詢寫入成功,則您可以假設查詢輸出至少寫入一次。在讀取已寫入資料時,移除重複項目的可能解決方案是,引入一個主鍵(唯一鍵),該鍵可用於在讀取時執行重複資料刪除。

寫入 Kafka 的資料框應該在架構中具有下列欄位

欄位類型
key(選用) 字串或二進位
value(必填) 字串或二進位
headers (選用) 陣列
topic(*選用) 字串
partition(選用) int

* 如果未指定「topic」組態選項,則需要 topic 欄位。

value 欄位是唯一必填選項。如果未指定 key 欄位,則會自動新增值為 null 的 key 欄位(請參閱 Kafka 語意,瞭解如何處理值為 null 的 key 值)。如果存在 topic 欄位,則在將給定列寫入 Kafka 時,其值會用作 topic,除非已設定「topic」組態選項,亦即「topic」組態選項會覆寫 topic 欄位。如果未指定「partition」欄位(或其值為 null),則會由 Kafka 產生器計算 partition。可以在 Spark 中透過設定 kafka.partitioner.class 選項來指定 Kafka 分割器。如果不存在,則會使用 Kafka 預設分割器。

必須為批次和串流查詢設定 Kafka 接收器下列選項。

選項value意義
kafka.bootstrap.servers 逗號分隔的 host:port 清單 Kafka 的「bootstrap.servers」設定。

下列設定為選用

選項value預設查詢類型意義
topic 字串 串流和批次 設定所有列將寫入 Kafka 中的 topic。此選項會覆寫資料中可能存在的任何 topic 欄位。
includeHeaders 布林值 false 串流和批次 是否將 Kafka 標頭包含在列中。

為串流查詢建立 Kafka 接收器

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start();

將批次查詢的輸出寫入 Kafka

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save();

產生器快取

由於 Kafka 產生器執行個體被設計為執行緒安全,Spark 會初始化 Kafka 產生器執行個體,並在相同快取金鑰的各個工作中共同使用。

快取金鑰是由下列資訊組成

這包含授權設定,Spark 會在使用委派權杖時自動包含。即使我們考慮授權,您也可以預期在相同的 Kafka 產生器設定中會使用相同的 Kafka 產生器執行個體。當委派權杖更新時,它會使用不同的 Kafka 產生器;舊委派權杖的 Kafka 產生器執行個體會根據快取原則逐出。

以下屬性可供設定產生器池

屬性名稱預設值意義版本
spark.kafka.producer.cache.timeout 10m(10 分鐘) 產生器在池中閒置的最小時間,然後才有資格被逐出器逐出。 2.2.1
spark.kafka.producer.cache.evictorThreadRunInterval 1m (1 分鐘) 產生器池的閒置逐出器執行緒執行間隔。如果是非正數,則不會執行任何閒置逐出器執行緒。 3.0.0

閒置逐出執行緒會定期移除未使用超過指定逾時時間的產生器。請注意,產生器是共用的且同時使用,因此最後使用時間戳記是由產生器執行個體傳回且參考計數為 0 的時刻決定的。

Kafka 特定設定

Kafka 自身的設定可透過 DataStreamReader.option 設定,加上 kafka. 前置詞,例如 stream.option("kafka.bootstrap.servers", "host:port")。有關可能的 kafka 參數,請參閱 Kafka 消費者設定文件,以取得與讀取資料相關的參數,以及 Kafka 產生器設定文件,以取得與寫入資料相關的參數。

請注意,無法設定以下 Kafka 參數,而且 Kafka 來源或接收器會擲回例外

部署

與任何 Spark 應用程式一樣,spark-submit 用於啟動您的應用程式。 spark-sql-kafka-0-10_2.12 及其相依性可以使用 --packages 直接新增到 spark-submit,例如:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 ...

對於在 spark-shell 上進行實驗,您也可以使用 --packages 直接新增 spark-sql-kafka-0-10_2.12 及其相依性。

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 ...

請參閱 應用程式提交指南,以取得有關提交具有外部相依性的應用程式的更多詳細資料。

安全性

Kafka 0.9.0.0 引入了多項功能,可提升叢集中的安全性。如需關於這些可能性的詳細說明,請參閱 Kafka 安全性文件

值得注意的是,安全性是可選的,且預設為關閉。

Spark 支援下列方式來對抗 Kafka 叢集進行驗證

委派權杖

透過這種方式,應用程式可透過 Spark 參數進行組態,且可能不需要 JAAS 登入組態(Spark 可以使用 Kafka 的動態 JAAS 組態功能)。如需關於委派權杖的更多資訊,請參閱 Kafka 委派權杖文件

此程序是由 Spark 的 Kafka 委派權杖提供者啟動的。當設定 spark.kafka.clusters.${cluster}.auth.bootstrap.servers 時,Spark 會依據喜好順序考量下列登入選項

Kafka 委派權杖提供者可透過將 spark.security.credentials.kafka.enabled 設定為 false(預設:true)來關閉。

Spark 可設定為使用下列驗證協定來取得權杖(它必須與 Kafka 仲介組態相符)

在成功取得委派權杖後,Spark 會將它分發到各節點,並適時更新。委派權杖使用 SCRAM 登入模組進行驗證,因此必須組態適當的 spark.kafka.clusters.${cluster}.sasl.token.mechanism(預設:SCRAM-SHA-512)。此外,此參數必須與 Kafka 仲介組態相符。

當委派權杖在執行器上可用時,Spark 會依下列順序考慮登入選項

當以上皆不適用時,則假設為非安全連線。

設定

委派權杖可以從多個叢集取得,而 ${cluster} 是任意唯一的識別碼,用於將不同的設定分組。

屬性名稱預設值意義版本
spark.kafka.clusters.${cluster}.auth.bootstrap.servers 用於建立與 Kafka 叢集的初始連線的主機/埠對清單,以逗號分隔。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.0.0
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex .* 正規表示式,用於比對應用程式中來源和儲存槽的 bootstrap.servers 設定。如果伺服器位址與此正規表示式相符,則連線時會使用從各自的引導伺服器取得的委派權杖。如果多個叢集與位址相符,則會擲回例外,且查詢不會啟動。Kafka 的安全和非安全監聽器會繫結到不同的埠。當同時使用時,安全監聽器埠必須是正規表示式的一部分。 3.0.0
spark.kafka.clusters.${cluster}.security.protocol SASL_SSL 用於與仲介通訊的通訊協定。有關進一步的詳細資料,請參閱 Kafka 文件。通訊協定套用於所有來源和儲存槽,預設為 bootstrap.servers 設定相符(有關進一步的詳細資料,請參閱 spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex),且可以透過在來源或儲存槽上設定 kafka.security.protocol 來覆寫。 3.0.0
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name kafka Kafka 以其身分執行的 Kerberos 主體名稱。這可以在 Kafka 的 JAAS 設定或 Kafka 的設定中定義。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.type 信任儲存檔檔案的檔案格式。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.2.0
spark.kafka.clusters.${cluster}.ssl.truststore.location 信任儲存檔檔案的位置。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.password 信任儲存檔檔案的儲存密碼。這是選用的,僅在設定 spark.kafka.clusters.${cluster}.ssl.truststore.location 時才需要。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.type 金鑰儲存檔案的檔案格式。這對客戶端來說是選配的。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.2.0
spark.kafka.clusters.${cluster}.ssl.keystore.location 金鑰儲存檔案的位置。這對客戶端來說是選配的,可用於客戶端的雙向驗證。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.password 金鑰儲存檔案的儲存密碼。這項是選配的,僅在設定 spark.kafka.clusters.${cluster}.ssl.keystore.location 時需要。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.0.0
spark.kafka.clusters.${cluster}.ssl.key.password 金鑰儲存檔案中私人金鑰的密碼。這對客戶端來說是選配的。有關進一步的詳細資料,請參閱 Kafka 文件。僅用於取得委派權杖。 3.0.0
spark.kafka.clusters.${cluster}.sasl.token.mechanism SCRAM-SHA-512 使用委派權杖進行客戶端連線的 SASL 機制。由於用於驗證的 SCRAM 登入模組,因此必須在此設定相容的機制。有關進一步的詳細資料,請參閱 Kafka 文件 (sasl.mechanism)。僅用於使用委派權杖對 Kafka 代理進行驗證。 3.0.0

Kafka 特定設定

Kafka 的自有設定可以使用 kafka. 前置詞來設定,例如,--conf spark.kafka.clusters.${cluster}.kafka.retries=1。有關可能的 Kafka 參數,請參閱 Kafka adminclient 設定文件

注意事項

JAAS 登入組態

JAAS 登入設定必須放置在 Spark 嘗試存取 Kafka 群集的所有節點上。這提供了套用任何自訂驗證邏輯的可能性,但維護成本較高。這可以用多種方式來完成。一種可能性是提供額外的 JVM 參數,例如,

./bin/spark-submit \
    --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
    ...