首页 > 代码库 > 9.数据的操作

9.数据的操作

数据操作能力是大数据分析至关重要的能力。数据操作主要包括:更改(exchange),移动(moving),排序(sorting),转换(transforming)。Hive提供了诸多查询语句,关键字,操作和方法来进行数据操作。




一、 数据更改
数据更改主要包括:LOAD, INSERT, IMPORT, and EXPORT



1. LOAD DATA
load关键字的作用是将数据移动到hive中。如果是从HDFS加载数据,则加载成功后会删除源数据;如果是从本地加载,则加载成功后不会删除源数据。

数据:employee_hr.txt http://pan.baidu.com/s/1c0D9TpI

例:
hive>(不用输入,在此表示在Hive的shell输入以下命令,下同)
CREATE TABLE IF NOT EXISTS employee_hr(
name string,
employee_id int,
sin_number string,
start_date timestamp
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘|‘
STORED AS TEXTFILE;

例:
hive>
LOAD DATA LOCAL INPATH ‘/apps/ca/yanh/employee_hr.txt‘ OVERWRITE INTO TABLE employee_hr;
注1:在指令中LOCAL关键字用于指定数据从本地加载,如果去掉该关键字,默认从HDFS进行加载!
       OVERWRITE关键字指定使用覆盖方式进行加载数据,否则使用附加方式进行加载。

注2:如果数据加载到分区表,则必须指定分区列。



2. INSERT
同RDBMS一样,Hive也支持从其他hive表提取数据插入到指定表,使用INSERT关键字。INSERT操作是Hive数据处理中最常用的将已有数据填充进指定表操作。在Hive中,INSERT可以和OVERWRITE一起使用实现覆盖插入,可以进行多表插入,动态分区插入以及提取数据至HDFS或本地。

例:
hive>
CREATE TABLE ctas_employee AS SELECT * FROM employee;
TRUNCATE TABLE employee;        //删除employee中的数据,保留表结构

例:
hive>
INSERT INTO TABLE employee SELECT * FROM ctas_employee;
注:这里使用Hive提供的beeline工具进行连接,以便清晰的显示数据表。

例:从CTE插入数据
hive>
WITH a AS (SELECT * FROM ctas_employee) FROM a INSERT OVERWRITE TABLE employee SELECT *;        //效果和上例相同
注:Hive从0.13.0版本开始支持CTE

例:多表插入
hive>
CREATE TABLE employee_internal LIKE employee;
FROM ctas_employee INSERT OVERWRITE TABLE employee SELECT * INSERT OVERWRITE TABLE employee_internal SELECT *;
SELECT * FROM employee_internal;

Hive除了支持向静态分区插入静态数据,还支持插入动态数据,如日期

例:动态分区插入
        动态分区默认是关闭的,可通过以下设置开启:SET hive.exec.dynamic.partition=true;
        Hive默认至少需要一个partition列是静态的,可以通过以下设置关闭:SET hive.exec.dynamic.partition.mode=nonstrict;
hive>
INSERT INTO TABLE employee_partitioned PARTITION(year,month)
SELECT name,array(‘Toronto‘) AS work_place,
named_struct("sex","Male","age",30) AS sex_age,
map("Python",90) AS skills_score,
map("R&D",array(‘Developer‘)) AS depart_title,
year(start_date) AS year, month(start_date) AS month
FROM employee_hr eh WHERE eh.employee_id = 102;

例:
hive>
SELECT * FROM employee_partitioned;

例:提取数据至本地(默认使用^A分离列,换行符分离行)
注:Hive提取数据只能使用OVERWRITE, 不能使用INTO。
注:在一些Hadoop版本中目录深度只支持到2层,可以使用以下设置修复:SET hive.insert.into.multilevel.dirs=true;
hive>
INSERT OVERWRITE LOCAL DIRECTORY ‘/apps/ca‘  SELECT * FROM employee;
注:默认下Hive会将数据按reducer数量生成多个输出文件,可以使用以下命令进行合并:
hdfs dfs -getmerge hdfs://<host_name>:port/user/output/directory

例:使用特定分隔符分隔行
hive>
INSERT OVERWRITE LOCAL DIRECTORY ‘/apps/ca/yanh/data‘
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘|‘
SELECT * FROM employee;

例:Hive同样能多目录输出文件
hive>
FROM employee
INSERT OVERWRITE LOCAL DIRECTORY ‘/apps/ca/yanh/data1‘
SELECT *
INSERT OVERWRITE LOCAL DIRECTORY ‘/apps/ca/yanh/data2‘
SELECT * ;



3. EXPORT and IMPORT
这两条命令是Hive用来和HDFS进行数据迁移或者进行数据备份的,从Hive0.8.0开始可用。EXPORT可以导出数据以及元数据到HDFS,元数据被命名为_metadata. 数据被放在命名为data的目录下

例:
hive>
EXPORT TABLE employee TO ‘/apps/ca/yanh/data‘;
注:输出目录不能已存在

例:将输出的数据导入Hive(将数据导入到已有的表将报错)
hive>
IMPORT FROM ‘/apps/ca/yanh/data‘;

例:导入到新表(也可以是EXTERNAL表)
hive>
IMPORT TABLE employee_imported FROM ‘/apps/ca/yanh/data‘;

例:partition表导入导出
hive>
EXPORT TABLE employee_partitioned PARTITION (year=2015,month=05)
TO ‘/apps/ca/yanh/data1‘;

例:
hive>
IMPORT TABLE employee_partitioned_imported FROM ‘/apps/ca/yanh/data1‘;




二、 数据排序
数据排序主要包括:ORDER, and SORT. 该操作同样经常使用,以便生成已排序表从而进行后面的包括top N, maximum, minimum等取值操作。
主要操作包括ORDER BY (ASC|DESC)、SORT BY(ASC|DESC)、DISTRIBUTE BY、CLUSTER BY



1. ORDER BY (ASC|DESC)
跟RDBMS的ORDER BY操作类似,该操作输出一个全局排序的结果,因此reducer的输出结果仅有一个,所以如果大数据量下过程是十分漫长的!这时可以时候LIMIT关键字提高输出效率。如果Hive设置hive.mapred.mode = strict,那LIMIT关键字将不可使用(默认是可以使用的)。

例:按名字从大到小排序(如果数据量大,可以在最后加上LIMIT n来显示前n行)
hive>
SELECT name FROM employee ORDER BY name DESC;



2. SORT BY(ASC|DESC)
与ORDER BY (ASC|DESC)操作不同,SORT BY(ASC|DESC)操作仅输出局部有序的结果(即多个reducer输出,每个输出有序)。如果要输出全局有序,可以通过SET mapred.reduce.tasks=1;来制定reducer个数为1. 此时效果与ORDER BY (ASC|DESC) 相同。SORT BY指定列排序,可以在数据从mapper端全部传入之前完成排序(只要该列传输完毕)。

例:
hive>
SET mapred.reduce.tasks=2;
SELECT name FROM employee SORT BY name DESC;
设置2个reducer,可以看到结果并不是从大到小排列的。


例:
hive>
SET mapred.reduce.tasks=1;
SELECT name FROM employee SORT BY name DESC;
设置1个reducer,此时与ORDER BY结果相同!



3. DISTRIBUTE BY
该操作类似于RDBMS中的GROUP BY,根据制定的列将mapper的输出分组发送至reducer,而不是根据partition来分组数据。
注:如果使用了SORT BY,那么必须在DISTRIBUTE BY之后,且要分发的列必须出现在已选择的列中(因为SORT BY的性质)。

例:为选择employee_id,出错
hive>
SELECT name FROM employee_hr DISTRIBUTE BY employee_id;

例:
hive>
SELECT name,employee_id FROM employee_hr DISTRIBUTE BY employee_id SORT BY name;



4. CLUSTER BY
CLUSTER BY类似于DISTRIBUTE BY和SORT BY的组合作用(作用于相同列),但不同于ORDER BY的是它仅在每个reducer进行排序,而不是全局排序,且不支持ASC和DESC。如果要实现全局排序,可以先进行CLUSTER BY然后再ORDER BY。

例:
hive>
SELECT name, employee_id FROM employee_hr CLUSTER BY name;

ORDER BY和CLUSTER BY不同之处如下图所示:




三、 数据操作和方法
为了更进一步的数据操作,我们可以对Hive进行诸如表达式、操作、方法等来对数据进行转换。在Hive wiki https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF上已经对每一个表达式和方法进行了规范。同时Hive也已经定义了一些关系型的操作、算术运算操作、逻辑运算操作、复杂类型构造器以及复杂类型操作。对于其中的关系型的操作、算术运算操作、逻辑运算操作而言,和SQL/Java中的标准操作比较类似。
Hive中的方法大致可以分为以下几类:

    数学函数:这些方法主要用于数学计算,如RAND()何E().
    汇总函数:这些方法主要用于对复杂类型进行size、key、value等的查询,如SIZE(Array<T>).
    类型转换函数:这些方法主要用于对数据类型进行转换,如CAST和BINARY.
    日期函数:用于对日期相关进行操作,如YEAR(string date)和MONTH(string date).
    条件函数:用于返回特定条件过滤后的函数,如COALESCE、IF、和CASE WHEN.
    字符串函数:此类函数主要用于字符串相关操作,如UPPER(string A) 和TRIM(string A).
    聚合函数:此类函数主要用于数据聚合,如SUM(),COUNT(*).
    列表生成函数:此类函数主要用于将单行输入转换为多行输出,如EXPLODE(MAP)和JSON_TUPLE(jsonString, k1, k2, ...).
    自定义函数:此类由Java生成的函数作为Hive的扩展函数对Hive功能进行扩展.


可以在Hive CLI使用以下语句进行Hive内建函数查询:
SHOW FUNCTIONS;        //列出Hive所有函数
DESCRIBE FUNCTION <function_name>;        //函数详细描述
DESCRIBE FUNCTION EXTENDED <function_name>;        //更多详细信息

详细样例:

1. 复杂数据类型函数提示:SIZE函数用于计算MAP、ARRAY或嵌套MAP/ARRAY。如果size未知则返回-1.

例:
hive>
SELECT work_place, skills_socre, depart_title FROM employee;

例:
hive>
SELECT SIZE(work_place) AS array_size, SIZE(skills_score) AS map_size, SIZE(depart_title) AS complex_size, SIZE(depart_title["Product"]) AS nest_size FROM employee;

ARRAY_CONTAINS声明用于使用TRUE或FALSE返回值检验指定列是否包含指定值。SORT_ARRAY声明用于对数组进行升序排序。

例:
hive>
SELECT ARRAY_CONTAINS(work_place, ‘Toronto‘) AS is_Toronto, SORT_ARRAY(work_place) AS sorted_array FROM employee;



2. 日期函数提示:FROM_UNIXTIME(UNIX_TIMESTAMP())声明与Oracle中的SYSDATE函数相同,动态返回Hive服务器的当前时间。

例:
hive>
SELECT FROM_UNIXTIME(UNIX_TIMESTAMP()) AS current_time FROM employee limit 1;

TO_DATE用于将获取的系统时间截取日期

例:
hive>
SELECT TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP())) AS current_date FROM employee limit 1;


