Spark(十) -- Spark Streaming API编程

本文测试的Spark版本是1.3.1

Spark Streaming编程模型:

第一步:

需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数:

1、SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息

2、Seconds对象:该对象设置了StreamingContext多久读取一次数据流

第二步:

构建好入口对象之后,直接调用该入口的方法读取各种不同方式传输过来的数据流,如:Socket,HDFS等方式。并会将数据转换成DStream对象进行统一操作

第三步:

DStream本身是一种RDD序列,Streaming接受数据流之后会进行切片,每个片都是一个RDD,而这些RDD最后都会包装到一个DStream对象中统一操作。在这个步骤中,进行对数据的业务处理

第四步:

调用入口对象的start和awaitTermination开始读取数据流

下面分别使用不同的Spark Streaming 处理方式完成WordCount单词计数

HDFS文件测试

object HDFSWordCount {
  def main(args: Array[String]) {
    //参数设置
    if (args.length < 2) {
      System.err.println("Usgae : <spark master> <hdfs path>")
      System.exit(1)
    }

    //第一步:创建StreamingContext入口
    val sparkConf = new SparkConf().setMaster(args(0)).setAppName("HDFSWordCount")
    val streaming = new StreamingContext(sparkConf,Seconds(10))
    //第二步:调用textFileStream读取指定路径的文件
    val data = streaming.textFileStream(args(1))
    //第三步,数据业务处理
    //使用flatMap将数据map之后的分切压成一个DStream
    val words = data.flatMap(_.split(" "))
    val wordCount = words.map(x => (x,1)).reduceByKey(_+_)
    wordCount.print()
    //第四步
    streaming.start()
    streaming.awaitTermination()
}

Socket数据流测试

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage : <spark master> <hostname> <port>")
      System.exit(1)
    }
    val sparkConf = new SparkConf().setMaster(args(0)).setAppName("NetworkWordCount")
    val streaming = new StreamingContext(sparkConf,Seconds(10))
    //参数:1、主机名;2、端口号;3、存储级别
    val data =
 streaming.socketTextStream(args(1),args(2).toInt,StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCount.print()
    streaming.start()
    streaming.awaitTermination()
}

可以看到,对于同一中业务处理逻辑来说,不同的数据来源只要调用不同的方法接收即可,转换成DStream之后的处理步骤是一模一样的

下面的代码时配合测试Socket数据的,使用java命令执行jar包,传入参数:1、端口号;2、产生数据的频率(毫秒)

即可在指定的端口上产生数据提供Spark Streaming接收

package Streaming

import java.net.ServerSocket
import java.io.PrintWriter

object Logger {
  def generateContent(index: Int): String = {
    import scala.collection.mutable.ListBuffer
    val charList = ListBuffer[Char]()
    for (i <- 65 to 90) {
      charList += i.toChar
    }
    val charArray = charList.toArray
    charArray(index).toString()
  }

  def index = {
    import java.util.Random
    val ran = new Random
    ran.nextInt(7)
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage:<port> <millisecond>")
      System.exit(1)
    }
    val listener = new ServerSocket(args(0).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Get client connected from:" + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(1).toLong)
            val content = generateContent(index)
            println(content)
            out.write(content + ‘\n‘)
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}

在上述的例子中,文中使用的是Seconds(10)

也就是说每10秒钟处理一次数据

第一个10秒处理的结果是不会影响到第二个10秒的

但是有时候我们需要进行汇通统计,要用到之前几个10秒阶段的数据怎么办?

这里要用到一个updateStateByKey方法,该方法会保存上次计算数据的状态,以供下次计算使用。

上代码:

object StatefulWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage : <spark master> <hostname> <port>")
      System.exit(1)
    }
    //定义一个匿名函数,并赋值给updateFunc
    //该函数是updateStateByKey方法的参数,该方法要求传入一个匿名参数且参数格式为values:Seq[Int],state:Option[Int]
    //其中values是当前的数据,state表示之前的数据
    //这个匿名函数的作用就是将各个10秒阶段的结果累加汇总
    val updateFunc = (values:Seq[Int],state:Option[Int]) => {
    val now = values.foldLeft(0)(_+_)
    val old = state.getOrElse(0)
    Some(now + old)
}
    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster(args(0))
    val streaming = new StreamingContext(conf, Seconds(10))
    //checkpoint会将数据放在指定的路径上,这个操作是必须的,为了保护数据,如果不设置会报异常
    streaming.checkpoint(".")
    val lines = streaming.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordDStream = words.map(x => (x, 1))
    //在这里将updateFunc传入
    val stateDStream = wordDStream.updateStateByKey(updateFunc)
    stateDStream.print()
    streaming.start()
    streaming.awaitTermination()
}

在Spark Streaming中还有一个window的概念,即滑动窗体

下图是官方文档中给出的解释:

使用滑动窗体要设置两个指定参数:

1、窗体长度

2、滑动时间

例如,设置一个窗体长度为5,滑动时间为2,意味着,每2秒处理上一个5秒内的数据流

这样的处理可以应用在例如微博统计最热搜索词

每2秒钟统计一次过去5秒内的最热搜索词

统计最热搜索词实例代码:

object WindowWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage : <spark master> <hostname> <port> <Streaming Seconds> <Window Seconds> <Slide Seconds>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("WindowWordCount").setMaster(args(0))
    val streaming = new StreamingContext(conf, Seconds(args(3).toInt))
    //checkpoint会将数据放在指定的路径上,这个操作是必须的,为了保护数据,如果不设置会报异常
    streaming.checkpoint(".")
    val lines = streaming.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY)
    val words = lines.flatMap(_.split(" "))
    //map操作之后数据的格式为:
    //(a,1)(b,1)...(n,1)格式
    //调用reduceByKeyAndWindow替代普通的reduceByKey
    //最后两个参数分别是窗体长度和滑动时间
    val wordCount = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(args(4).toInt),
      Seconds(args(5).toInt))
    //对结果进行降序排序
    //由于DStream本身不具备RDD的一些操作,调用transform方法可以让RDD的一些操作(例如sortByKey等)作用在其之上,返回的仍然是一个DStream对象
    val sorted = wordCount.map { case (char, count) => (count, char) }.transform(_.sortByKey(false)).map
    { case (count, char) => (char, count) }
    sorted.print()
    streaming.start()
    streaming.awaitTermination()
  }
}

reduceByKeyAndWindow有两种使用方法:

1、educeByKeyAndWindow(_ + _, Seconds(5),seconds(1))

2、reduceByKeyAndWindow(_ + , - _, Seconds(5),seconds(1))

二者的区别见下图:

第一种是简单粗暴的直接累加

而第二种方式就显得比较文雅和高效了

例如现在计算t+4的累积数据

第一种方式是,直接从t+…+(t+4)

第二种处理为,用已经计算好的(t+3)的数据加上(t+4)的数据,在减去(t-1)的数据,就可以得到和第一种方式一样的结果,但是中间复用了三个数据(t+1,t+2,t+3)

以上为Spark Streaming API的简单使用

时间: 05-24

Spark(十) -- Spark Streaming API编程的相关文章

利用SparkLauncher 类以JAVA API 编程的方式提交Spark job

一.环境说明和使用软件的版本说明: hadoop-version:hadoop-2.9.0.tar.gz spark-version:spark-2.2.0-bin-hadoop2.7.tgz java-version:jdk1.8.0_151 集群环境:单机伪分布式环境. 二.适用背景 在学习Spark过程中,资料中介绍的提交Spark Job的方式主要有两种(我所知道的):第一种是通过命令行的方式提交Job,使用spark 自带的spark-submit工具提交,官网和大多数参考资料都是已这

大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池

第0章 预备知识0.1 Scala0.1.1 Scala 操作符0.1.2 拉链操作0.2 Spark Core0.2.1 Spark RDD 持久化0.2.2 Spark 共享变量0.3 Spark SQL0.3.1 RDD.DataFrame 与 DataSet0.3.2 DataSet 与 RDD 互操作0.3.3 RDD.DataFrame 与 DataSet 之间的转换0.3.4 用户自定义聚合函数(UDAF)0.3.5 开窗函数0.4 Spark Streaming0.4.1 Dst

Spark 以及 spark streaming 核心原理及实践

导语 spark 已经成为广告.报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,对spark技术的使用有一些自己的经验积累以及心得体会,在此分享给大家. 本文依次从spark生态,原理,基本概念,spark streaming原理及实践,还有spark调优以及环境搭建等方面进行介绍,希望对大家有所帮助. spark 生态及运行原理 Spark 特点 运行速度快 => Spark拥有DA

Spark(1.6.1) Sql 编程指南+实战案例分析

首先看看从官网学习后总结的一个思维导图 概述(Overview) Spark SQL是Spark的一个模块,用于结构化数据处理.它提供了一个编程的抽象被称为DataFrames,也可以作为分布式SQL查询引擎. 开始Spark SQL Spark SQL中所有功能的入口点是SQLContext类,或者它子类中的一个.为了创建一个基本的SQLContext,你所需要的是一个SparkContext. 除了基本的SQLContext,你还可以创建一个HiveContext,它提供了基本的SQLCon

Spark基础与Java Api介绍

原创文章,转载请注明: 转载自http://www.cnblogs.com/tovin/p/3832405.html  一.Spark简介 1.什么是Spark 发源于AMPLab实验室的分布式内存计算平台,它克服了MapReduce在迭代式计算和交互式计算方面的不足. 相比于MapReduce,Spark能充分利用内存资源提高计算效率. 2.Spark计算框架 Driver程序启动很多workers,然后workers在(分布式)文件系统中读取数据后转化为RDD(弹性分布式数据集),最后对RD

Real Time Credit Card Fraud Detection with Apache Spark and Event Streaming

https://mapr.com/blog/real-time-credit-card-fraud-detection-apache-spark-and-event-streaming/ Editor's Note: Have questions about the topics discussed in this post? Search for answers and post questions in the Converge Community. In this post we are

spark读写压缩文件API使用详解

最近研究了下Spark如何读写压缩格式的文件,主要有如下三种方式,这里以lzo方式压缩为例     /*******************old hadoop api*************************/     val confHadoop = new JobConf     confHadoop.set("mapred.output.compress", "true")     confHadoop.set("mapred.output

salesforce零基础学习(八十五)streaming api 简单使用(接近实时获取你需要跟踪的数据的更新消息状态)

Streaming API参考链接: https://trailhead.salesforce.com/en/modules/api_basics/units/api_basics_streaming https://resources.docs.salesforce.com/210/latest/en-us/sfdc/pdf/api_streaming.pdf 背景:工作中我们有可能会有这样相关的需求:某些数据很重要,需要实时监控是否有变化,或者某些数据在其他的平台有集成.如果有变化,不刷新页

【转】科普Spark,Spark是什么,如何使用Spark

本博文是转自如下链接,为了方便自己查阅学习和他人交流.感谢原博主的提供! http://www.aboutyun.com/thread-6849-1-1.html http://www.aboutyun.com/thread-6850-1-1.html 科普Spark,Spark核心是什么,如何使用Spark(1) 阅读本文章可以带着下面问题: 1.Spark基于什么算法的分布式计算(很简单) 2.Spark与MapReduce不同在什么地方 3.Spark为什么比Hadoop灵活 4.Spar