准备代码
// 屏蔽日志Logger.getLogger("org.apache.spark").setLevel(Level.WARN)val spark = SparkSession.builder().appName("SparkSessionT").master("local[1]").getOrCreate()val commodityDF = spark.read.format("jdbc") //利用jdbc读取MySQL数据库的数据.option("url", "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC") //连接的URL.option("driver", "com.mysql.jdbc.Driver") //连接的驱动.option("dbtable", "commodityPrice") //获取的 数据表.option("user", "root") //用户名.option("password", "123456") //密码.load() //登录
使用SQL语句查询
// 转换为虚拟表 "commodityTable"使用 SQL语句查询commodityDF.registerTempTable("commodityTable")spark.sql("select * from commodityTable").show(10)
查看数据
// 默认只显示20条commodityDF.show()// 是否最多只显示20个字符,默认为truecommodityDF.show(false)// 完整查看10条数据commodityDF.show(10, false)// 取前n行数据, 和take与head不同的是,limit方法不是Action操作。commodityDF.limit(5).show(false)
加载数据到数组
// 将数据加载到集合里面commodityDF.collect().foreach(println)// 和collect类似,只不过转换成listcommodityDF.collectAsList()
获取指定字段的统计信息
// 获取指定字段的统计信息count, mean, stddev, min, max 返回的还是DataframecommodityDF.describe("price", "yprice").show(false)// 遍历每个统计信息commodityDF.describe("price", "yprice").collect().foreach(println)// 取0行的 "count" 数据println(commodityDF.describe("price", "yprice").collect()(0))
获取n行数据
// 获取第一行数据println(commodityDF.first())println(commodityDF.head())// 获取前5条数据commodityDF.head(5).foreach(println)commodityDF.take(5).foreach(println)// 以行数据list返回commodityDF.takeAsList(5)
条件查询
// where和filter方法和SQL的where后面的语句一样commodityDF.where("price>100 or yprice<200").show()commodityDF.filter("price>100 or yprice<200").show()
选取字段
// 选取 name ,price字段commodityDF.select("name", "price").show(5, false)// 对price字段的数据都+100commodityDF.select(commodityDF("name"), commodityDF("price") + 100).show(5)// 对指定字段进行特殊处理; price字段重名名为 p, 对price取四舍五入commodityDF.selectExpr("name", "price as p", "round(price)").show(10)// 只获取单个字段val name = commodityDF.col("name")val parice = commodityDF.apply("price")
删除指定字段
// 删除指定字段 priceval c1 = commodityDF.drop("price")val c2 = c1.drop(c1("yprice"))c2.show(5)
排序
// 降序排序commodityDF.orderBy(-commodityDF("price")).show(5)commodityDF.orderBy(commodityDF("price").desc).show(5)// 升序排序commodityDF.orderBy("price").show(5)commodityDF.orderBy(commodityDF("price")).show(5)
分组
// 对字段数据分组, 再对分组后的数据处理 count max mean sum aggcommodityDF.groupBy("degree").count().show(5)commodityDF.groupBy(commodityDF("degree")).count().show(5)commodityDF.groupBy(commodityDF("degree")).max("price", "yprice").show(5)
agg聚合
// 聚合agg一般和group by一起使用commodityDF.agg("price" -> "max", "yprice" -> "sum").show()// 对degree分组然后取 price字段的最大值和downNum的平均值commodityDF.groupBy("degree").agg("price" -> "max", "downNum" -> "mean").show()
去除重复数据
// 注意自增ID不同的问题val df = commodityDF.drop("id")println(df.count())// 去除一行数据完全相同的println(df.distinct().count())// 删除指定字段存在相同的数据println(df.dropDuplicates(Seq("price", "yprice")).count())
同字段数据组合(unionAll)
// 同字段组合commodityDF.limit(5).unionAll(commodityDF.limit(5)).show()
同字段数据行组合(join)
// 组合数据DFval df1 = commodityDF.limit(5)val df2 = commodityDF.limit(10)val df3 = commodityDF.filter("id>5 and id<11")// 只有id字段相同的才会横向组合 innerdf1.join(df2, "id").show()df1.join(df2, df1("id") === df2("id")).show()df1.join(df2, df1("id") === df2("id"), "inner").show()// 根据 id和name两个字段joindf2.join(df3, Seq("id", "name")).show()
Dataframe的逻辑操作
// 计算出两个DataFrame的交集数据df1.intersect(df2).show()// 获取df2中df3没有的记录df2.except(df3).show()
重命名字段名
// 重命名字段名,如果指定的字段名不存在,不进行任何操作commodityDF.limit(5).show()commodityDF.withColumnRenamed("name", "reName").limit(5).show()
添加字段
// 往当前DataFrame中新增一列commodityDF.withColumn("newCol", commodityDF("name")).show()
拆分字段数据为行数据(字段本身并不删除)
// 将列字段数据拆分为行数据commodityDF.limit(1).explode("name", "name_") { time: String => time.split(" ") }.show()