需求将Oracle中的业务基础表增量数据导入Hive中,与当前的全量表合并为最新的全量表。
设计涉及的三张表:- 全量表:保存了截止上一次同步时间的全量基础数据表
- 增量表:增量临时表
- 更新后的全量表:更新后的全量数据表
步骤:- 通过Sqoop将Oracle中的表导入Hive,模拟全量表和增量表
- 通过Hive将“全量表+增量表”合并为“更新后的全量表”,覆盖当前的全量表
步骤1:通过Sqoop将Oracle中表的导入Hive,模拟全量表和增量表为了模拟场景,需要一张全量表,和一张增量表,由于数据源有限,所以两个表都来自Oracle中的OMP_SERVICE,全量表包含所有数据,在Hive中名称叫service_all,增量表包含部分时间段数据,在Hive中名称叫service_tmp。
(1)全量表导入:导出所有数据,只要部分字段,导入到Hive指定表里为实现导入Hive功能,需要先配置HCatalog(HCatalog是Hive子模块)的环境变量,/etc/profile中新增:export HCAT_HOME=/home/fulong/Hive/apache-hive-0.13.1-bin/hcatalog 执行以下命令导入数据:fulong@FBI006:~/Sqoop/sqoop-1.4.4/bin$ ./sqoop import > --connect jdbc:oracle:thin:@192.168.0.147:1521:ORCLGBK --username SP --password fulong > --table OMP_SERVICE > --columns "SERVICE_CODE,SERVICE_NAME,SERVICE_PROCESS,CREATE_TIME,ENABLE_ORG,ENABLE_PLATFORM,IF_DEL" > --hive-import --hive-table SERVICE_ALL
注意:用户名必须大写
(2)增量表导入:只导出所需时间范围内的数据,只要部分字段,导入到Hive指定表里使用以下命令导入数据:fulong@FBI006:~/Sqoop/sqoop-1.4.4/bin$ ./sqoop import > --connect jdbc:oracle:thin:@192.168.0.147:1521:ORCLGBK --username SP --password fulong > --table OMP_SERVICE > --columns "SERVICE_CODE,SERVICE_NAME,SERVICE_PROCESS,CREATE_TIME,ENABLE_ORG,ENABLE_PLATFORM,IF_DEL" > --where "CREATE_TIME > to_date("2012/12/4 17:00:00","yyyy-mm-dd hh24:mi:ss") and CREATE_TIME < to_date("2012/12/4 18:00:00","yyyy-mm-dd hh24:mi:ss")" > --hive-import --hive-overwrite --hive-table SERVICE_TMP
注意:- 由于使用了--hive-overwrite参数,所以该语句可反复执行,往service_tmp表中覆盖插入最新的增量数据;
- Sqoop还支持使用复杂Sql语句查询数据导入,相亲参见http://sqoop.apache.org/docs/1.4.4/SqoopUserGuide.html的“7.2.3.Free-form Query Imports”章节
(3)验证导入结果:列出所有表,统计行数,查看表结构hive> show tables;OKsearchlogsearchlog_tmpservice_allservice_tmpTime taken: 0.04 seconds, Fetched: 4 row(s)hive> select count(*) from service_all;Total jobs = 1Launching Job 1 out of 1Number of reduce tasks determined at compile time: 1In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number>In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number>In order to set a constant number of reducers: set mapreduce.job.reduces=<number>Starting Job = job_1407233914535_0013, Tracking URL = http://FBI003:8088/proxy/application_1407233914535_0013/Kill Command = /home/fulong/Hadoop/hadoop-2.2.0/bin/hadoop job -kill job_1407233914535_0013Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 12014-08-21 16:51:47,389 Stage-1 map = 0%, reduce = 0%2014-08-21 16:51:59,816 Stage-1 map = 33%, reduce = 0%, Cumulative CPU 1.36 sec2014-08-21 16:52:01,996 Stage-1 map = 67%, reduce = 0%, Cumulative CPU 2.45 sec2014-08-21 16:52:07,877 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.96 sec2014-08-21 16:52:17,639 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.29 secMapReduce Total cumulative CPU time: 5 seconds 290 msecEnded Job = job_1407233914535_0013MapReduce Jobs Launched:Job 0: Map: 3 Reduce: 1 Cumulative CPU: 5.46 sec HDFS Read: 687141 HDFS Write: 5 SUCCESSTotal MapReduce CPU Time Spent: 5 seconds 460 msecOK6803Time taken: 59.386 seconds, Fetched: 1 row(s)hive> select count(*) from service_tmp;Total jobs = 1Launching Job 1 out of 1Number of reduce tasks determined at compile time: 1In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number>In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number>In order to set a constant number of reducers: set mapreduce.job.reduces=<number>Starting Job = job_1407233914535_0014, Tracking URL = http://FBI003:8088/proxy/application_1407233914535_0014/Kill Command = /home/fulong/Hadoop/hadoop-2.2.0/bin/hadoop job -kill job_1407233914535_0014Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 12014-08-21 16:53:03,951 Stage-1 map = 0%, reduce = 0%2014-08-21 16:53:15,189 Stage-1 map = 67%, reduce = 0%, Cumulative CPU 2.17 sec2014-08-21 16:53:16,236 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.38 sec2014-08-21 16:53:57,935 Stage-1 map = 100%, reduce = 22%, Cumulative CPU 3.78 sec2014-08-21 16:54:01,811 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.34 secMapReduce Total cumulative CPU time: 5 seconds 340 msecEnded Job = job_1407233914535_0014MapReduce Jobs Launched:Job 0: Map: 3 Reduce: 1 Cumulative CPU: 5.66 sec HDFS Read: 4720 HDFS Write: 3 SUCCESSTotal MapReduce CPU Time Spent: 5 seconds 660 msecOK13Time taken: 75.856 seconds, Fetched: 1 row(s)hive> describe service_all;OKservice_code stringservice_name stringservice_process stringcreate_time stringenable_org stringenable_platform stringif_del stringTime taken: 0.169 seconds, Fetched: 7 row(s)hive> describe service_tmp;OKservice_code stringservice_name stringservice_process stringcreate_time stringenable_org stringenable_platform stringif_del stringTime taken: 0.117 seconds, Fetched: 7 row(s)
步骤2:通过Hive将“全量表+增量表”合并为“更新后的全量表”,覆盖当前的全量表
合并新表的逻辑如下:
- 整个tmp表进入最终表中
- all表的数据中不包含在tmp表service_code范围内的数据全部进入新表
执行以下sql语句可以合并得到更新后的全量表:hive> select * from service_tmp union all select a.* from service_all a left outer join service_tmp b on a.service_code = b.service_code where b.service_code is null;
我们需要直接将查询结果更新回全量表中:hive> insert overwrite table service_all select * from service_tmp union all select a.* from service_all a left outer join service_tmp b on a.service_code = b.service_code where b.service_code is null;
注意,将查询结果插入表有以下两类语法:- INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 FROM from_statement;
- INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;
INSERT OVERWRITE 将会覆盖现有数据,由于当前场景需要更新全量表,所以使用了覆盖模式;
INSERT INTO 不会覆盖现有数据,是追加数据
到此为止,Hive中的service_all表已经更新为最新的数据!在真实场景中,需要结合shell+cron实现该过程的定时执行。通过Sqoop实现Mysql / Oracle 与HDFS / Hbase互导数据 http://www.linuxidc.com/Linux/2013-06/85817.htm[Hadoop] Sqoop安装过程详解 http://www.linuxidc.com/Linux/2013-05/84082.htm用Sqoop进行MySQL和HDFS系统间的数据互导 http://www.linuxidc.com/Linux/2013-04/83447.htmHadoop Oozie学习笔记 Oozie不支持Sqoop问题解决 http://www.linuxidc.com/Linux/2012-08/67027.htmHadoop生态系统搭建(hadoop hive hbase zookeeper oozie Sqoop) http://www.linuxidc.com/Linux/2012-03/55721.htmHadoop学习全程记录——使用Sqoop将MySQL中数据导入到Hive中 http://www.linuxidc.com/Linux/2012-01/51993.htm更多Oracle相关信息见Oracle 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=12
本文永久更新链接地址