Spark 串流 + Kafka 整合指南(Kafka 仲介版本 0.10.0 或更高)

Kafka 0.10 的 Spark Streaming 整合提供了簡單的並行處理、Kafka 分割區與 Spark 分割區之間的 1:1 對應,以及存取偏移量和元資料。然而,由於較新的整合使用 新的 Kafka 消費者 API,而不是簡單的 API,因此在使用方式上有顯著的差異。

連結

對於使用 SBT/Maven 專案定義的 Scala/Java 應用程式,請將串流應用程式連結至下列人工製品(請參閱主要程式設計指南中的 連結區段 以取得進一步資訊)。

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.5.1

請勿手動新增對 org.apache.kafka 人工製品(例如 kafka-clients)的相依性。 spark-streaming-kafka-0-10 人工製品已具備適當的傳遞相依性,而不同的版本可能以難以診斷的方式不相容。

建立直接串流

請注意,匯入的命名空間包含版本,org.apache.spark.streaming.kafka010

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

串流中的每個項目都是 ConsumerRecord

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

有關可能的 kafkaParams,請參閱 Kafka 消費者設定文件。如果您的 Spark 批次持續時間大於預設的 Kafka 心跳會話逾時(30 秒),請適當地增加 heartbeat.interval.ms 和 session.timeout.ms。對於大於 5 分鐘的批次,這將需要變更代理伺服器上的 group.max.session.timeout.ms。請注意,範例將 enable.auto.commit 設定為 false,有關討論,請參閱下方的 儲存偏移量

位置策略

新的 Kafka 消費者 API 會將訊息預先擷取到緩衝區中。因此,基於效能考量,Spark 整合在執行器上保留快取的消費者(而不是為每個批次重新建立)非常重要,並優先在具有適當消費者的主機位置上排程分割區。

在大多數情況下,您應該如上所示使用 LocationStrategies.PreferConsistent。這會將分割區平均分配到可用的執行器上。如果您的執行器與 Kafka 代理伺服器位於相同的主機上,請使用 PreferBrokers,它會優先在該分割區的 Kafka 領導者上排程分割區。最後,如果分割區之間的負載有顯著的傾斜,請使用 PreferFixed。這讓您可以指定分割區與主機的明確對應(任何未指定的分割區將使用一致的位置)。

消費者的快取預設最大大小為 64。如果您預期要處理超過(64 * 執行器數量)個 Kafka 分割區,您可以透過 spark.streaming.kafka.consumer.cache.maxCapacity 變更此設定。

如果您想停用 Kafka 消費者快取,您可以將 spark.streaming.kafka.consumer.cache.enabled 設為 false

快取以 topicpartition 和 group.id 為鍵,因此請為每次呼叫 createDirectStream 使用獨立group.id

ConsumerStrategies

新的 Kafka 消費者 API 有許多不同的方式來指定主題,其中一些需要大量的物件實例化後設定。 ConsumerStrategies 提供一個抽象,讓 Spark 能在從檢查點重新啟動後取得適當設定的消費者。

如上所示,ConsumerStrategies.Subscribe 讓您可以訂閱固定主題集合。 SubscribePattern 讓您可以使用正規表示法指定感興趣的主題。請注意,與 0.8 整合不同,使用 SubscribeSubscribePattern 應回應在執行串流期間新增分割區。最後,Assign 讓您可以指定固定分割區集合。所有三種策略都有重載建構函式,讓您可以指定特定分割區的起始偏移量。

如果您有上述選項未滿足的特定消費者設定需求,ConsumerStrategy 是您可以延伸的公開類別。

建立 RDD

如果您有更適合批次處理的用例,您可以為定義的偏移量範圍建立 RDD。

// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

請注意,您無法使用 PreferBrokers,因為沒有串流,因此沒有驅動程式端的消費者可以自動為您查詢仲介資料。如有必要,請使用 PreferFixed 和您自己的資料查詢。

取得偏移量

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});

請注意,將型別轉換為 HasOffsetRanges 僅在 createDirectStream 結果的第一個方法中執行時才會成功,而不是在方法鏈的後面。請注意,RDD 分割區和 Kafka 分割區之間的一對一對應關係在任何洗牌或重新分割區的方法(例如 reduceByKey() 或 window())後都不會保留。

儲存偏移量

發生錯誤時 Kafka 傳遞語意取決於偏移量儲存的方式和時間。Spark 輸出作業是 至少一次。因此,如果您想要等同於完全一次的語意,您必須在冪等輸出後儲存偏移量,或在輸出時在原子交易中儲存偏移量。透過此整合,您有 3 個選項(依可靠性(和程式碼複雜度)增加)來儲存偏移量。

檢查點

如果您啟用 Spark 檢查點,偏移量將儲存在檢查點中。這很容易啟用,但有缺點。您的輸出作業必須是冪等的,因為您會得到重複的輸出;交易不是一個選項。此外,如果您的應用程式程式碼已變更,您無法從檢查點中復原。對於計畫中的升級,您可以透過同時執行新程式碼和舊程式碼來減輕此問題(由於輸出需要是冪等的,因此它們不應衝突)。但是,對於需要程式碼變更的意外失敗,除非您有其他方法來識別已知的良好起始偏移量,否則您將會遺失資料。

Kafka 本身

Kafka 有一個偏移量提交 API,用於將偏移量儲存在一個特殊的 Kafka 主題中。預設情況下,新的消費者會定期自動提交偏移量。這幾乎可以確定不是你想要的,因為消費者成功輪詢到的訊息可能尚未產生 Spark 輸出操作,導致語意未定義。這就是上述串流範例將「enable.auto.commit」設為 false 的原因。但是,你可以使用 commitAsync API,在你確定輸出已儲存後,將偏移量提交到 Kafka。與檢查點相比,優點是 Kafka 是一個耐用的儲存,不受應用程式程式碼變更的影響。但是,Kafka 不是事務性的,所以你的輸出仍必須是冪等的。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

與 HasOffsetRanges 一樣,轉換為 CanCommitOffsets 僅在對 createDirectStream 的結果呼叫時才會成功,而不是在轉換之後。commitAsync 呼叫是執行緒安全的,但如果你想要有意義的語意,則必須在輸出後執行。

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

你自己的資料儲存

對於支援事務的資料儲存,在與結果相同的交易中儲存偏移量,即使在失敗情況下,也能讓兩者保持同步。如果你小心地偵測重複或略過的偏移量範圍,則回滾交易可以防止重複或遺失的訊息影響結果。這提供了等同於一次性語意的效果。即使對於通常難以冪等的聚合所產生的輸出,也可以使用此策略。

// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  
  Object results = yourCalculation(rdd);

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
});

SSL / TLS

新的 Kafka 消費者 支援 SSL。若要啟用它,請在傳遞給 createDirectStream / createRDD 之前,適當地設定 kafkaParams。請注意,這僅適用於 Spark 和 Kafka 代理之間的通訊;你仍有責任另外 保護 Spark 節點間的通訊。

val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

部署

與任何 Spark 應用程式一樣,spark-submit 用於啟動你的應用程式。

對於 Scala 和 Java 應用程式,如果你使用 SBT 或 Maven 進行專案管理,請將 spark-streaming-kafka-0-10_2.12 套件及其相依性封裝到應用程式 JAR 中。請確定 spark-core_2.12spark-streaming_2.12 標記為 provided 相依性,因為這些已經存在於 Spark 安裝中。然後使用 spark-submit 啟動你的應用程式(請參閱主要程式設計指南中的 部署區段)。

安全性

請參閱 結構化串流安全性

其他注意事項