消息关闭
    暂无新消息!
使用spark 1.6 

原始数据如下 


val baseDF=hiveContext.sql(newSql)



ID    ID2   C1   C2   C3   C4   C5 .....C33
CM1 a 1 1 1 0 0
CM2 a 1 1 0 1 0
CM3 a 1 0 1 1 1
CM4 a 1 1 1 1 1
CM5 a 1 1 1 1 1
1k2    b    0    0    1    1    1
1K3    b    1    1    1    1    1
1K1    b    0    0    0    0    1








ID    ID2   C1   C2   C3   C4   C5 .....C33
CM1 a 1 1 1 0 0
CM2 a 0 0 0 1 0
CM3 a 0 0 0 0 1
CM4 a 0 0 0 0 0
CM5 a 0 0 0 0 0
1K1    b    0    0    0    0    1
1k2    b    0    0    1    1    0
1K3    b    1    1    0    0    0



逻辑是根据ID2做groupby  然后找ID 最小同时Cn 为1  的设置为1  ,其他的设置为0
而Cn 最多为C33 
如果用case class 会超过上限

这是我目前尝试的 ,出来结果是错的 ,理解过后应该是用错方法 应该要用group


case class testGoods(ID: String, ID2: String, C1 : String, C2 : String)

val cartMap = new HashMap[String, Set[(String,String,String)]] with MultiMap[String,(String,String,String)]

val baseDF=hiveContext.sql(newSql)

val testRDD=baseDF.mapPartitions( partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID = record.getString(0)
    if (ID != null && ID != "null") {
      val ID2=record.getString(1)
      val C1=record.getString(2)
      val C2=record.getString(3)
      cartMap.addBinding(ID2, (ID,C1,C2))
    }
  }
  cartMap.iterator
})

val recordList = new mutable.ListBuffer[testGoods]()
val testRDD1=testRDD.mapPartitions( partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID2=record._1
    val recordRow= record._2
    val sortedRecordRow = TreeSet[(String,String,String)]() ++ recordRow
    val dic=new mutable.HashMap[String,String]


    for(v<-sortedRecordRow) {
      val ID = v._1
      val C1 = v._2
      val C2 = v._3

      if (dic.contains(ID2)){
        val goodsValue=dic.get(ID2)
        if("1".equals(goodsValue)){
          recordList.append(new testGoods(ID, ID2, "0", C2))
        }else{
          dic.put(ID2,C1)
          recordList.append(new testGoods(ID, ID2, C1,C2))
        }
      }else{
        dic.put(ID2,C1)
        recordList.append(new testGoods(ID, ID2, C1, C2))
      }
    }
  }
  recordList.iterator
})


val searchToItemNewDF = hiveContext.createDataFrame(testRDD1).repartition(1)
  .rdd.map { r => r.mkString("\t") }
  .saveAsTextFile("/data/testRDD1")


查了网路上 好像大家都用 groupBy +agg  实现 自己也尝试使用


baseDF.groupBy("ID2").agg((collect_list($"ID"), collect_list($"ID2")))


但试了好久都没办法把上面使用mapPartitions的逻辑用上
请问如何使用DataFrame实现?



4个回答

︿ 2

case class TestGoods(ID:String,ID2:String,C1:Int,C2:Int)
  case class CompressedRows(id2:String,ids:Array[String],indexs:Array[Int])

  def main(args: Array[String]) {
    import session.implicits._
    val random = new Random(10)
    val generator: () => Int = () => {
      if (random.nextBoolean()) 1
      else 0
    }
    val datasource = (1 to 100).map(idx => TestGoods(s"ID-${idx}", s"ID2-${idx % 20}", generator(), generator()))

    val df = session.sparkContext.makeRDD(datasource).toDF("id", "id2", "c1", "c2")
    df.show(false)
    import session.implicits.newStringEncoder
    df.groupByKey { case Row(_, id2: String, _, _) => id2 }
      .mapGroups {
        case (id2, rows) =>
          val cs: Array[(Int, Boolean)] = (1 to 2).map(_ => (-1, false)).toArray
          val sorted = rows.toList.sortBy(row => row(0).hashCode())
          (0 until sorted.length).foreach { idx =>
            val row = sorted(idx)
            row match {
              case Row(id, id2, c1: Int, c2: Int) =>
                if (!cs(0)._2 && c1 == 1) cs(0) = (idx -> true)
                if (!cs(1)._2 && c2 == 1) cs(1) = (idx -> true)
            }
          }
          val compressedIdx = cs.map(r => if (r._2) r._1 else sorted.length - 1)
          CompressedRows(id2,sorted.map(r=>r(0).toString).toArray,compressedIdx)
      }.show()
  }
︿ 1
Don't know what you try to do. You need more detail example. 是我的中文不够好?
︿ 0
不好意思 我讲的不太清楚 新增测试数据

目标是根据ID2 先分组  然后把分组下的ID 从小到大排列 
然后分别看 C1 ....C33  每一列 找到ID首先出现 1的留下   其他的设为0

原始数据 
ID	ID2	C1	C2 ....C33
CM1 a 1 0
CM2 a 1 0
1K13 f 0 0
CM4 a 1 1
CM5 a 1 1
1K14 f 0 1
1K2 b 0 1
1K3 b 1 1
1K11 f 0 0
1K12 f 0 0
1K1 b 1 0
CM3 a 1 0


目标输出
ID	ID2	C1	C2
CM1 a 1 0
CM2 a 0 0
CM3 a 0 1
CM4 a 0 0
CM5 a 0 0
1K1 b 1 0
1K2 b 0 1
1K3 b 0 0
1K11 f 0 0
1K12 f 0 0
1K13 f 0 0
1K14 f 0 1



︿ 0
谢谢yangguo_2011的回复
虽然我使用的是spark1.6 不能直接跑  但是大概知道如何解决问题了