reducebykey、groupbykey、aggregatebykey、combinebykeyの違いを説明できる人はいますか?これに関するドキュメントを読みましたが、正確な違いを理解できませんでした。
例を挙げて説明していただけると助かります。
groupByKey:。
構文:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)))
groupByKeyは、データがネットワーク経由で送信され、削減ワーカーで収集されるため、ディスクの問題を引き起こす可能性があります。
reruceByKey:。
構文:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
データは各パーティションで結合され、各パーティションの1つのキーに対して1つの出力のみがネットワークを介して送信されます。 duceByKeyは、すべての値をまったく同じタイプの別の値に結合する必要がありました。
aggregateByKey:。
初期値を取るduceByKeyと同じです。
入力として3つのパラメーター。 私。 初期値。 ii。 コンバインロジック。 iii。 シーケンスopロジック。
例:。
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
combineByKey:。
入力として3つのパラメーター。
1。 初期値:regatoryByKeyとは異なり、常に定数を渡す必要はありません。新しい値を返す関数を渡すことができます。 2。 マージ機能。 3。 関数を組み合わせます。
例:。
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( { case (k,v) => (k,v._1/v._2.toDouble) })
result.collect.foreach(println)
duceByKey、aggregateByKey、combineByKey 優先 groupByKey。
参照:。 groupByKeyを避けてください。
reducebykeyとgroupbykeyはどちらも同じ答えを返します。 reduceByKey の例は、大規模なデータセットでよりよく機能します。それは それは、Sparkが各パーティションで共通のキーで出力を結合できることを知っているからです。 データをシャッフルする前に、各パーティションで共通のキーで出力を結合できることを知っているからです。 それは、Sparkがデータをシャッフルする前に、各パーティションで共通のキーで出力を結合できることを知っているからです; >; 一方、groupByKeyを呼び出すと、すべてのキーと値のペアがシャッフルされます。 gt;はシャッフルされます。これは、多くの不要なデータです。 ネットワーク経由で転送されます。
詳細は、以下のリンクを参照してください。
はグループ化+集約のようなものです。reduceBykey()はdataset.group(...).reduce(...)に等しいと言えます。groupByKey()
と違って、シャッフルされるデータは少なくなる。注 :共通点は、いずれもワイド演算であることである。
どちらも同じ結果が得られますが、両方の機能のパフォーマンスには大きな違いがあります。 reduceByKey()
は、 groupByKey()
と比較すると、より大きなデータセットでより適切に機能します。
reduceByKey()
では、データをシャッフルする前に、同じマシンで同じキーとペアが結合されます( reduceByKey()
に渡される関数を使用することにより)。 次に、関数を再度呼び出して、各パーティションからのすべての値を減らし、1つの最終結果を生成します。
groupByKey()
では、すべてのキーと値のペアがシャッフルされます。 これは、ネットワークを介して転送されるための多くの不要なデータです。
ReduceByKey reduceByKey(func, [numTasks])
-.
データは各パーティションで各キーに対して少なくとも1つの値があるように結合される。 そして、シャッフルが行われ、reduceのようなアクションのために、ネットワーク経由で特定のエクゼキュータに送られる。
GroupByKey - groupByKey([numTasks])
。
これはキーの値をマージするのではなく、直接シャッフル処理を行います。 そして、ここで多くのデータが各パーティションに送られます。
そして、各キーの値のマージはシャッフルの後に行われます。 最終ワーカーノードには大量のデータが保存されるため、メモリ不足の問題が発生します。
AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
。
reduceByKeyと似ていますが、集約を行う際に初期値を指定することができます。
reduceByKey`の使用法
reduceByKey`は大規模なデータセットに対して実行する場合に使用できる。
reduceByKeyは、入力値と出力値の型が同じ場合に使用する。 を超える
aggregateByKey` を使用する。
また、groupByKey
は使用せず、reduceByKey
を使用することを推奨する。詳細はこちらを参照されたい。
reduceByKeyと
aggregateByKey`の詳細については、こちらの質問も参照してください。
次に、これら4つとは別に、あります。
foldByKeyは、duceByKeyと同じですが、ユーザーがゼロ値を定義しています。
AggregateByKeyは3つのパラメーターを入力として受け取り、マージするために2つの関数を使用します(1つは同じパーティションでのマージ用、もう1つはパーティション間での値のマージ用です)。 最初のパラメーターはZeroValueです)。
一方。
ReduceBykeyは、マージする関数である1つのパラメーターのみを取ります。
CombineByKeyは3つのパラメーターを取り、3つすべてが関数です。 ZeroValueの関数を持つことができることを除いて、集約Bykeyに似ています。
GroupByKeyはパラメーターを使用せず、すべてをグループ化します。 また、パーティション間でのデータ転送のオーバーヘッドでもあります。