`updateStateByKey` 可用于在 DStream 上根据新到达的数据维护每个键(key)的状态。它需要一个状态更新函数:
/** State-update functions intended for `DStream.updateStateByKey`. */
object UpdateStateFunctions {
  import org.apache.spark.util.StatCounter

  /**
   * Folds the values seen for one key in the current batch into that key's running statistics.
   *
   * @param current  the values that arrived for the key in this batch
   * @param previous the key's statistics from the previous batch, or `None` if there were none
   * @return the updated statistics; never `None`, so the key is never removed from the state
   */
  def updateState(current: Seq[Double], previous: Option[StatCounter]): Option[StatCounter] = {
    // StatCounter.merge updates the receiver in place and returns it. Copy first so the
    // state object handed in from the previous batch is never mutated.
    val updated = previous.fold(StatCounter(current))(_.copy().merge(current))
    Some(updated)
  }
}
该函数接收当前批次中该键的一组新值 `current`,以及以 `Option` 表示的先前状态 `previous`,并返回以 `Option` 表示的更新后状态。全部放在一起:
import org.apache.spark._
import org.apache.spark.streaming.dstream.DStream
import scala.collection.mutable.Queue
import org.apache.spark.util.StatCounter
import org.apache.spark.streaming._

/** Demo of `updateStateByKey`: keeps running per-key statistics over a queue-backed DStream. */
object UpdateStateByKeyApp {

  /**
   * Streams a fixed queue of RDDs (one per 10-second batch), printing each key's accumulated
   * statistics after every batch, then stops the streaming context.
   *
   * @param args optional; `args(0)` overrides the checkpoint directory (default `/tmp/chk`)
   */
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "updateStateByKey", new SparkConf())
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(sc, batchInterval)
    // Stateful transformations such as updateStateByKey require a checkpoint directory.
    ssc.checkpoint(args.headOption.getOrElse("/tmp/chk"))

    val queue = Queue(
      sc.parallelize(Seq(("foo", 5.0), ("bar", 1.0))),
      sc.parallelize(Seq(("foo", 1.0), ("foo", 99.0))),
      sc.parallelize(Seq(("bar", 22.0), ("foo", 1.0))),
      sc.emptyRDD[(String, Double)],
      sc.emptyRDD[(String, Double)],
      sc.emptyRDD[(String, Double)],
      sc.parallelize(Seq(("foo", 1.0), ("bar", 1.0)))
    )
    // The stream dequeues RDDs as it runs, so record the batch count before starting.
    val batchCount = queue.size

    val inputStream: DStream[(String, Double)] = ssc.queueStream(queue)
    inputStream.updateStateByKey(UpdateStateFunctions.updateState _).print()

    ssc.start()
    // awaitTermination() returns only after the context has already stopped, so a
    // following stop() would be redundant and the demo would never end. Wait long enough
    // for every queued batch plus one extra, then stop explicitly.
    ssc.awaitTerminationOrTimeout((batchCount + 1) * batchInterval.milliseconds)
    ssc.stop()
  }
}