3. 多种不同数据类型的CASE:在Hive0.13.0版本之前THEN或者ELSE后面的数据类型必须相同。否则可能会产生异常值,如ELSE后的数据类型必须同THEN。此问题在0.13.0之后得到修复。

例:由于数据类型不同造成异常返回
hive>
SELECT CASE WHEN 1 IS NULL THEN ‘TRUE‘ ELSE 0 END AS case_result FROM employee LIMIT 1;



4. 解析和查找:LATERAL VIEW是用来生成用户自定义表以展开的形式显示map或array的值,如同EXPLODE(),但其会忽略值为NULL的列,如果要显示这些列,可以使用LATERAL VIEW OUTER(Hive0.12.0之后版本)

例:
hive>
INSERT INTO TABLE employee SELECT ‘Steven‘ AS name, array(null) AS work_place, named_struct("sex","Male","age",30) AS sex_age, map("Python", 90) AS skills_score, map("R&D",array(‘Developer‘)) AS depart_title FROM employee LIMIT 1;
SELECT name, work_place, skills_score FROM employee;

例:
hive>
SELECT name, workplace, skills, score FROM employee LATERAL VIEW explode(work_place) wp AS workplace LATERAL VIEW explode(skills_score) ss AS skills, score;

例:
hive>
SELECT name, workplace, skills, score FROM employee LATERAL VIEW OUTER explode(work_place) wp AS workplace LATERAL VIEW explode(skills_score) ss AS skills, score;

