GraphX 程式設計指南
概觀
GraphX 是 Spark 中的一個新元件,用於圖形和圖形平行運算。在高層級中,GraphX 透過引入新的 Graph 抽象來延伸 Spark RDD:一個有向多重圖,其中每個頂點和邊緣都附加了屬性。為了支援圖形運算,GraphX 公開了一組基本運算子(例如 subgraph、joinVertices 和 aggregateMessages),以及 Pregel API 的最佳化變體。此外,GraphX 包含越來越多的圖形 演算法 和 建構函數,以簡化圖形分析任務。
入門
要開始使用,您首先需要將 Spark 和 GraphX 匯入您的專案,如下所示
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
如果您沒有使用 Spark shell,您還需要一個 SparkContext
。若要深入了解如何開始使用 Spark,請參閱 Spark 快速入門指南。
屬性圖
屬性圖形 是有向多重圖,其中每個頂點和邊緣都附加了使用者定義的物件。有向多重圖是有向圖,其中可能有許多平行邊緣共用相同的來源和目標頂點。支援平行邊緣的能力簡化了建模場景,在這種場景中,同一個頂點之間可以有多重關係(例如同事和朋友)。每個頂點都以唯一的 64 位元長識別碼 (VertexId
) 為鍵。GraphX 沒有對頂點識別碼施加任何排序限制。類似地,邊緣有對應的來源和目標頂點識別碼。
屬性圖形針對頂點 (VD
) 和邊緣 (ED
) 類型進行參數化。這些分別是與每個頂點和邊緣關聯的物件類型。
當頂點和邊緣類型是原始資料類型(例如 int、double 等)時,GraphX 會最佳化其表示,透過將它們儲存在專用陣列中來減少記憶體使用量。
在某些情況下,可能需要在同一個圖形中使用具有不同屬性類型的頂點。這可以透過繼承來完成。例如,若要將使用者和產品建模為二部圖,我們可以執行下列操作
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
與 RDD 類似,屬性圖形是不可變的、分散式的和容錯的。透過產生具有所需變更的新圖形,可以變更圖形的數值或結構。請注意,原始圖形的大部分(即不受影響的結構、屬性和索引)會在新的圖形中重複使用,以降低此內在函數資料結構的成本。圖形會使用範圍的頂點分割啟發法在執行器之間進行分割。與 RDD 一樣,圖形的每個分割都可以在發生故障時在不同的機器上重新建立。
在邏輯上,屬性圖形對應到一對編碼每個頂點和邊緣屬性的類型化集合(RDD)。因此,圖形類別包含成員來存取圖形的頂點和邊緣
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
類別 VertexRDD[VD]
和 EdgeRDD[ED]
分別延伸和最佳化了 RDD[(VertexId, VD)]
和 RDD[Edge[ED]]
。 VertexRDD[VD]
和 EdgeRDD[ED]
都提供建立在圖形運算周圍的額外功能,並利用內部最佳化。我們在 頂點和邊緣 RDD 部分中更詳細地討論 VertexRDD
VertexRDD 和 EdgeRDD
EdgeRDD API,但目前可以將它們視為單純的形式為 RDD[(VertexId, VD)]
和 RDD[Edge[ED]]
的 RDD。
屬性圖範例
假設我們想要建立一個由 GraphX 專案的各種協作者組成的屬性圖形。頂點屬性可能包含使用者名稱和職業。我們可以用字串註解邊緣,描述協作者之間的關係。
產生的圖形會有類型簽章
val userGraph: Graph[(String, String), String]
有許多方法可以從原始檔案、RDD 甚至合成產生器建立屬性圖形,這些方法在 圖形建立器 部分中會更詳細地討論。最通用的方法可能是使用 Graph 物件。例如,以下程式碼從 RDD 集合建立一個圖形
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
在上面的範例中,我們使用 Edge
案例類別。邊緣有 srcId
和 dstId
,對應到來源和目的地頂點識別碼。此外,Edge
類別有一個 attr
成員,用來儲存邊緣屬性。
我們可以使用 graph.vertices
和 graph.edges
成員分別將圖形解構為各自的頂點和邊緣檢視。
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
請注意
graph.vertices
會傳回VertexRDD[(String, String)]
,它會延伸RDD[(VertexId, (String, String))]
,因此我們使用 Scalacase
表達式來解構元組。另一方面,graph.edges
會傳回EdgeRDD
,其中包含Edge[String]
物件。我們也可以使用案例類別類型建構函數,如下所示
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
除了屬性圖表的頂點和邊緣檢視之外,GraphX 也會公開一個三元組檢視。三元組檢視會在邏輯上結合頂點和邊緣屬性,產生 RDD[EdgeTriplet[VD, ED]]
,其中包含 EdgeTriplet
類別的執行個體。這個「結合」可以用下列 SQL 表達式表示
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
或以圖形表示為
EdgeTriplet
類別會透過新增 srcAttr
和 dstAttr
成員來延伸 Edge
類別,這些成員分別包含來源和目的地屬性。我們可以使用圖表的「三元組檢視」來呈現描述使用者之間關係的字串集合。
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
graph.triplets.map(triplet =>
triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
圖形運算子
就像 RDD 具有 map
、filter
和 reduceByKey
等基本操作,屬性圖表也有一組基本運算子,它們會採用使用者定義的函數,並產生具有轉換屬性和結構的新圖表。在 Graph
中定義了具有最佳化實作的核心運算子,並在 GraphOps
中定義了方便的運算子,它們表示為核心運算子的組合。不過,由於有 Scala 隱含式,GraphOps
中的運算子會自動成為 Graph
的成員。例如,我們可以透過下列方式計算每個頂點的入度(在 GraphOps
中定義)
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
區分核心圖形操作和 GraphOps
的原因是為了能夠在未來支援不同的圖形表示。每個圖形表示都必須提供核心操作的實作,並重複使用 GraphOps
中定義的許多有用操作。
運算子摘要清單
以下是 Graph
和 GraphOps
中定義的功能的快速摘要,但為了簡潔起見,它們被表示為 Graph 的成員。請注意,某些函數簽章已簡化(例如,預設參數和類型約束已移除),並且某些更進階的功能已移除,因此請參閱 API 文件以取得官方操作清單。
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
// Information about the Graph ===================================================================
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
// Change the partitioning heuristic ============================================================
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join RDDs with the graph ======================================================================
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]
// Iterative graph-parallel computation ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// Basic graph algorithms ========================================================================
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
屬性運算子
與 RDD map
算子一樣,屬性圖包含下列內容
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
這些算子各會產生一個新的圖形,其頂點或邊緣屬性已由使用者定義的 map
函數修改。
請注意,在每種情況下,圖形結構都不受影響。這是這些算子的關鍵功能,它允許結果圖形重複使用原始圖形的結構索引。下列程式碼片段在邏輯上是等效的,但第一個不會保留結構索引,也無法從 GraphX 系統最佳化中受益
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
請改用
mapVertices
來保留索引
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
這些算子通常用於初始化圖形以進行特定運算或將不必要的屬性投影出去。例如,假設有一個圖形,其外圍度是頂點屬性(我們稍後會說明如何建構此類圖形),我們會將其初始化為 PageRank
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
結構運算子
目前 GraphX 僅支援一組常用的簡單結構算子,我們預計未來會新增更多算子。以下是基本結構算子的清單。
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
reverse
算子會傳回一個新圖形,其中所有邊緣方向都反轉。這在例如嘗試運算反向 PageRank 時會很有用。由於反轉操作不會修改頂點或邊緣屬性,也不會變更邊緣數量,因此可以在沒有資料移動或複製的情況下有效率地實作。
subgraph
算子會取得頂點和邊緣謂詞,並傳回僅包含滿足頂點謂詞(評估為 true)的頂點和滿足邊緣謂詞並連接滿足頂點謂詞的頂點的圖形。subgraph
算子可用於許多情況,例如將圖形限制為感興趣的頂點和邊緣,或消除損毀的連結。例如,在下列程式碼中,我們會移除損毀的連結
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
請注意,在上述範例中,僅提供頂點謂詞。如果未提供頂點或邊緣謂詞,
subgraph
算子會預設為true
。
mask
算子會透過傳回包含也在輸入圖形中找到的頂點和邊緣的圖形來建構子圖形。這可用於搭配 subgraph
算子,根據另一個相關圖形中的屬性來限制圖形。例如,我們可以使用包含遺失頂點的圖形來執行連接元件,然後將答案限制為有效的子圖形。
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
運算子 groupEdges
會合併多重圖中的平行邊緣(亦即,頂點對之間的重複邊緣)。在許多數值應用中,平行邊緣可以「加總」(其權重合併)成單一邊緣,進而減少圖形的大小。
聯結運算子
在許多情況下,有必要將外部集合(RDD)中的資料與圖形結合。例如,我們可能有一些額外的使用者屬性,想要將其與現有圖形合併,或者我們可能想要將頂點屬性從一個圖形拉入另一個圖形。這些任務可以使用「結合」運算子來完成。以下是我們列出的主要結合運算子
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
運算子 joinVertices
會將頂點與輸入 RDD 結合,並傳回一個新的圖形,其頂點屬性是透過將使用者定義的 map
函式套用至結合頂點的結果而取得。在 RDD 中沒有匹配值的頂點會保留其原始值。
請注意,如果 RDD 包含給定頂點的兩個以上值,只會使用一個值。因此,建議使用下列方式將輸入 RDD 設定為唯一值,這也會「預先索引」結果值,以大幅加速後續結合。
val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
(id, oldCost, extraCost) => oldCost + extraCost)
更通用的 outerJoinVertices
的行為類似於 joinVertices
,但使用者定義的 map
函式會套用至所有頂點,而且可以變更頂點屬性類型。由於並非所有頂點在輸入 RDD 中都可能有匹配值,因此 map
函式會採用 Option
類型。例如,我們可以透過使用其 outDegree
初始化頂點屬性,為 PageRank 設定圖形。
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 // No outDegree means zero outDegree
}
}
您可能已注意到在上述範例中使用了多個參數清單(例如,
f(a)(b)
)的函式化函式模式。雖然我們可以將f(a)(b)
等同寫成f(a,b)
,這表示b
的類型推論不會依賴於a
。因此,使用者需要提供使用者定義函式的類型註解
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
鄰域聚合
在許多圖形分析任務中,一個關鍵步驟是彙總每個頂點鄰域的資訊。例如,我們可能想要知道每個使用者有多少追蹤者,或每個使用者的追蹤者的平均年齡。許多反覆式圖形演算法(例如,PageRank、最短路徑和連通元件)會重複彙總鄰近頂點的屬性(例如,目前的 PageRank 值、至來源的最短路徑,以及最小的可到達頂點 ID)。
為改善效能,主要聚合運算子已從
graph.mapReduceTriplets
變更為新的graph.AggregateMessages
。儘管 API 中的變更相對較小,但我們在下方提供轉換指南。
聚合訊息 (aggregateMessages)
GraphX 中的核心聚合運算為 aggregateMessages
。此運算子會將使用者定義的 sendMsg
函式套用至圖形中的每個「邊緣三元組」,然後使用 mergeMsg
函式在目的頂點處聚合這些訊息。
class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}
使用者定義的 sendMsg
函式會採用 EdgeContext
,公開來源和目的屬性以及邊緣屬性和函式 (sendToSrc
和 sendToDst
),以將訊息傳送至來源和目的屬性。將 sendMsg
視為 map-reduce 中的「map」函式。使用者定義的 mergeMsg
函式會採用傳送至同一個頂點的兩個訊息,並產生單一訊息。將 mergeMsg
視為 map-reduce 中的「reduce」函式。 aggregateMessages
運算子會傳回 VertexRDD[Msg]
,其中包含傳送至每個頂點的聚合訊息 (類型為 Msg
)。未收到訊息的頂點不會包含在傳回的 VertexRDD
VertexRDD 中。
此外,aggregateMessages
會採用一個選用的 tripletsFields
,用以指出在 EdgeContext
中存取哪些資料(例如,來源頂點屬性,而非目的地頂點屬性)。tripletsFields
的可能選項已在 TripletFields
中定義,而預設值為 TripletFields.All
,表示使用者定義的 sendMsg
函式可以存取 EdgeContext
中的任何欄位。tripletsFields
參數可用於通知 GraphX,僅需要 EdgeContext
的一部分,讓 GraphX 能夠選取最佳化的聯合策略。例如,如果我們要計算每個使用者的追蹤者平均年齡,我們只需要來源欄位,因此我們會使用 TripletFields.Src
來表示我們只需要來源欄位
在 GraphX 的早期版本中,我們使用位元組碼檢查來推斷
TripletFields
,但我們發現位元組碼檢查有點不可靠,因此改為選擇更明確的使用者控制。
在以下範例中,我們使用 aggregateMessages
算子來計算每個使用者的資深追蹤者平均年齡。
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst((1, triplet.srcAttr))
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) =>
value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
當訊息(和訊息總和)是固定大小時(例如,浮點數和加法,而非清單和串接),
aggregateMessages
作業會執行最佳化。
Map Reduce 三元組轉換指南(舊版)
在 GraphX 的早期版本中,鄰近聚合是使用 mapReduceTriplets
算子來完成的
class Graph[VD, ED] {
def mapReduceTriplets[Msg](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
reduce: (Msg, Msg) => Msg)
: VertexRDD[Msg]
}
mapReduceTriplets
算子會採用使用者定義的對應函式,套用於每個三元組,並能產生使用使用者定義的 reduce
函式聚合的訊息。然而,我們發現返回的迭代器使用起來很昂貴,而且阻礙了我們套用其他最佳化(例如,區域頂點重新編號)。在 aggregateMessages
中,我們引入了 EdgeContext,用以公開三元組欄位,並函式化明確傳送訊息至來源和目的地頂點。此外,我們移除了位元組碼檢查,改為要求使用者指出三元組中哪些欄位是實際需要的。
使用 mapReduceTriplets
的下列程式碼區塊
val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
可以使用 aggregateMessages
改寫成
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
計算度數資訊
常見的聚合任務是計算每個頂點的度數:與每個頂點相鄰的邊數。在有向圖的脈絡中,通常需要知道每個頂點的入度、出度和總度數。 GraphOps
類別包含一組運算子,用於計算每個頂點的度數。例如,在以下範例中,我們計算最大入度、出度和總度數
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
收集鄰居
在某些情況下,透過收集每個頂點的鄰近頂點及其屬性,可能會更容易表達計算。這可以使用 collectNeighborIds
和 collectNeighbors
運算子輕鬆完成。
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
這些運算子可能會相當昂貴,因為它們會重複資訊,而且需要大量通訊。如果可能,請嘗試使用
aggregateMessages
運算子直接表達相同的計算。
快取和取消快取
在 Spark 中,預設情況下 RDD 沒有持續儲存在記憶體中。為避免重新計算,在多次使用 RDD 時必須明確快取它們(請參閱 Spark 程式設計指南)。GraphX 中的圖形行為方式相同。在多次使用圖形時,請務必先呼叫 Graph.cache()
。
在反覆運算中,取消快取也可能是最佳效能所必需的。預設情況下,快取的 RDD 和圖形會保留在記憶體中,直到記憶體壓力迫使它們按 LRU 順序被驅逐為止。對於反覆運算,前一次反覆運算的中間結果會填滿快取。儘管它們最終會被驅逐,但儲存在記憶體中的不必要資料會減慢垃圾回收速度。在不再需要中間結果時,取消快取會更有效率。這涉及在每次反覆運算中實體化(快取和強制)圖形或 RDD,取消快取所有其他資料集,並僅在後續反覆運算中使用實體化資料集。但是,由於圖形由多個 RDD 組成,因此正確取消它們的持久性可能會很困難。對於反覆運算,我們建議使用 Pregel API,它可以正確取消中間結果的持久性。
Pregel API
圖形本質上是遞迴資料結構,因為頂點的屬性取決於其鄰居的屬性,而鄰居的屬性又取決於其鄰居的屬性。因此,許多重要的圖形演算法會反覆運算每個頂點的屬性,直到達到固定點條件為止。已經提出了一系列圖形平行抽象來表達這些反覆運算。GraphX 公開 Pregel API 的變體。
在高層級,GraphX 中的 Pregel 算子是一種大規模同步平行訊息傳遞抽象,受限於圖形的拓撲結構。Pregel 算子以一系列超級步驟執行,其中頂點接收來自前一超級步驟的入站訊息的總和,為頂點屬性計算一個新值,然後在下一超級步驟中將訊息傳送給鄰近頂點。與 Pregel 不同,訊息會平行計算為邊三元組的函數,而訊息運算可以存取來源和目的地頂點屬性。在超級步驟中,不會收到訊息的頂點會被略過。當沒有訊息剩餘時,Pregel 算子會終止反覆運算並傳回最終圖形。
請注意,與更標準的 Pregel 實作不同,GraphX 中的頂點只能將訊息傳送給鄰近頂點,而訊息建構會使用使用者定義的訊息傳送函數平行執行。這些限制允許在 GraphX 中進行額外的最佳化。
以下是 Pregel 算子 的型別簽章,以及其實作的草圖(注意:為了避免因長譜系鏈而導致 stackOverflowError,pregel 支援定期檢查點圖形和訊息,方法是將 “spark.graphx.pregel.checkpointInterval” 設定為正數,例如 10。並使用 SparkContext.setCheckpointDir(directory: String) 設定檢查點目錄)
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop until no messages remain or maxIterations is achieved
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}
請注意,Pregel 會採用兩個引數清單(即 graph.pregel(list1)(list2)
)。第一個引數清單包含組態參數,包括初始訊息、最大迭代次數,以及傳送訊息的邊緣方向(預設為沿著外緣)。第二個引數清單包含用於接收訊息(頂點程式 vprog
)、計算訊息 (sendMsg
) 和合併訊息 mergeMsg
) 的使用者定義函數。
我們可以使用 Pregel 算子來表達計算,例如以下範例中的單一來源最短路徑。
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
圖形建構函數
GraphX 提供多種方法,可從 RDD 或磁碟中的頂點和邊緣集合建立圖形。預設情況下,沒有任何圖形建構器會重新分割圖形的邊緣;而是將邊緣保留在預設分割區(例如 HDFS 中的原始區塊)。Graph.groupEdges
需要重新分割圖形,因為它假設相同的邊緣會並置在同一個分割區,因此您必須在呼叫 groupEdges
之前呼叫 Graph.partitionBy
。
object GraphLoader {
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
: Graph[Int, Int]
}
GraphLoader.edgeListFile
提供一種方法,可從磁碟上的邊緣清單載入圖形。它會剖析下列形式的(來源頂點 ID、目標頂點 ID)配對的鄰接清單,略過以 #
開頭的註解行
# This is a comment
2 1
4 1
1 2
它從指定的邊緣建立一個 圖
,自動建立邊緣提到的任何頂點。所有頂點和邊緣屬性預設為 1。canonicalOrientation
參數允許將邊緣重新定向到正向 (srcId < dstId
),這是 連接元件 演算法所需要的。minEdgePartitions
參數指定要產生的邊緣分區的最小數量;如果例如 HDFS 檔案有更多區塊,可能會產生比指定的更多邊緣分區。
object Graph {
def apply[VD, ED](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
def fromEdges[VD, ED](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
}
Graph.apply
允許從頂點和邊緣的 RDD 建立圖形。重複的頂點會任意挑選,而邊緣 RDD 中找到但頂點 RDD 中找不到的頂點會指定預設屬性。
Graph.fromEdges
允許僅從邊緣的 RDD 建立圖形,自動建立邊緣提到的任何頂點,並指定它們預設值。
Graph.fromEdgeTuples
允許僅從邊緣元組的 RDD 建立圖形,指定邊緣值為 1,並自動建立邊緣提到的任何頂點,並指定它們預設值。它也支援重複邊緣的去重;若要進行去重,請傳遞 Some
的 PartitionStrategy
作為 uniqueEdges
參數(例如,uniqueEdges = Some(PartitionStrategy.RandomVertexCut)
)。分割策略對於將相同的邊緣並置在同一個分區上是必要的,這樣才能進行去重。
頂點和邊緣 RDD
GraphX 公開儲存在圖形中的頂點和邊緣的 RDD
檢視。然而,由於 GraphX 在最佳化的資料結構中維護頂點和邊緣,而這些資料結構提供額外的功能,因此頂點和邊緣會分別傳回為 VertexRDD
VertexRDD 和 EdgeRDD
EdgeRDD。在本節中,我們將檢閱這些類型中一些額外的有用功能。請注意,這只是一個不完整的清單,請參閱 API 文件以取得官方操作清單。
VertexRDD
VertexRDD[A]
擴充了 RDD[(VertexId, A)]
,並增加了每個 VertexId
僅出現 一次 的額外限制。此外,VertexRDD[A]
代表一組頂點,每個頂點都具有類型為 A
的屬性。在內部,這會透過將頂點屬性儲存在可重複使用的雜湊映射資料結構中來達成。因此,如果兩個 VertexRDD
是從相同的基礎 VertexRDD
VertexRDD(例如,透過 filter
或 mapValues
)衍生的,則可以在沒有雜湊評估的情況下以恆定時間將它們結合。為了利用這個索引資料結構,VertexRDD
VertexRDD 揭露了以下其他功能
class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Show only vertices unique to this set based on their VertexId's
def minus(other: RDD[(VertexId, VD)])
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
例如,請注意 filter
算子如何傳回 VertexRDD
VertexRDD。Filter 實際上是使用 BitSet
實作的,因此可以重複使用索引並保留與其他 VertexRDD
進行快速結合的能力。同樣地,mapValues
算子不允許 map
函式變更 VertexId
,因此可以重複使用相同的 HashMap
資料結構。leftJoin
和 innerJoin
都可以在結合從相同的 HashMap
衍生的兩個 VertexRDD
時進行辨識,並透過線性掃描來實作結合,而不是昂貴的點查詢。
aggregateUsingIndex
算子對於從 RDD[(VertexId, A)]
有效地建構新的 VertexRDD
VertexRDD 很有用。在概念上,如果我在一組頂點上建構了 VertexRDD[B]
,這是某些 RDD[(VertexId, A)]
中頂點的超集,則我可以重複使用索引來聚合,然後再索引 RDD[(VertexId, A)]
。例如
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
EdgeRDD
EdgeRDD[ED]
擴充了 RDD[Edge[ED]]
,它會使用在 PartitionStrategy
中定義的各種分割策略之一,將邊緣組織成區塊進行分割。在每個分割中,邊緣屬性和鄰接結構會分開儲存,以便在變更屬性值時能達到最大的重複使用。
由 EdgeRDD
EdgeRDD 揭露的另外三項功能為
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
在大部分應用程式中,我們發現 EdgeRDD
EdgeRDD 上的操作是透過圖形運算子完成,或依賴於基本 RDD
類別中定義的操作。
最佳化表示
雖然本指南的範圍不包括對 GraphX 分散式圖形表示法中所使用最佳化的詳細說明,但一些高層級的了解可能有助於設計可擴充演算法,以及最佳化使用 API。GraphX 採用頂點切分方法來進行分散式圖形分割
GraphX 不是沿著邊緣分割圖形,而是沿著頂點分割圖形,這可以減少通訊和儲存的負擔。在邏輯上,這相當於將邊緣指定給機器,並允許頂點橫跨多個機器。指定邊緣的確切方法取決於 PartitionStrategy
,而且各種啟發法之間有許多取捨。使用者可以透過使用 Graph.partitionBy
運算子,以重新分割圖形來選擇不同的策略。預設的分割策略是使用在圖形建構時提供的邊緣初始分割。不過,使用者可以輕鬆切換到 2D 分割或 GraphX 中包含的其他啟發法。
邊緣分割後,有效圖形平行運算的主要挑戰是有效地將頂點屬性與邊緣結合。由於真實世界的圖形通常有比頂點更多的邊緣,因此我們將頂點屬性移到邊緣。由於並非所有分割都包含與所有頂點相鄰的邊緣,因此我們在內部維護一個路由表,用於在實作 triplets
和 aggregateMessages
等操作所需的結合時,識別廣播頂點的位置。
圖形演算法
GraphX 包含一組圖形演算法,用於簡化分析任務。這些演算法包含在 org.apache.spark.graphx.lib
套件中,而且可以透過 GraphOps
,直接作為 Graph
上的方法來存取。本節說明這些演算法,以及它們的使用方式。
PageRank
PageRank 衡量圖形中每個頂點的重要性,假設從 u 到 v 的邊緣表示 u 對 v 的重要性背書。例如,如果一位 Twitter 使用者被許多其他人追蹤,那麼這位使用者的排名就會很高。
GraphX 隨附 PageRank
物件 上的方法,提供 PageRank 的靜態和動態實作。靜態 PageRank 會執行固定次數的疊代,而動態 PageRank 會執行到排名收斂為止(即,變動幅度低於指定的容忍度)。GraphOps
允許直接呼叫這些演算法,作為 Graph
上的方法。
GraphX 也包含一個範例社群網路資料集,我們可以在上面執行 PageRank。一組使用者提供在 data/graphx/users.txt
,一組使用者之間的關係提供在 data/graphx/followers.txt
。我們計算每個使用者的 PageRank 如下
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
連接元件
連通元件演算法會標記圖表的每個連通元件,其 ID 為編號最小的頂點。例如,在社群網路中,連通元件可以近似群集。GraphX 在 ConnectedComponents
物件中包含演算法的實作,我們從 PageRank 區段 計算範例社群網路資料集的連通元件,如下所示
import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
三角形計數
當一個頂點有兩個相鄰頂點,且它們之間有邊緣時,該頂點就是三角形的一部分。GraphX 在 TriangleCount
物件中實作三角形計數演算法,它會決定通過每個頂點的三角形數量,提供群集的測量。我們計算 PageRank 區段 中社群網路資料集的三角形數量。請注意,TriangleCount
需要邊緣為正規方向 (srcId < dstId
),且圖表使用 Graph.partitionBy
分割。
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
(username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
範例
假設我想從一些文字檔建立圖表,將圖表限制在重要的關係和使用者,在子圖上執行 PageRank,然後最後傳回與頂尖使用者相關的屬性。我可以在 GraphX 中用幾行程式碼完成這一切
import org.apache.spark.graphx.GraphLoader
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))