Spark SQL的介绍和DataFrame的建立及使用

1.

Spark SQL定位处理结构化数据的模块。SparkSQL提供相应的优化机制,并支持不同语言的开发API。
java、scala、Python,类SQL的方法调用(DSL)
2.

RDD与Spark SQL的比较说明:
  使用Spark SQL的优势:a.面向结构化数据;b.优化机制;
  RDD缺点:a.没有优化机制,如对RDD执行Filter操作;
     b.RDD类型转换后无法进行模式推断
3.

DataFrame/SchemaRDD
  DataFrame是一个分布式的数据集合,该数据集合以命名列的方式进行整合。
  Dateframe=RDD(数据集)+Schema(元数据/模型)
  SchemaRDD就是DataFrame的前身,在1.3.0版本后。
  DataFrame存放的是ROW对象。每个Row 对象代表一行记录。      

  SchemaRDD还包含记录的结构信息(即数据字段)
4.

创建Spark SQL环境
  a.将SparkSQL依赖库添加至pom.xml文件中
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.2</version>
    </dependency>
  b.创建SparkSQL Context-->SparkSession
    通过SparkSession.builder()创建构造器;
    并调用.appName("sparkSQL").master("local")设置集群模式以及app名称
    最后必须调用getOrCreate()方法创建SparkSession对象。
    val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
  c.加载外部数据源:
    通过SparkSession的read()方法加载不同的数据源:json、CVS、jdbc、textfile、parquert等
    val df = spark.read.textFile("file:///d:/测试数据/users.txt").toDF()
    df.show()

DF的创建方式

  

  (1)通过SparkSession的createDataFrame(...)方法创建DF对象
    a.将Seq序列转换为DF
    b.将RDD[Product]多元素转换为DF
  (2)通过SparkSession的read读取外部文件调用toDF()
  (3)通过导入隐式转换,可直接将Scala中的序列转换为DF
    val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
    import spark.implicits._
    val list = List(("zhangsan",12,"changchun"),("lilei",25,"haerbin"))
    val df_implicits = list.toDF()

 查看DF的Schema

1.案例说明:
  val rdd = sc.textFile("file:///d:/测试数据/users.txt").map(x=>x.split(" ")).map(x=>(x(0),x(1),x(2)))
  val df_rdd = spark.createDataFrame(rdd)
  df_rdd.show()
  df_rdd.select("_1","_2").where("_1 like ‘%o%‘").show()
  df_rdd.printSchema()
    root
      |-- _1: string (nullable = true)
      |-- _2: string (nullable = true)
      |-- _3: string (nullable = true)
  通过case用例类可以对DF进行Schema匹配
  case class Person(name:String,age:Int,address:String)

  val rdd = sc.textFile("file:///d:/测试数据/users.txt").map(x=>x.split(" ")).map(x=>new Person(x(0),x(1).toInt,x(2)))
  val df_rdd = spark.createDataFrame(rdd)
  df_rdd.printSchema()
    root
      |-- name: string (nullable = true)
      |-- age: integer (nullable = true)
      |-- address: string (nullable = true)
  df_rdd.show()
    +------+---+-------+
    | name|age|address|
    +------+---+-------+
    | anne| 22| NY|
    | joe| 39| CO|
    |alison| 35| NY|
    +------+---+-------+
2.实现简单的select操作
  df_rdd.select("name","age").where("name like ‘%o%‘").show()
    +------+---+
    | name|age|
    +------+---+
    | joe| 39|
    |alison| 35|
    | bob| 71|
    +------+---+

 DF的操作方式

1.显示:
  df_rdd.show()
2.查询:
  df_rdd.select("name").show()
3.条件查询:
  df_rdd.select($"name",$"age").where("name like ‘%o%‘").show() //注:引入spark.implicits._
    +------+---+
    | name|age|
    +------+---+
    | joe| 39|
    |alison| 35|
    | bob| 71|
    +------+---+
4.条件查询:
  df_rdd.select($"name",$"age"+1).where("name like ‘%o%‘").show() //$是scala的用法,需要隐式转换 import spark.implicits._
    +------+---------+
    | name|(age + 1)|
    +------+---------+
    | joe| 40|
    |alison| 36|
    | bob| 72|
    +------+---------+
