// sortWith and sortBy
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Group-then-sort examples on small in-memory pair RDDs.
 *
 * Each example groups the records by key and then orders the values inside every
 * group in descending order, using a different Scala collection method:
 *
 *  - `sorted`   - ordering supplied by an `Ordering` (see [[GroupTopN.testSorted]])
 *  - `sortWith` - explicit "comes before" comparison function (see [[GroupTopN.testSortWith]])
 *  - `sortBy`   - key projection plus an `Ordering` (see [[GroupTopN.testSortBy]])
 */
object GroupTopN {

  /** Spark configuration: master `"local"`, application name `"RDDTest"`. */
  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("RDDTest")

  /** Context shared by all examples; created when this object is initialised. */
  val sc: SparkContext = new SparkContext(conf)

  /**
   * Runs the `sortBy` example and then stops the Spark context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Stop the context even when the job throws, so it is not left running.
    try testSortBy()
    finally sc.stop()
  }

  /** Groups `(String, Int)` pairs by key and prints each key with its values in descending order. */
  def testSorted(): Unit = {
    val pairs = sc.parallelize(List(
      ("a", 1), ("a", 2), ("a", 3), ("a", 4),
      ("b", 5), ("b", 6), ("b", 7), ("b", 8)
    ))

    pairs
      .groupByKey()
      // Descending sort inside each group; `values.toList.sortWith(_ > _)` is an
      // equivalent alternative.
      .mapValues(values => values.toList.sorted(Ordering[Int].reverse))
      .foreach(println)
  }

  /**
   * Groups `(String, (Int, Int))` pairs by key, orders each group by the second tuple
   * component in descending order, keeps at most the first five entries and prints
   * `(key, entries)`.
   */
  def testSortWith(): Unit = {
    // Kept as a local value so the closure below captures only an Int and never
    // references a member of this object (which also holds the SparkContext).
    val topN = 5

    val pairs = sc.parallelize(List(
      ("a", (1, 5)), ("a", (2, 4)), ("a", (3, 6)), ("a", (4, 1)),
      ("b", (5, 2)), ("b", (6, 3)), ("b", (7, 9)), ("b", (8, 8))
    ))

    pairs
      .groupByKey()
      .mapValues(values => values.toList.sortWith(_._2 > _._2).take(topN))
      .foreach(println)
  }

  /**
   * Groups `(String, (Int, Int))` pairs by key, orders each group by the second tuple
   * component in descending order and prints one `(key, value)` line per record.
   */
  def testSortBy(): Unit = {
    println("map === sortBy")

    val pairs = sc.parallelize(List(
      ("a", (1, 5)), ("a", (2, 4)), ("a", (3, 6)), ("a", (4, 1)),
      ("b", (5, 2)), ("b", (6, 3)), ("b", (7, 9)), ("b", (8, 8))
    ))

    pairs
      .groupByKey()
      .flatMap { case (key, values) =>
        // Reversed ordering => descending; each sorted value is re-paired with its key.
        values.toList.sortBy(_._2)(Ordering[Int].reverse).map(value => (key, value))
      }
      .foreach(println)
  }
}
// sortWith and sortBy
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Group-then-sort examples on small in-memory pair RDDs.
 *
 * Each example groups the records by key and then orders the values inside every
 * group in descending order, using a different Scala collection method:
 *
 *  - `sorted`   - ordering supplied by an `Ordering` (see [[GroupTopN.testSorted]])
 *  - `sortWith` - explicit "comes before" comparison function (see [[GroupTopN.testSortWith]])
 *  - `sortBy`   - key projection plus an `Ordering` (see [[GroupTopN.testSortBy]])
 */
object GroupTopN {

  /** Spark configuration: master `"local"`, application name `"RDDTest"`. */
  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("RDDTest")

  /** Context shared by all examples; created when this object is initialised. */
  val sc: SparkContext = new SparkContext(conf)

  /**
   * Runs the `sortBy` example and then stops the Spark context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Stop the context even when the job throws, so it is not left running.
    try testSortBy()
    finally sc.stop()
  }

  /** Groups `(String, Int)` pairs by key and prints each key with its values in descending order. */
  def testSorted(): Unit = {
    val pairs = sc.parallelize(List(
      ("a", 1), ("a", 2), ("a", 3), ("a", 4),
      ("b", 5), ("b", 6), ("b", 7), ("b", 8)
    ))

    pairs
      .groupByKey()
      // Descending sort inside each group; `values.toList.sortWith(_ > _)` is an
      // equivalent alternative.
      .mapValues(values => values.toList.sorted(Ordering[Int].reverse))
      .foreach(println)
  }

  /**
   * Groups `(String, (Int, Int))` pairs by key, orders each group by the second tuple
   * component in descending order, keeps at most the first five entries and prints
   * `(key, entries)`.
   */
  def testSortWith(): Unit = {
    // Kept as a local value so the closure below captures only an Int and never
    // references a member of this object (which also holds the SparkContext).
    val topN = 5

    val pairs = sc.parallelize(List(
      ("a", (1, 5)), ("a", (2, 4)), ("a", (3, 6)), ("a", (4, 1)),
      ("b", (5, 2)), ("b", (6, 3)), ("b", (7, 9)), ("b", (8, 8))
    ))

    pairs
      .groupByKey()
      .mapValues(values => values.toList.sortWith(_._2 > _._2).take(topN))
      .foreach(println)
  }

  /**
   * Groups `(String, (Int, Int))` pairs by key, orders each group by the second tuple
   * component in descending order and prints one `(key, value)` line per record.
   */
  def testSortBy(): Unit = {
    println("map === sortBy")

    val pairs = sc.parallelize(List(
      ("a", (1, 5)), ("a", (2, 4)), ("a", (3, 6)), ("a", (4, 1)),
      ("b", (5, 2)), ("b", (6, 3)), ("b", (7, 9)), ("b", (8, 8))
    ))

    pairs
      .groupByKey()
      .flatMap { case (key, values) =>
        // Reversed ordering => descending; each sorted value is re-paired with its key.
        values.toList.sortBy(_._2)(Ordering[Int].reverse).map(value => (key, value))
      }
      .foreach(println)
  }
}