消息关闭
    暂无新消息!

spark算法请教(scala)

问题作者 : GPS导航2017-08-09发布
现有1个大表A(ip,value)
ip是ip地址转换的10进制Double数值

1个小表B(beginip,lastip,label)
beginip至lastip标识了一段ip地址
label表示这段ip地址归属的运营商(如:移动,联通,电信)
 
需要在大表的map阶段做join,结果将ip地址转换成ip地址归属的运营商,请教spark scala的程序实现代码,谢谢!

6个回答

︿ 1
感谢 link0007的建议!我用你的方法实现了。
但是我的大表很大,有几十亿行,小表就几万行,这种情况在reduce-side join运行非常慢,所以我想map-side join效率会高一些。我的源码如下,在spark-shell --master yarn-client集群运行,报NullPointerException,不知道是什么原因?出异常的行是val f = m.where(k + ">=fstip and lastip>=" + k) ,如果这行改成f=“test”这样的常量就能正常跑通。

import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SQLContext 

object test1 { 
case class httpxdr(localipv4:String,remoteipv4:Double,host:String,wholeurl:String) 
case class iptable(fstip:Double,lastip:Double,label:String) 
def main(args: Array[String]) { 
if (args.length != 1 ){ 
println("usage is ebda <master> <input> <output>") 
return 

val sc = new SparkContext(args(0), "ebda-spark", System.getenv("SPARK_HOME")) 
val hiveCtx = new HiveContext(sc) 
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._ 
val sighttp = hiveCtx.sql("SELECT * FROM httpnew where reportdate='20160510'").map(h => httpxdr(h(1).toString(),h(2).toString().split("\\.")(0).toDouble*1000000000+h(2).toString().split("\\.")(1).toDouble*1000000+h(2).toString().split("\\.")(2).toDouble*1000+h(2).toString().split("\\.")(3).toDouble,h(3).toString(),h(4).toString())) 
val iptable1 = hiveCtx.sql("select * from configuration.wwipdsttable").map(ip => iptable(ip(0).toString().toDouble,ip(1).toString().toDouble,ip(3).toString())).toDF() 
val broadCastMap = sc.broadcast(iptable1) 
val sighttp1 = sighttp.map(line => (line.iremoteipv4,line)).mapPartitions({ iter => 
var f1="NULL" 
val m=broadCastMap.value 
for { 
(k,v) <- iter 
val f = m.where(k + ">=fstip and lastip>=" + k) 
if (f.count>0) 
f1=f.first.get(2).toString 
} yield (f1,v) 
}).toDF() 
sighttp1.show(10) 

}
︿ 1
    val broadCastMap = sc.broadcast(iptable1)
    val sighttp1 = sighttp.map(x => (x.iremoteipv4,x)).mapPartitions({ iter =>
        val m=broadCastMap.value
        var f1="NULL"
        for {
          (k,v) <- iter
          val f=m.where(k+">=fstip and lastip>="+k)
          if (f.count()>0)
            f1 = m.where(k + ">=fstip and lastip>=" + k).first.get(2).toString
    } yield (f1,v)
    })
算法实现了,不知道是不是最优的,输出的结果就是能匹配到的行,匹配不到的行不输出。
︿ 0
broadcast(RDD.collect())就不报空指针错误了。但是data frame执行了collect后,data frame的where 等算子就无法使用了,broadcast的对象难道不支持data frame么?
︿ 0
我觉得还有进一步从算法上优化的余地。
现在你小表的ip都是范围数据,如果转换成C类或者B类ip网段的穷举会有多大?估计不会是天文数字吧。
这样就把范围轮询转化成了map/reduce最擅长的等值映射了,估计速度会快几个数量级。
︿ 0

val sqlCtx = new SQLContext(sc)
sqlCtx.read().jdbc(xxx).registerTempTable("t_a")
sqlCtx.read().jdbc(xxx).registerTempTable("t_b")

val res = sqlCtx.sql(" SELECT a.ip , b.label  FROM t_a a JOIN t_b b ON a.ip BETWEEN b.beginip AND b.endip ")
res.show(100)