Spark 串流 + Kafka 整合指南(Kafka 仲介版本 0.10.0 或更高)
Kafka 0.10 的 Spark Streaming 整合提供了簡單的並行處理、Kafka 分割區與 Spark 分割區之間的 1:1 對應,以及存取偏移量和元資料。然而,由於較新的整合使用 新的 Kafka 消費者 API,而不是簡單的 API,因此在使用方式上有顯著的差異。
連結
對於使用 SBT/Maven 專案定義的 Scala/Java 應用程式,請將串流應用程式連結至下列人工製品(請參閱主要程式設計指南中的 連結區段 以取得進一步資訊)。
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.5.1
請勿手動新增對 org.apache.kafka
人工製品(例如 kafka-clients
)的相依性。 spark-streaming-kafka-0-10
人工製品已具備適當的傳遞相依性,而不同的版本可能以難以診斷的方式不相容。
建立直接串流
請注意,匯入的命名空間包含版本,org.apache.spark.streaming.kafka010
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
串流中的每個項目都是 ConsumerRecord
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
有關可能的 kafkaParams,請參閱 Kafka 消費者設定文件。如果您的 Spark 批次持續時間大於預設的 Kafka 心跳會話逾時(30 秒),請適當地增加 heartbeat.interval.ms 和 session.timeout.ms。對於大於 5 分鐘的批次,這將需要變更代理伺服器上的 group.max.session.timeout.ms。請注意,範例將 enable.auto.commit 設定為 false,有關討論,請參閱下方的 儲存偏移量。
位置策略
新的 Kafka 消費者 API 會將訊息預先擷取到緩衝區中。因此,基於效能考量,Spark 整合在執行器上保留快取的消費者(而不是為每個批次重新建立)非常重要,並優先在具有適當消費者的主機位置上排程分割區。
在大多數情況下,您應該如上所示使用 LocationStrategies.PreferConsistent
。這會將分割區平均分配到可用的執行器上。如果您的執行器與 Kafka 代理伺服器位於相同的主機上,請使用 PreferBrokers
,它會優先在該分割區的 Kafka 領導者上排程分割區。最後,如果分割區之間的負載有顯著的傾斜,請使用 PreferFixed
。這讓您可以指定分割區與主機的明確對應(任何未指定的分割區將使用一致的位置)。
消費者的快取預設最大大小為 64。如果您預期要處理超過(64 * 執行器數量)個 Kafka 分割區,您可以透過 spark.streaming.kafka.consumer.cache.maxCapacity
變更此設定。
如果您想停用 Kafka 消費者快取,您可以將 spark.streaming.kafka.consumer.cache.enabled
設為 false
。
快取以 topicpartition 和 group.id 為鍵,因此請為每次呼叫 createDirectStream
使用獨立的 group.id
。
ConsumerStrategies
新的 Kafka 消費者 API 有許多不同的方式來指定主題,其中一些需要大量的物件實例化後設定。 ConsumerStrategies
提供一個抽象,讓 Spark 能在從檢查點重新啟動後取得適當設定的消費者。
如上所示,ConsumerStrategies.Subscribe
讓您可以訂閱固定主題集合。 SubscribePattern
讓您可以使用正規表示法指定感興趣的主題。請注意,與 0.8 整合不同,使用 Subscribe
或 SubscribePattern
應回應在執行串流期間新增分割區。最後,Assign
讓您可以指定固定分割區集合。所有三種策略都有重載建構函式,讓您可以指定特定分割區的起始偏移量。
如果您有上述選項未滿足的特定消費者設定需求,ConsumerStrategy
是您可以延伸的公開類別。
建立 RDD
如果您有更適合批次處理的用例,您可以為定義的偏移量範圍建立 RDD。
// Import dependencies and create kafka params as in Create Direct Stream above
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
請注意,您無法使用 PreferBrokers
,因為沒有串流,因此沒有驅動程式端的消費者可以自動為您查詢仲介資料。如有必要,請使用 PreferFixed
和您自己的資料查詢。
取得偏移量
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
});
});
請注意,將型別轉換為 HasOffsetRanges
僅在 createDirectStream
結果的第一個方法中執行時才會成功,而不是在方法鏈的後面。請注意,RDD 分割區和 Kafka 分割區之間的一對一對應關係在任何洗牌或重新分割區的方法(例如 reduceByKey() 或 window())後都不會保留。
儲存偏移量
發生錯誤時 Kafka 傳遞語意取決於偏移量儲存的方式和時間。Spark 輸出作業是 至少一次。因此,如果您想要等同於完全一次的語意,您必須在冪等輸出後儲存偏移量,或在輸出時在原子交易中儲存偏移量。透過此整合,您有 3 個選項(依可靠性(和程式碼複雜度)增加)來儲存偏移量。
檢查點
如果您啟用 Spark 檢查點,偏移量將儲存在檢查點中。這很容易啟用,但有缺點。您的輸出作業必須是冪等的,因為您會得到重複的輸出;交易不是一個選項。此外,如果您的應用程式程式碼已變更,您無法從檢查點中復原。對於計畫中的升級,您可以透過同時執行新程式碼和舊程式碼來減輕此問題(由於輸出需要是冪等的,因此它們不應衝突)。但是,對於需要程式碼變更的意外失敗,除非您有其他方法來識別已知的良好起始偏移量,否則您將會遺失資料。
Kafka 本身
Kafka 有一個偏移量提交 API,用於將偏移量儲存在一個特殊的 Kafka 主題中。預設情況下,新的消費者會定期自動提交偏移量。這幾乎可以確定不是你想要的,因為消費者成功輪詢到的訊息可能尚未產生 Spark 輸出操作,導致語意未定義。這就是上述串流範例將「enable.auto.commit」設為 false 的原因。但是,你可以使用 commitAsync
API,在你確定輸出已儲存後,將偏移量提交到 Kafka。與檢查點相比,優點是 Kafka 是一個耐用的儲存,不受應用程式程式碼變更的影響。但是,Kafka 不是事務性的,所以你的輸出仍必須是冪等的。
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
與 HasOffsetRanges 一樣,轉換為 CanCommitOffsets 僅在對 createDirectStream 的結果呼叫時才會成功,而不是在轉換之後。commitAsync 呼叫是執行緒安全的,但如果你想要有意義的語意,則必須在輸出後執行。
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
你自己的資料儲存
對於支援事務的資料儲存,在與結果相同的交易中儲存偏移量,即使在失敗情況下,也能讓兩者保持同步。如果你小心地偵測重複或略過的偏移量範圍,則回滾交易可以防止重複或遺失的訊息影響結果。這提供了等同於一次性語意的效果。即使對於通常難以冪等的聚合所產生的輸出,也可以使用此策略。
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
});
SSL / TLS
新的 Kafka 消費者 支援 SSL。若要啟用它,請在傳遞給 createDirectStream
/ createRDD
之前,適當地設定 kafkaParams。請注意,這僅適用於 Spark 和 Kafka 代理之間的通訊;你仍有責任另外 保護 Spark 節點間的通訊。
val kafkaParams = Map[String, Object](
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
"security.protocol" -> "SSL",
"ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
"ssl.truststore.password" -> "test1234",
"ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
"ssl.keystore.password" -> "test1234",
"ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
部署
與任何 Spark 應用程式一樣,spark-submit
用於啟動你的應用程式。
對於 Scala 和 Java 應用程式,如果你使用 SBT 或 Maven 進行專案管理,請將 spark-streaming-kafka-0-10_2.12
套件及其相依性封裝到應用程式 JAR 中。請確定 spark-core_2.12
和 spark-streaming_2.12
標記為 provided
相依性,因為這些已經存在於 Spark 安裝中。然後使用 spark-submit
啟動你的應用程式(請參閱主要程式設計指南中的 部署區段)。
安全性
請參閱 結構化串流安全性。
其他注意事項
- Kafka 原生接收器不可用,因此委派權杖僅用於消費者端。