5.过滤操作
  a.通过过滤表达式:
    df_rdd.filter("age > 36").show()
  b.通过func式编程进行处理,DF中每个元素均为ROW
    df_rdd.filter(x=>{if(x.getAs[Int]("age") > 36) true else false }).show()
6.分组操作
    df_rdd.groupBy("address").count().show
      +-------+-----+
      |address|count|
      +-------+-----+
      | OR| 2|
      | VA| 2|
      | CA| 2|
      | NY| 3|
      | CO| 1|
      +-------+-----+

原文地址:https://www.cnblogs.com/lyr999736/p/10202276.html

时间: 12-31

Spark SQL的介绍和DataFrame的建立及使用的相关文章

Spark Sql的介绍

1.hive 与sparkSql比较 2.使用的语言 3.使用的框架 4.描述

第56课:Spark SQL和DataFrame的本质

一.Spark SQL与Dataframe Spark SQL之所以是除Spark core以外最大和最受关注的组件的原因: a) 能处理一切存储介质和各种格式的数据(你同时可以方便的扩展Spark SQL的功能来支持更多的数据类型,例如KUDO) b)Spark SQL 把数据仓库的计算能力推向了一个新的高度.不仅是无敌的计算速度(Spark SQL比Shark快了一个数量级,Shark比Hive快了一个数量级),尤其是在tungsten成熟以后会更加无可匹敌.更为重要的是把数据仓库的计算复杂

Spark SQL数据加载和保存实战

一:前置知识详解: Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二:Spark SQL读写数据代码实战: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRD

转】Spark SQL 之 DataFrame

原博文出自于: http://www.cnblogs.com/BYRans/p/5003029.html 感谢! Spark SQL 之 DataFrame 转载请注明出处:http://www.cnblogs.com/BYRans/ 概述(Overview) Spark SQL是Spark的一个组件,用于结构化数据的计算.Spark SQL提供了一个称为DataFrames的编程抽象,DataFrames可以充当分布式SQL查询引擎. DataFrames DataFrame是一个分布式的数据

spark结构化数据处理:Spark SQL、DataFrame和Dataset

本文讲解Spark的结构化数据处理,主要包括:Spark SQL.DataFrame.Dataset以及Spark SQL服务等相关内容.本文主要讲解Spark 1.6.x的结构化数据处理相关东东,但因Spark发展迅速(本文的写作时值Spark 1.6.2发布之际,并且Spark 2.0的预览版本也已发布许久),因此请随时关注Spark SQL官方文档以了解最新信息. 文中使用Scala对Spark SQL进行讲解,并且代码大多都能在spark-shell中运行,关于这点请知晓. 概述 相比于

Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming

主要内容 Spark SQL.DataFrame与Spark Streaming 1. Spark SQL.DataFrame与Spark Streaming 源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala import org.apache.spark.SparkConf

Spark SQL 中 dataFrame 学习总结

dataFrame多了数据的结构信息.就是schema. RDD是分布式的 Java对象的集合.DataFrame是分布式的Row对象的集合. DataFrame 提供了详细的结构信息,可以让sparkSQL清楚的知道数据集中包含哪些列,列的名称和类型各是什么? RDD是分布式的 Java对象的集合.DataFrame是分布式的Row对象的集合.DataFrame除了提供了 比RDD更丰富的算子以外,更重要的特点是提升执行效率.减少数据读取以及执行计划的优化,比如 filter下推.裁剪等. 提

Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现 Spark SQL 之 Join 实现 涂小刚 2017-07-19 217标签: spark , 数据库 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的. SparkSQL总体流程介绍 在阐述Join实现之前,我们首先简单介绍SparkSQL

spark sql 优化心得

本篇文章主要记录最近在使用spark sql 时遇到的问题已经使用心得. 1 spark 2.0.1 中,启动thriftserver 或者是spark-sql时,如果希望spark-sql run on hdfs,那样需要增加参数 "--conf spark.sql.warehouse.dir=hdfs://HOSTNAME:9000/user/hive/warehouse" 例如启动thriftserver: bin/start-thriftserver.sh --master s

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种