SparkSQL学习03-数据读取与存储
发布日期:2025-05-01 23:04:27
浏览次数:17
分类:精选文章
本文共 4523 字,大约阅读时间需要 15 分钟。
SparkSQL数据读取与保存教程
SparkSQL提供了通用的数据读取和保存方式,通过统一的API支持多种数据格式的读写操作。以下将详细介绍SparkSQL的数据读取与保存方法。
数据的加载
SparkSQL支持两种主要方式来加载数据:
1.1 方式一:spark.read.format
- 语法格式:
spark.read.format("格式").load("路径") - 支持格式:包括csv、jdbc、json、orc、parquet和textFile。
- 注意事项:
- 读取jdbc时,需在
format和load之间添加JDBC参数,如url、user、password、tablename。 - 当数据源为Parquet文件时,无需使用
format,SparkSQL可以直接读取。
- 读取jdbc时,需在
示例:读取JSON数据
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkReadData") .master("local[*]").getOrCreate() // 读取JSON数据 val frame: DataFrame = session.read.json("data/people.json") frame.printSchema() frame.show() }} 示例:读取JDBC数据
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkReadData") .master("local[*]").getOrCreate() // 读取JDBC数据 val frame: DataFrame = session.read.jdbc( "jdbc:mysql://localhost:3306/mydb1", "location_info", new Properties() { { put("user", "root") put("password", "123456") } } ) frame.printSchema() frame.show() }} 1.2 方式二:spark.read.xxx
- 简化语法:
spark.read.json("路径")等方式替代spark.read.format("json").load("路径")。 - 注意事项:
- 读取jdbc时,参数为
url、tablename和properties对象,properties中包含user和password。 - 支持的格式包括csv、json、orc、parquet和text。
- 读取jdbc时,参数为
示例:读取JSON数据
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkReadData") .master("local[*]").getOrCreate() // 读取JSON数据 val frame: DataFrame = session.read.json("data/people.json") frame.printSchema() frame.show() }} 示例:读取CSV数据
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkReadData") .master("local[*]").getOrCreate() // 读取CSV数据 val frame: DataFrame = session.read.csv("data/country.csv") frame.printSchema() frame.show() }} 数据的保存
SparkSQL提供了两种主要方式来保存数据:
2.1 方式一:spark.write.format
- 语法格式:
spark.write.format("格式").mode("模式").save("路径") - 支持格式:包括csv、jdbc、json、orc、parquet和textFile。
- SaveMode:
SaveMode.ErrorifExists(默认):文件已存在时抛出异常。SaveMode.Append:文件已存在时追加新内容。SaveMode.Overwrite:文件已存在时覆盖数据。SaveMode.Ignore:文件已存在时忽略新数据。
示例:保存到ORC格式
package _02SparkSQLimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkWriteData") .master("local").getOrCreate() // 读取数据 val frame: DataFrame = session.read.orc("data/student.orc") // 保存到JSON格式 frame.write.format("json").mode(SaveMode.Overwrite).save("data/OWStudent") session.stop() }} 2.2 方式二:spark.write.xxx
- 简化语法:
spark.write.json("路径").mode("模式")等方式替代spark.write.format("json").mode("模式").save("路径")。 - 注意事项:
- 保存jdbc时,参数为
url、tablename和properties对象,properties中包含user和password。 - 支持的格式包括csv、json、orc、parquet和text。
- 保存jdbc时,参数为
示例:保存到JDBC数据库
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkWriteData") .master("local").getOrCreate() // 读取数据 val frame: DataFrame = session.read.orc("data/student.orc") // 保存到JDBC数据库 val properties = new Properties() { { put("user", "root") put("password", "123456") } } frame.write.mode(SaveMode.Append).jdbc( "jdbc:mysql://localhost:3306/mydb1", "student", properties ) session.stop() }} 通过以上方法,用户可以方便地使用SparkSQL进行数据读取与保存操作。
发表评论
最新留言
网站不错 人气很旺了 加油
[***.192.178.218]2026年06月12日 10时46分59秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!