Spark 串流自訂接收器
Spark Streaming 可以接收來自任何任意資料來源的串流資料,而這些資料來源並非內建支援(亦即,除了 Kafka、Kinesis、檔案、socket 等)。這需要開發人員實作一個接收器,此接收器會自訂化用於接收來自相關資料來源的資料。本指南將逐步說明實作自訂接收器,並在 Spark Streaming 應用程式中使用它的流程。請注意,自訂接收器可以用 Scala 或 Java 實作。
實作自訂接收器
這從實作一個接收器開始(Scala 說明文件,Java 說明文件)。自訂接收器必須透過實作兩個方法來延伸這個抽象類別
onStart()
:開始接收資料時要執行的動作。onStop()
:停止接收資料時要執行的動作。
onStart()
和 onStop()
都不得無限期地封鎖。通常,onStart()
會啟動負責接收資料的執行緒,而 onStop()
會確保這些接收資料的執行緒會停止。接收執行緒也可以使用 isStopped()
(一個 Receiver
方法)來檢查它們是否應該停止接收資料。
一旦接收資料,就可以透過呼叫 store(data)
(這是接收器類別提供的一個方法)將資料儲存在 Spark 中。有許多種 store()
,允許使用者一次儲存一個接收到的資料記錄,或儲存為整個物件/序列化位元組的集合。請注意,用於實作接收器的 store()
類型會影響其可靠性和容錯語意。這會在 後續更詳細地討論。
接收執行緒中的任何例外都應該被捕捉並適當地處理,以避免接收器靜默失敗。restart(<exception>)
會透過非同步呼叫 onStop()
,然後在延遲後呼叫 onStart()
來重新啟動接收器。stop(<exception>)
會呼叫 onStop()
並終止接收器。此外,reportError(<error>)
會向驅動程式報告錯誤訊息(顯示在記錄和 UI 中),而不會停止/重新啟動接收器。
以下是透過 socket 接收文字串串流的自訂接收器。它將文字串串流中以「\n」分隔的列視為記錄,並將它們儲存在 Spark 中。如果接收執行緒在連線或接收時有任何錯誤,接收器會重新啟動,再嘗試連線。
class CustomReceiver(host: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
public class JavaCustomReceiver extends Receiver<String> {
String host = null;
int port = -1;
public JavaCustomReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK_2());
host = host_;
port = port_;
}
@Override
public void onStart() {
// Start the thread that receives data over a connection
new Thread(this::receive).start();
}
@Override
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
Socket socket = null;
String userInput = null;
try {
// connect to the server
socket = new Socket(host, port);
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// Until stopped or connection broken continue reading
while (!isStopped() && (userInput = reader.readLine()) != null) {
System.out.println("Received data '" + userInput + "'");
store(userInput);
}
reader.close();
socket.close();
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again");
} catch(ConnectException ce) {
// restart if could not connect to server
restart("Could not connect", ce);
} catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
}
在 Spark Streaming 應用程式中使用自訂接收器
自訂接收器可以在 Spark Streaming 應用程式中使用 streamingContext.receiverStream(<自訂接收器實例>)
來使用。這會使用自訂接收器實例接收的資料建立一個輸入 DStream,如下所示
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...
完整的原始程式碼在範例 CustomReceiver.scala 中。
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...
完整的原始碼在範例中 JavaCustomReceiver.java。
接收器可靠性
正如在 Spark 串流程式設計指南 中簡要討論的,根據其可靠性和容錯語義,有兩種接收器。
- 可靠接收器 - 對於允許確認已傳送資料的可靠來源,可靠接收器 會正確地向來源確認資料已在 Spark 中可靠地接收並儲存(也就是說,已成功複製)。通常,實作這個接收器涉及仔細考慮來源確認的語義。
- 不可靠接收器 - 不可靠接收器不會向來源傳送確認。這可以用於不支援確認的來源,甚至對於可靠來源,當你不想或不需要深入確認的複雜性時也可以使用。
若要實作可靠接收器,你必須使用 store(multiple-records)
來儲存資料。這種 store
是阻擋呼叫,只有在所有給定的記錄都儲存在 Spark 內部後才會傳回。如果接收器設定的儲存層級使用複製(預設啟用),則此呼叫會在複製完成後傳回。因此,它確保資料已可靠地儲存,而接收器現在可以適當地確認來源。這可確保當接收器在複製資料過程中發生故障時,不會遺失任何資料 - 緩衝的資料不會被確認,因此稍後會由來源重新傳送。
不可靠接收器 不必實作任何這個邏輯。它可以從來源接收記錄,並使用 store(single-record)
一次插入一個記錄。雖然它沒有獲得 store(multiple-records)
的可靠性保證,但它有以下優點
- 系統負責將資料分塊成適當大小的區塊(在 Spark 串流程式設計指南 中尋找區塊間隔)。
- 如果已指定速率限制,系統負責控制接收速率。
- 由於這兩個原因,不可靠接收器比可靠接收器更容易實作。
下表總結了兩種接收器類型的特性
接收器類型 | 特性 |
---|---|
不可靠接收器 |
易於實作。 系統負責區塊產生和速率控制。沒有容錯保證,接收器故障時可能會丟失資料。 |
可靠接收器 |
強大的容錯保證,可以確保零資料遺失。 區塊產生和速率控制由接收器實作處理。 實作複雜度取決於來源的確認機制。 |