一、簡介#
1.1多數據源支持#
Spark支持以下六個核心數據源,同時Spark社區還提供了多達上百種數據源的讀取方式,能夠滿足絕大部分使用場景。
CSV
JSON
Parquet
ORC
JDBC/ODBCconnections
Plain-textfiles
注:以下所有測試文件均可從本倉庫的resources目錄進行下載
1.2讀數據格式#
所有讀取API遵循以下調用格式:
Copy
//格式
DataFrameReader.format(...).option("key","value").schema(...).load()
//示例
spark.read.format("csv")
.option("mode","FAILFAST")//讀取模式
.option("inferSchema","true")//是否自動推斷schema
.option("path","path/to/file(s)")//文件路徑
.schema(someSchema)//使用預定義的schema
.load()
讀取模式有以下三種可選項:
讀模式描述
permissive當遇到損壞的記錄時,將其所有字段設置為null,并將所有損壞的記錄放在名為_corruptiont_record的字符串列中
dropMalformed刪除格式不正確的行
failFast遇到格式不正確的數據時立即失敗
1.3寫數據格式#
Copy
//格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
//示例
dataframe.write.format("csv")
.option("mode","OVERWRITE")//寫模式
.option("dateFormat","yyyy-MM-dd")//日期格式
.option("path","path/to/file(s)")
.save()
寫數據模式有以下四種可選項:
Scala/Java描述
SaveMode.ErrorIfExists如果給定的路徑已經存在文件,則拋出異常,這是寫數據默認的模式
SaveMode.Append數據以追加的方式寫入
SaveMode.Overwrite數據以覆蓋的方式寫入
SaveMode.Ignore如果給定的路徑已經存在文件,則不做任何操作
二、CSV#
CSV是一種常見的文本文件格式,其中每一行表示一條記錄,記錄中的每個字段用逗號分隔。
2.1讀取CSV文件#
自動推斷類型讀取讀取示例:
Copy
spark.read.format("csv")
.option("header","false")//文件中的第一行是否為列的名稱
.option("mode","FAILFAST")//是否快速失敗
.option("inferSchema","true")//是否自動推斷schema
.load("/usr/file/csv/dept.csv")
.show()
使用預定義類型:
Copy
importorg.apache.spark.sql.types.{StructField,StructType,StringType,LongType}
//預定義數據格式
valmyManualSchema=newStructType(Array(
StructField("deptno",LongType,nullable=false),
StructField("dname",StringType,nullable=true),
StructField("loc",StringType,nullable=true)
))
spark.read.format("csv")
.option("mode","FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
2.2寫入CSV文件#
Copy
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
也可以指定具體的分隔符:
Copy
df.write.format("csv").mode("overwrite").option("sep","\t").save("/tmp/csv/dept2")
2.3可選配置#
為節省主文篇幅,所有讀寫配置項見文末9.1小節。三、JSON#
3.1讀取JSON文件#
Copy
spark.read.format("json").option("mode","FAILFAST").load("/usr/file/json/dept.json").show(5)
需要注意的是:默認不支持一條數據記錄跨越多行(如下),可以通過配置multiLine為true來進行更改,其默認值為false。
Copy
//默認支持單行
{"DEPTNO":10,"DNAME":"ACCOUNTING","LOC":"NEWYORK"}
//默認不支持多行
{
"DEPTNO":10,
"DNAME":"ACCOUNTING",
"LOC":"NEWYORK"
}
3.2寫入JSON文件#
Copy
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
3.3可選配置#
為節省主文篇幅,所有讀寫配置項見文末9.2小節。
四、Parquet#
Parquet是一個開源的面向列的數據存儲,它提供了多種存儲優化,允許讀取單獨的列非整個文件,這不僅節省了存儲空間而且提升了讀取效率,它是Spark是默認的文件格式。
4.1讀取Parquet文件#
Copy
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
2.2寫入Parquet文件#
Copy
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
2.3可選配置#
Parquet文件有著自己的存儲規則,因此其可選配置項比較少,常用的有如下兩個:
讀寫操作配置項可選值默認值描述
WritecompressionorcodecNone,
uncompressed,
bzip2,
deflate,gzip,
lz4,orsnappyNone壓縮文件格式
ReadmergeSchematrue,false取決于配置項spark.sql.parquet.mergeSchema
五、ORC#
ORC是一種自描述的、類型感知的列文件格式,它針對大型數據的讀寫進行了優化,也是大數據中常用的文件格式。
5.1讀取ORC文件#
Copy
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
4.2寫入ORC文件#
Copy
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
六、SQLDatabases#
Spark同樣支持與傳統的關系型數據庫進行數據讀寫。但是Spark程序默認是沒有提供數據庫驅動的,所以在使用前需要將對應的數據庫驅動上傳到安裝目錄下的jars目錄中。下面示例使用的是Mysql數據庫,使用前需要將對應的mysql-connector-java-x.x.x.jar上傳到jars目錄下。
6.1讀取數據#
讀取全表數據示例如下,這里的help_keyword是mysql內置的字典表,只有help_keyword_id和name兩個字段。
Copy
spark.read
.format("jdbc")
.option("driver","com.mysql.jdbc.Driver")//驅動
.option("url","jdbc:mysql://127.0.0.1:3306/mysql")//數據庫地址
.option("dbtable","help_keyword")//表名
.option("user","root").option("password","root").load().show(10)
從查詢結果讀取數據:
valpushDownQuery="""(SELECT*FROMhelp_keywordWHEREhelp_keyword_id<20)AShelp_keywords"""
spark.read.format("jdbc")
.option("url","jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root").option("password","root")
.option("dbtable",pushDownQuery)
.load().show()
//輸出
+---------------+-----------+
help_keyword_id|name
+---------------+-----------+
0|<>
1|ACTION
2|ADD
3|AES_DECRYPT
4|AES_ENCRYPT
5|AFTER
6|AGAINST
7|AGGREGATE
8|ALGORITHM
9|ALL
10|ALTER
11|ANALYSE
12|ANALYZE
13|AND
14|ARCHIVE
15|AREA
16|AS
17|ASBINARY
18|ASC
1
七、Text#
Text文件在讀寫性能方面并沒有任何優勢,且不能表達明確的數據結構,所以其使用的比較少,讀寫操作如下:
7.1讀取Text數據#
Copy
spark.read.textFile("/usr/file/txt/dept.txt").show()
7.2寫入Text數據#
Copy
df.write.text("/tmp/spark/txt/dept")
八、數據讀寫高級特性#
8.1并行讀#
多個Executors不能同時讀取同一個文件,但它們可以同時讀取不同的文件。這意味著當您從一個包含多個文件的文件夾中讀取數據時,這些文件中的每一個都將成為DataFrame中的一個分區,并由可用的Executors并行讀取。
8.2并行寫#
寫入的文件或數據的數量取決于寫入數據時DataFrame擁有的分區數量。默認情況下,每個數據分區寫一個文件。
8.3分區寫入#
分區和分桶這兩個概念和Hive中分區表和分桶表是一致的。都是將數據按照一定規則進行拆分存儲。需要注意的是partitionBy指定的分區和RDD中分區不是一個概念:這里的分區表現為輸出目錄的子目錄,數據分別存儲在對應的子目錄中。
Copy
valdf=spark.read.format("json").load("/usr/file/jsonp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出文件。
8.3分桶寫入#
分桶寫入就是將數據按照指定的列和桶數進行散列,目前分桶寫入只支持保存為表,實際上這就是Hive的分桶表。
valnumberBuckets=10
valcolumnToBucketBy="empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets,columnToBucketBy).saveAsTable("bucketedFiles")
.......
具體介紹來源于https://www.cnblogs.com/heibaiying/p/11347390.html