首页 > 代码库 > Oracle的pipelined函数实现高性能大数据处理

Oracle的pipelined函数实现高性能大数据处理

在plsql开发中,会涉及到一些大数据量表的数据处理,如将某记录数超亿的表的记录经过处理转换插入到另外一张或几张表。常规的操作方法固然可以实现,但时间、磁盘IO、redo日志等等都非常大。Oracle 提供了一种高级函数,可以将这种数据处理的性能提升到极限。这种函数称为管道函数。在实际项目中,管道函数会和表函数、数据流函数(即表函数和CURSOR结合)、数据集合、并行度一起使用,达到大数据处理的性能顶峰。下面是一个例子,将表t_ss_normal的记录插入到表t_target中,插入过程中有部分转换操作。我分成四个方法来实现这个数据处理操作。
第一个方法,也是最常规的方法,代码如下:

create table T_SS_NORMAL    
(    
  owner          VARCHAR2(30),    
  object_name    VARCHAR2(128),    
  subobject_name VARCHAR2(30),    
  object_id      NUMBER,    
  data_object_id NUMBER,    
  object_type    VARCHAR2(19),    
  created        DATE,    
  last_ddl_time  DATE,    
  timestamp      VARCHAR2(19),    
  status         VARCHAR2(7),    
  temporary      VARCHAR2(1),    
  generated      VARCHAR2(1),    
  secondary      VARCHAR2(1)    
);    
create table T_TARGET    
(    
  owner       VARCHAR2(30),    
  object_name VARCHAR2(128),    
  comm        VARCHAR2(10)    
);  

这是源表和目标表的表结构。现在源表有200W条,其数据来自dba_objects视图。

create or replace package pkg_test is   
  procedure load_target_normal;    
end pkg_test;    
   
create or replace package body pkg_test is   
  procedure load_target_normal is   
  begin      
    insert into t_target (owner, object_name, comm)    
      select owner, object_name, xxx from t_ss_normal;      
    commit;      
  end;    
begin   
  null;    
end pkg_test;

一个insert into select语句搞定这个数据处理,简单。

create type obj_target as object(    
owner VARCHAR2(30), object_name VARCHAR2(128), comm varchar2(10)    
);    
/    
create or replace type typ_array_target as table of obj_target;    
/    
   
create or replace package pkg_test is   
   function pipe_target(p_source_data in sys_refcursor) return typ_array_target    
  pipelined;    
   
  procedure load_target;    
end pkg_test;  

首先创建两个自定义的类型。
obj_target的定义和t_target的表结构一致,用于存储每一条目标表记录。
typ_array_target用于管道函数的返回值。
接着定义一个管道函数。 普通函数的结尾加一个pipelined关键字,就是管道函数。这个函数的返回参数类型为集合,这是为了使其能作为表函数使用。
表函数就是在from子句中以table(v_resultset)调用的,v_resultset就是一个集合类型的参数。
最后定义一个调用存储过程。 在包体中定义该管道函数和调用存储过程。管道函数pipe_target的传入参数一个sys_refcursor类型。这是一个游标,可以理解为使用select * from table才能得到的结果集。 你也可以不用这个传入的游标,取而代之,在函数中定义一个游标,也一样使用。

function pipe_target(p_source_data in sys_refcursor) return typ_array_target    
    pipelined is   
    r_target_data obj_target := obj_target(null, null, null);    
    r_source_data t_ss%rowtype;    
   
 begin   
      
    loop    
      fetch p_source_data    
        into r_source_data;    
      exit when p_source_data%notfound;        
          
      r_target_data.owner       := r_source_data.owner;    
      r_target_data.object_name := r_source_data.object_name;    
      r_target_data.comm        := xxx;        
      pipe row(r_target_data);    
        
    end loop;    
      
    close p_source_data;    
    return;    
      
  end;    
   
  procedure load_target is   
  begin      
    insert into t_target    
      (owner, object_name, comm)    
      select owner, object_name, comm    
        from table(pipe_target(cursor(select * from t_ss_normal)));      
    commit;      
  end;    

关键字 pipe row 的作用是将obj_target插入到typ_array_target类型的数组中,管道函数自动返回这些数据。
因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。
这种转换称为上下文切换。

function pipe_target_array(p_source_data in sys_refcursor,    
                           p_limit_size  in pls_integer default c_default_limit)    
  return typ_array_target pipelined is      
  r_target_data obj_target := obj_target(null, null, null);     
  type typ_source_data is table of t_ss%rowtype index by pls_integer;    
  aa_source_data typ_source_data;    
begin   
  loop    
    fetch p_source_data bulk collect into aa_source_data;    
    exit when aa_source_data.count = 0;    
    for i in 1 .. aa_source_data.count loop    
      r_target_data.owner       := aa_source_data(i).owner;    
      r_target_data.object_name := aa_source_data(i).object_name;    
      r_target_data.comm        := xxx;    
      pipe row(r_target_data);    
    end loop;     
  end loop;    
   
  close p_source_data;    
  return;    
end;    
   
procedure load_target_array is   
begin   
  insert into t_target    
    (owner, object_name, comm)    
    select owner, object_name, comm    
    from   table(pipe_target_array(cursor (select * from t_ss_normal),100));      
  commit;      
end;  

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

function pipe_target_parallel(p_source_data in sys_refcursor,    
                              p_limit_size  in pls_integer default c_default_limit)    
  return typ_array_target pipelined parallel_enable(partition p_source_data by any) is   
   
  r_target_data obj_target := obj_target(null, null, null);
  type typ_source_data is table of t_ss%rowtype index by pls_integer;      
  aa_source_data typ_source_data;    
   
begin      
  loop    
    fetch p_source_data bulk collect into aa_source_data;    
    exit when aa_source_data.count = 0;        
    for i in 1 .. aa_source_data.count loop          
      r_target_data.owner       := aa_source_data(i).owner;    
      r_target_data.object_name := aa_source_data(i).object_name;    
      r_target_data.comm        := xxx;          
      pipe row(r_target_data);          
    end loop;        
  end loop;      
  close p_source_data;    
  return;    
end;    
   
procedure load_target_parallel is   
begin   
  execute immediate alter session enable parallel dml;      
  insert /*+parallel(t,4)*/ into t_target t    
    (owner, object_name, comm)    
    select owner, object_name, comm    
    from table(pipe_target_array(cursor (select /*+parallel(s,4)*/ *    
                                 from t_ss_normal s), 100));      
  commit;    
end;  

在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。
在此数据处理操作中,涉及到集合(collection)、表函数、管道函数、流函数、bulk collect、游标等知识点。