REVERSE用于将指定字符串进行反转,SPLIT用于将字符串按指定分隔符进行分隔。

例:
hive>
SELECT reverse(split(reverse(‘/apps/ca/yanh/employee.txt‘),‘/‘)[0]) AS linux_file_name FROM employee LIMIT 1;

REVERSE将输出转换为单独元素,而COLLECT_SET和COLLECT_LIST则是反过来将元素组合成集合进行输出。COLLECT_SET和COLLECT_LIST的不同在COLLECT_SET返回的集合不含重复元素,而COLLECT_LIST则可以包含重复元素。

例:
hive>
SELECT collect_set(work_place[0]) AS flat_wprkplace FROM employee;

例:
hive>
SELECT collect_list(work_place[0]) AS flat_wprkplace FROM employee;
注:Hive0.11.0及以前不支持collect_list



5. 虚拟列:虚拟列是Hive中特殊的列的特殊函数类型。目前为止Hive仅支持2个虚拟列:INPUT_FILE_NAME和BLOCK_OFFSET_INSIDE_FILE。INPUT_FILE_NAME列是mapper的输入文件名,BLOCK_OFFSET_INSIDE_FILE是当前全部文件位置或当前压缩文件的块偏移量。

例:
hive>
SELECT INPUT_FILE_NAME, BLOCK_OFFSET_INSIDE_FILE AS OFFSIDE FROM employee_id;
注:在Hive0.13.0上测试失败,没有该函数。



