1. Create the partition table
scala> spark.sql("CREATE TABLE ptable(c1 int,c2 int) PARTITIONED BY (step
string)") res1: org.apache.spark.sql.DataFrame = []
2. Insert test data into the partitions
scala> spark.sql("INSERT INTO TABLE ptable PARTITION(step='a') select 1,2")
res2: org.apache.spark.sql.DataFrame = [] scala> spark.sql("select * from
ptable") res3: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more
field] scala> spark.sql("select * from ptable").show(100,false) +---+---+----+
|c1 |c2 |step| +---+---+----+ |1 |2 |a | +---+---+----+ scala>
spark.sql("INSERT INTO TABLE ptable PARTITION(step='a') select 3,4") res5:
org.apache.spark.sql.DataFrame = [] scala> spark.sql("select * from
ptable").show(100,false) +---+---+----+ |c1 |c2 |step| +---+---+----+ |1 |2 |a
| |3 |4 |a | +---+---+----+ scala> spark.sql("INSERT INTO TABLE ptable
PARTITION(step='b') select 5,6") res7: org.apache.spark.sql.DataFrame = []
scala> spark.sql("INSERT INTO TABLE ptable PARTITION(step='b') select 7,8")
res8: org.apache.spark.sql.DataFrame = [] scala> spark.sql("select * from
ptable").show(100,false) +---+---+----+ |c1 |c2 |step| +---+---+----+ |1 |2 |a
| |3 |4 |a | |5 |6 |b | |7 |8 |b | +---+---+----+ scala> spark.sql("INSERT INTO
TABLE ptable PARTITION(step='c') select 9,10") res10:
org.apache.spark.sql.DataFrame = [] scala> spark.sql("INSERT INTO TABLE ptable
PARTITION(step='c') select 11,12") res11: org.apache.spark.sql.DataFrame = []
scala> spark.sql("select * from ptable").show(100,false) +---+---+----+ |c1 |c2
|step| +---+---+----+ |1 |2 |a | |3 |4 |a | |5 |6 |b | |7 |8 |b | |9 |10 |c |
|11 |12 |c | +---+---+----+ scala> spark.sql("INSERT INTO TABLE ptable
PARTITION(step='c') select 13,14") res13: org.apache.spark.sql.DataFrame = []
scala> spark.sql("select * from ptable").show(100,false) +---+---+----+ |c1 |c2
|step| +---+---+----+ |1 |2 |a | |3 |4 |a | |5 |6 |b | |7 |8 |b | |9 |10 |c |
|11 |12 |c | |13 |14 |c | +---+---+----+
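The repetitive statements above can also be driven from Scala. A minimal sketch, assuming the ptable created in step 1; each generated statement still uses a static partition spec, so it works even under Hive's default strict mode:

// run the same static-partition inserts in a loop
Seq((1, 2, "a"), (3, 4, "a"), (5, 6, "b"), (7, 8, "b"), (9, 10, "c"), (11, 12, "c"), (13, 14, "c"))
  .foreach { case (c1, c2, p) =>
    spark.sql(s"INSERT INTO TABLE ptable PARTITION(step='$p') SELECT $c1, $c2")
  }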
3. Use a SQL statement to overwrite the specified partition c. The overwrite succeeds.
scala> spark.sql("INSERT overwrite TABLE ptable PARTITION(step='c') select
13,14") res15: org.apache.spark.sql.DataFrame = [] scala> spark.sql("select *
from ptable").show(100,false) +---+---+----+ |c1 |c2 |step| +---+---+----+ |1
|2 |a | |3 |4 |a | |5 |6 |b | |7 |8 |b | |13 |14 |c | +---+---+----+
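Note that this overwrite names the partition statically (step='c'), which is why it works under Hive's default strict mode. The partition value can also come dynamically from the SELECT list, but only after strict mode is disabled, the same setting step 4 below runs into. A hedged sketch, not part of the original session:

scala> spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
scala> spark.sql("INSERT OVERWRITE TABLE ptable PARTITION(step) SELECT 13, 14, 'c'")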
4. Create a new DataFrame and overwrite through its write method (the table contents below appear to come from a different session than steps 1 to 3). The overwrite succeeds once strict mode is disabled.
scala> val seq = Seq((111,222,"a")).toDF("c1","c2","step")
seq: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more field]

scala> spark.sql("select * from ptable").show(100,false)
+---+---+----+
|c1 |c2 |step|
+---+---+----+
|11 |22 |a   |
|11 |22 |a   |
|11 |22 |a   |
|333|444|b   |
|33 |44 |b   |
|555|666|c   |
|55 |66 |c   |
+---+---+----+

scala> seq.write.mode("overwrite").insertInto("ptable")
org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
  at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:314)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:66)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:61)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:84)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:609)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:609)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:283)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:269)
  ... 48 elided
This requires disabling Hive's dynamic partition strict mode:
scala> spark.sql("set hive.exec.dynamic.partition.mode=nonstrict") res2:
org.apache.spark.sql.DataFrame = [key: string, value: string] scala>
seq.write.mode("overwrite").insertInto("ptable") scala> spark.sql("select *
from ptable").show(100,false) +---+---+----+ |c1 |c2 |step| +---+---+----+
|111|222|a | |333|444|b | |33 |44 |b | |555|666|c | |55 |66 |c | +---+---+----+
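The same flag can be set through the runtime configuration instead of a SQL statement; a minimal equivalent sketch:

// same effect as the SQL "set" statement above; session-scoped either way
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
seq.write.mode("overwrite").insertInto("ptable")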
5. Use the DataFrame's write method to overwrite, this time reducing the number of output files per partition write to 1. The overwrite succeeds.
scala> spark.sql("select * from ptable").show(100,false) +---+---+----+ |c1
|c2 |step| +---+---+----+ |111|222|a | |111|222|a | |111|222|a | |111|222|a |
|333|444|b | |333|444|b | |333|444|b | |333|444|b | |333|444|b | |555|666|c |
|55 |66 |c | +---+---+----+ scala> val seq =
Seq((333,444,"c"),(333,444,"c"),(333,444,"c"),(333,444,"c"),(333,444,"c")).toDF("c1","c2","step")
seq: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more field]
scala> seq.coalesce(1).write.mode("overwrite").insertInto("ptable") scala>
spark.sql("select * from ptable").show(100,false) +---+---+----+ |c1 |c2 |step|
+---+---+----+ |111|222|a | |111|222|a | |111|222|a | |111|222|a | |333|444|b |
|333|444|b | |333|444|b | |333|444|b | |333|444|b | |333|444|c | |333|444|c |
|333|444|c | |333|444|c | |333|444|c | +---+---+----+
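coalesce(1) funnels the entire write through a single task, which is fine for tiny data but serializes larger writes. If the goal is one file per partition, an alternative is to repartition by the partition column, so rows for each distinct step value land in a single task. A sketch under the same session assumptions as above:

import org.apache.spark.sql.functions.col

// one task, and hence one output file, per distinct step value
seq.repartition(col("step")).write.mode("overwrite").insertInto("ptable")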

*** Note: if Spark SQL shares metadata with Hive (that is, hive-site.xml is placed under Spark's configuration directory), then the partitioned table created above in spark-sql has the following structure:
spark-sql> show create table ptable;
CREATE TABLE `ptable`(`c1` INT, `c2` INT)
PARTITIONED BY (`step` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
TBLPROPERTIES (
  'transient_lastDdlTime' = '1572849178'
)
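If you want to keep this Hive-format behavior (per-partition overwrite through insertInto) but store parquet instead of the text serde shown above, the table can be declared up front with STORED AS PARQUET. This DDL is a hedged sketch, not from the original session, and the name ptable_parquet is hypothetical:

spark-sql> CREATE TABLE ptable_parquet(c1 INT, c2 INT) PARTITIONED BY (step STRING) STORED AS PARQUET;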


The underlying storage format is not Spark's default parquet. A partitioned table created this way can be overwritten per partition with coalesce(1).write.mode("overwrite").insertInto("ptable").
But if you do not create this table structure in advance and instead write the partitioned table directly with Spark SQL's DataFrame.coalesce(1).write.mode("append" or "overwrite").partitionBy("step").saveAsTable("ptable"), the generated table structure looks like this:
spark-sql> show create table ptable_spark;
CREATE TABLE `ptable_spark` (`c1` INT, `c2` INT, `step` STRING)
USING parquet
OPTIONS (
  `partitionOverwriteMode` 'dynamic',
  `serialization.format` '1'
)
PARTITIONED BY (step)


In this case, calling coalesce(1).write.mode("overwrite").insertInto("ptable") overwrites the entire table. This is unfriendly if you are used to saving a table with saveAsTable first and then writing into individual partitions. To get a partition-level overwrite you have to follow the steps above: create the Hive-format table structure first, then run set hive.exec.dynamic.partition.mode=nonstrict to disable strict mode, and finally call coalesce(1).write.mode("overwrite").insertInto("ptable") to overwrite the partition.
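For the datasource table produced by saveAsTable, one more option is worth knowing: on Spark 2.3 and later, setting spark.sql.sources.partitionOverwriteMode to dynamic makes mode("overwrite").insertInto replace only the partitions that actually receive data rather than the whole table. Whether this applies depends on your Spark version, so treat this as a sketch to verify:

// Spark 2.3+ only; with "static" (the default) the overwrite below clears the whole table
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
seq.coalesce(1).write.mode("overwrite").insertInto("ptable_spark")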
c Language to achieve mine clearance Games and mine source code Node.js Middle template engine 3 species Python data structure ,13 Creation methods , This is the conclusion , Great ! The situation of receiving and receiving multi-path git It's set correctly ssh, But it still prompts Permission denied (publickey) new iPhone I won't support it 5G Will lead to further decline in shipment C# Sorting method of Chinese Dictionaries 10909 rice ! Chinese fighter successfully landed in Mariana Trench MySQL8.0MGR Single owner / Multi master installation and switching Java Poor reflection performance ?