消息关闭
    暂无新消息!
import org.apache.spark.broadcast._
import scala.collection.mutable.ArrayBuffer

import java.io._
object SimpleApp {

    var broadcast1: Broadcast[ArrayBuffer[String]] = _
    val ip_grp_start = ArrayBuffer[String]()

    def matchword(s: (String, Int)): List[(String ,Int)] = {

        val fw = new FileWriter("/home/hadoop/a.txt", true)
        val out = new PrintWriter(fw)

        out.println("11111111111111111"+broadcast1.value.length)
        out.close()
        return List(s)
    }

    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
        val line = sc.textFile("wordcount.txt")

        ip_grp_start += "fsda"
        ip_grp_start += "dsfsf"

        broadcast1 = line.sparkContext.broadcast(ip_grp_start)

        val words = line.flatMap(line => line.split(" "))
        val wordpair = words.map(word => (word,1))

        val word = wordpair.flatMap(x => matchword(x))
        val pair = word.reduceByKey(_+_)

        pair.collect().foreach(println)
        sc.stop()
    }
}

4个回答

︿ 1
这是我写的测试广播变量函数的例子,不知道为何传进去会为空,如果我在运行时候去掉--master参数,就会变成单机运行,运行就不会报任何错误。求大神解释。。
︿ 1
17/02/19 15:07:46 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.2:56023 (size: 2.8 KB, free: 366.3 MB)
17/02/19 15:07:47 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.2:56023 (size: 20.4 KB, free: 366.3 MB)
17/02/19 15:07:48 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.2, executor 0): java.lang.NullPointerException
at SimpleApp$.matchword(SimpleApp.scala:20)
at SimpleApp$$anonfun$4.apply(SimpleApp.scala:38)
at SimpleApp$$anonfun$4.apply(SimpleApp.scala:38)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)

上面是错误报的空指针的错误。其实就是传到闭包函数里,这个广播变量的值为空。
︿ 0
The reason is that your broadcast var is defined in the class/object level. When the class is initialized in the worker node, it will ony see a NULL, instead of the value you assigned in main method.

Change the broadcast scope to the main method. You can pass the value to your method.

See the detail example here:

https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/ChapterSixExample.scala#L75


    val signPrefixes = sc.broadcast(loadCallSignTable())
    val countryContactCounts = contactCounts.map{case (sign, count) =>
      val country = lookupInArray(sign, signPrefixes.value)
      (country, count)
    }.reduceByKey((x, y) => x + y)


See how the signPrefixes broadcast val is defined in the main method, and how its value passed to lookupInArray method.
︿ 0
我也碰到这个问题,还没有找到解决方法。
我估计是broadcast变量被发送到Executor上的时间导致的,
也就是说 还没有执行 broadcast1 = line.sparkContext.broadcast(ip_grp_start) 这句代码。broadcast1已经被发送到Executor上了。

我暂时的解决办法是,不定义 var broadcast1。而是  val  broadcast1 = line.sparkContext.broadcast(ip_grp_start),
然后将 broadcast1作为 matchword 函数的参数传递过去
def matchword(s: (String, Int), broadcast1: Broadcast[ArrayBuffer[String]]): List[(String ,Int)]