6. wiki未提到的函数:

例:isnull,用于检验值是否为空
hive>
SELECT work_place, isnull(work_place) is_null, isnotnull(work_place) is_not_null FROM employee;

例:assert_true,如果条件为false时抛出异常
hive>
SELECT assert_true(work_place IS NULL) FROM employee;

例:elt,返回第n个字符串
hive>
SELECT elt(2, ‘New York‘, ‘Beijing‘, ‘Toronto‘) FROM employee LIMIT 1;

例:current_database,返回当前数据库名
hive>
SELECT current_database();
注:Hive0.11.0及以前没有此函数




四、 数据转换
在Hive0.13.0以前不支持行级的数据转换。因此,数据行的更新、插入、删除都不能实现。因此数据重写只能发生在表或者分区,这使得Hive很难处理并发读写和数据清洗的情况。但是从0.13.0开始,Hive提供了原子性、一致性、隔离性和持久性(ACID)的行级数据处理功能。如今所有的转换操作支持ORC(优化排柱状,从Hive0.11.0开始支持)文件和桶列表中的数据。

以下配置参数需要适当的配置以开启Hive的转换功能:
SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;

SHOW TRANSACTIONS可以对当前已开启的转换操作进行查询:
hive>
SHOW TRANSACTIONS;

从Hive0.14.0开始,行级插入数值、更新和删除可以使用以下语法规则进行实现:
INSERT INTO TABLE tablename [PARTITION (partcol1[=val1], partcol2[=val2] ...)] VALUES values_row [, values_row …];
UPDATE tablename SET column = value [, column = value…] [WHERE expression];
DELETE FROM tablename [WHERE expression];




结语
以上便是全部Hive的关于数据的具体操作,相信到此为止应该能对Hive的常规数据操作进行较为得心应手的使用了吧。以上有截图的用例均由本人亲测可行,测试环境为Hive0.11.0,部分Hive0.13.0的特性是在0.13.0下进行测试,在截图下均有说明。

9.数据的操作