Environment:
hudi 0.10.1
spark 2.4.5
hive 2.3.7
hadoop 2.7.5

Compile the Hudi jar and copy it into the Hive lib directory:

cp /Users/xxx/cloudera/lib/hudi/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.0-SNAPSHOT.jar ~/cloudera/cdh5.7/hive/lib/
1. Create table and insert data

Hudi creates the table automatically, but you can also create it in advance:
CREATE EXTERNAL TABLE `member_rt`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `uid` int,
  `ad_id` int,
  `fullname` string,
  `iconurl` string,
  `ts` string,
  `hudipartition` string)
PARTITIONED BY (`dt` string, `dn` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:8020/user/zyh/hudi/hivetest';

Test writing data:
import com.google.gson.Gson
import com.zyh.bean.DwsMember
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{SaveMode, SparkSession}

object HudiTestHive {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("dwd_member_import")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://localhost:8020")
    ssc.hadoopConfiguration.set("dfs.nameservices", "localhost")
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val commitTime = System.currentTimeMillis().toString // generate the commit time
    val df = sparkSession.read.text("/user/test/ods/member.log")
      .mapPartitions(partitions => {
        val gson = new Gson
        partitions.map(item => gson.fromJson(item.getString(0), classOf[DwsMember]))
      })
      .withColumn("ts", lit(commitTime))
      .withColumn("hudipartition", concat_ws("/", col("dt"), col("dn")))
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    df.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ or COPY_ON_WRITE
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update timestamp of the data
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hudipartition") // hudi partition column
      .option("hoodie.table.name", "member") // hudi table name
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000") // hiveserver2 address
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi_test") // hive database to sync to
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member") // hive table to sync to; it does not have to match the external table name created above
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt,dn") // hive partition columns to sync
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on "/"
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // when a record's partition changes, move it to the new partition directory
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; GLOBAL_BLOOM is needed so records can still be found after a partition change
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .mode(SaveMode.Append)
      .save("/user/zyh/hudi/hivetest")
  }
}
Query the table data:

Hudi's integration with Hive simply means that Hudi synchronizes its data to Hive so the data can be queried through Hive.
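A minimal sketch of such a query, assuming the hudi_test database and the member_rt table synced above, run from the same Hive-enabled Spark session:

// sketch: read the Hive-synced Hudi table back through Spark SQL
val sample = sparkSession.sql(
  "select uid, fullname, ts, dt, dn from hudi_test.member_rt limit 10")
sample.show(false)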

2. Table type comparison

To compare the COPY_ON_WRITE and MERGE_ON_READ table types, read the member log data and generate both kinds of tables at the same time:
object HudiTestHiveForTableType {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "zhangyunhao")
    val sparkSession = SparkSession.builder()
      .appName("dwd_member_import")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://localhost:8020")
    ssc.hadoopConfiguration.set("dfs.nameservices", "localhost")
    generateData(sparkSession)
    // queryData(sparkSession)
    // updateData(sparkSession)
    // queryData(sparkSession)
  }

  def generateData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val commitTime = System.currentTimeMillis().toString // generate the commit time
    val df = sparkSession.read.text("/user/test/ods/member.log")
      .mapPartitions(partitions => {
        val gson = new Gson
        partitions.map(item => gson.fromJson(item.getString(0), classOf[DwsMember]))
      })
      .withColumn("ts", lit(commitTime))
      .withColumn("hudipartition", concat_ws("/", col("dt"), col("dn")))

    // MERGE_ON_READ automatically creates two views: member1_ro (read-optimized view) and member1_rt (real-time view)
    df.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ
      .option("hoodie.insert.shuffle.parallelism", 12)
      .option("hoodie.upsert.shuffle.parallelism", 12)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid") // record key
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update timestamp of the data
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hudipartition") // hudi partition column
      .option("hoodie.table.name", "hudimembertest1") // hudi table name
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000") // hiveserver2 address
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi_test") // hive database to sync to
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member1") // hive table to sync to
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt,dn") // hive partition columns to sync
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on "/"
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // when a record's partition changes, move it to the new partition directory
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; GLOBAL_BLOOM is needed so records can still be found after a partition change
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .mode(SaveMode.Overwrite)
      .save("/user/zyh/hudi/hudimembertest1")

    // COPY_ON_WRITE automatically creates only the read-optimized view: member2
    df.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) // table type: COPY_ON_WRITE
      .option("hoodie.insert.shuffle.parallelism", 12)
      .option("hoodie.upsert.shuffle.parallelism", 12)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid") // record key
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update timestamp of the data
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hudipartition") // hudi partition column
      .option("hoodie.table.name", "hudimembertest2") // hudi table name
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000") // hiveserver2 address
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi_test") // hive database to sync to
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member2") // hive table to sync to
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt,dn") // hive partition columns to sync
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on "/"
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // when a record's partition changes, move it to the new partition directory
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type: GLOBAL_BLOOM so records can still be found after a partition change
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .mode(SaveMode.Overwrite)
      .save("/user/zyh/hudi/hudimembertest2")
  }
}
Check Hive: three tables have been created automatically. Continue by viewing their table structure with the command.
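For example (a sketch only; the table names member1_ro, member1_rt and member2 follow from the sync settings above, and the same statements also work from beeline):

// sketch: list the synced tables and inspect the InputFormat of each view
sparkSession.sql("show tables in hudi_test").show(false)
sparkSession.sql("describe formatted hudi_test.member1_ro").show(100, false) // InputFormat: HoodieParquetInputFormat
sparkSession.sql("describe formatted hudi_test.member1_rt").show(100, false) // InputFormat: HoodieParquetRealtimeInputFormat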


You can see two input formats, HoodieParquetInputFormat and HoodieParquetRealtimeInputFormat; in Hudi these correspond to the read-optimized view and the real-time view.

MERGE_ON_READ automatically creates two views when the table is created, while COPY_ON_WRITE creates only the read-optimized view.
Query the tables: all of the data has been synchronized.

3. Modify data

After the data is written, update both tables: modify only the 10 records with uid 0-9, changing fullname to testName.
def updateData(sparkSession: SparkSession) = {
  import org.apache.spark.sql.functions._
  import sparkSession.implicits._
  val commitTime = System.currentTimeMillis().toString // generate the commit time
  val df = sparkSession.read.text("/user/test/ods/member2.log")
    .mapPartitions(partitions => {
      val gson = new Gson
      partitions.map(item => gson.fromJson(item.getString(0), classOf[DwsMember]))
    })
    .where("uid>=0 and uid<=9")
  val result = df.map(item => {
      item.fullname = "testName" // change fullname to testName
      item
    })
    .withColumn("ts", lit(commitTime))
    .withColumn("hudipartition", concat_ws("/", col("dt"), col("dn")))
  // result.show()
  result.write.format("org.apache.hudi")
    .option("hoodie.insert.shuffle.parallelism", 12)
    .option("hoodie.upsert.shuffle.parallelism", 12)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid") // record key
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update timestamp of the data
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hudipartition") // hudi partition column
    .option("hoodie.table.name", "hudimembertest1") // hudi table name (MERGE_ON_READ table)
    .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000") // hiveserver2 address
    .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi_test") // hive database to sync to
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member1") // hive table to sync to
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt,dn") // hive partition columns to sync
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on "/"
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to hive
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // when a record's partition changes, move it to the new partition directory
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type
    .option("hoodie.insert.shuffle.parallelism", "12")
    .option("hoodie.upsert.shuffle.parallelism", "12")
    .mode(SaveMode.Append)
    .save("/user/zyh/hudi/hudimembertest1")

  result.write.format("org.apache.hudi")
    .option("hoodie.insert.shuffle.parallelism", 12)
    .option("hoodie.upsert.shuffle.parallelism", 12)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid") // record key
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update timestamp of the data
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hudipartition") // hudi partition column
    .option("hoodie.table.name", "hudimembertest2") // hudi table name (COPY_ON_WRITE table)
    .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000") // hiveserver2 address
    .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi_test") // hive database to sync to
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member2") // hive table to sync to
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt,dn") // hive partition columns to sync
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on "/"
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to hive
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // when a record's partition changes, move it to the new partition directory
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type
    .option("hoodie.insert.shuffle.parallelism", "12")
    .option("hoodie.upsert.shuffle.parallelism", "12")
    .mode(SaveMode.Append)
    .save("/user/zyh/hudi/hudimembertest2")
}
After the modification, check how the files under the corresponding HDFS paths have changed.


You can see the newly written data: MERGE_ON_READ (the merge-on-read table) records modifications as incremental log files, while COPY_ON_WRITE (the copy-on-write table) rewrites a completely new data file for every update.
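The same inspection can also be scripted rather than browsed in the HDFS UI; a small sketch, assuming the table paths above and the SparkContext `ssc` from the code above:

import org.apache.hadoop.fs.{FileSystem, Path}

// sketch: recursively list the data files of both tables; after the update the
// MERGE_ON_READ path contains new .log files, while the COPY_ON_WRITE path
// contains additional parquet files
val fs = FileSystem.get(ssc.hadoopConfiguration)
Seq("/user/zyh/hudi/hudimembertest1", "/user/zyh/hudi/hudimembertest2").foreach { dir =>
  val files = fs.listFiles(new Path(dir), true)
  while (files.hasNext) println(files.next().getPath)
}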

Comparison between the two

After the modification is complete, query the corresponding Hive tables.

You can see that the read-optimized view (the _ro table) has not changed. Now query the _rt table:

The _rt table's data has changed: the real-time view is a merged view that queries both the base data and the log data. Finally, query member2 (the COPY_ON_WRITE table):
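Put together, the comparison queries look roughly like this (a sketch; the table names follow the sync settings above, and the expected results are only what the text describes):

// sketch: after updateData, compare the three synced Hive tables
sparkSession.sql("select uid, fullname from hudi_test.member1_ro where uid between 0 and 9").show() // still the old fullname
sparkSession.sql("select uid, fullname from hudi_test.member1_rt where uid between 0 and 9").show() // fullname = testName
sparkSession.sql("select uid, fullname from hudi_test.member2 where uid between 0 and 9").show()    // COPY_ON_WRITE: testName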

4. Summary

This can be understood as follows: a MERGE_ON_READ table records data incrementally, saving all modifications as log files, whereas a COPY_ON_WRITE table saves data by full rewrite; every modification is merged with the historical data to produce a new data file, and the historical files are not deleted.

As for the two table views: the HoodieParquetInputFormat format only supports querying the table's base data, while the HoodieParquetRealtimeInputFormat format supports querying the latest data from the merged view of base files and logs.
