Spark 串流自訂接收器

Spark Streaming 可以接收來自任何任意資料來源的串流資料,而這些資料來源並非內建支援(亦即,除了 Kafka、Kinesis、檔案、socket 等)。這需要開發人員實作一個接收器,此接收器會自訂化用於接收來自相關資料來源的資料。本指南將逐步說明實作自訂接收器,並在 Spark Streaming 應用程式中使用它的流程。請注意,自訂接收器可以用 Scala 或 Java 實作。

實作自訂接收器

這從實作一個接收器開始(Scala 說明文件Java 說明文件)。自訂接收器必須透過實作兩個方法來延伸這個抽象類別

onStart()onStop() 都不得無限期地封鎖。通常,onStart() 會啟動負責接收資料的執行緒,而 onStop() 會確保這些接收資料的執行緒會停止。接收執行緒也可以使用 isStopped()(一個 Receiver 方法)來檢查它們是否應該停止接收資料。

一旦接收資料,就可以透過呼叫 store(data)(這是接收器類別提供的一個方法)將資料儲存在 Spark 中。有許多種 store(),允許使用者一次儲存一個接收到的資料記錄,或儲存為整個物件/序列化位元組的集合。請注意,用於實作接收器的 store() 類型會影響其可靠性和容錯語意。這會在 後續更詳細地討論。

接收執行緒中的任何例外都應該被捕捉並適當地處理,以避免接收器靜默失敗。restart(<exception>) 會透過非同步呼叫 onStop(),然後在延遲後呼叫 onStart() 來重新啟動接收器。stop(<exception>) 會呼叫 onStop() 並終止接收器。此外,reportError(<error>) 會向驅動程式報告錯誤訊息(顯示在記錄和 UI 中),而不會停止/重新啟動接收器。

以下是透過 socket 接收文字串串流的自訂接收器。它將文字串串流中以「\n」分隔的列視為記錄,並將它們儲存在 Spark 中。如果接收執行緒在連線或接收時有任何錯誤,接收器會重新啟動,再嘗試連線。

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}
public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  @Override
  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread(this::receive).start();
  }

  @Override
  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

在 Spark Streaming 應用程式中使用自訂接收器

自訂接收器可以在 Spark Streaming 應用程式中使用 streamingContext.receiverStream(<自訂接收器實例>) 來使用。這會使用自訂接收器實例接收的資料建立一個輸入 DStream,如下所示

// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...

完整的原始程式碼在範例 CustomReceiver.scala 中。

// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...

完整的原始碼在範例中 JavaCustomReceiver.java

接收器可靠性

正如在 Spark 串流程式設計指南 中簡要討論的,根據其可靠性和容錯語義,有兩種接收器。

  1. 可靠接收器 - 對於允許確認已傳送資料的可靠來源可靠接收器 會正確地向來源確認資料已在 Spark 中可靠地接收並儲存(也就是說,已成功複製)。通常,實作這個接收器涉及仔細考慮來源確認的語義。
  2. 不可靠接收器 - 不可靠接收器不會向來源傳送確認。這可以用於不支援確認的來源,甚至對於可靠來源,當你不想或不需要深入確認的複雜性時也可以使用。

若要實作可靠接收器,你必須使用 store(multiple-records) 來儲存資料。這種 store 是阻擋呼叫,只有在所有給定的記錄都儲存在 Spark 內部後才會傳回。如果接收器設定的儲存層級使用複製(預設啟用),則此呼叫會在複製完成後傳回。因此,它確保資料已可靠地儲存,而接收器現在可以適當地確認來源。這可確保當接收器在複製資料過程中發生故障時,不會遺失任何資料 - 緩衝的資料不會被確認,因此稍後會由來源重新傳送。

不可靠接收器 不必實作任何這個邏輯。它可以從來源接收記錄,並使用 store(single-record) 一次插入一個記錄。雖然它沒有獲得 store(multiple-records) 的可靠性保證,但它有以下優點

下表總結了兩種接收器類型的特性

接收器類型 特性
不可靠接收器 易於實作。
系統負責區塊產生和速率控制。沒有容錯保證,接收器故障時可能會丟失資料。
可靠接收器 強大的容錯保證,可以確保零資料遺失。
區塊產生和速率控制由接收器實作處理。
實作複雜度取決於來源的確認機制。