首页 > 代码库 > kettle 4.4源码分析之Transformation

kettle 4.4源码分析之Transformation

1.1. 相关类和接口

1.1.1. JobEntryTrans

实现了JobEntryInterfaceexecute()方法,被job执行。由JobEntryTrans实例化Trans,并执行。

1.1.2. TransGraph

当点击trans面板的run时,由TransGraph实例化Trans,并执行。

Trans主要成员有:

private TransMeta transMeta;

private Repository repository;

   private Job parentJob;

private Trans parentTrans;

   private List<RowSetrowsets;

private List<StepMetaDataCombi> steps

其中最重要的是rowsetssteps。rowsets保存了所有hop对应的行元数据和数据信息。List<StepMetaDataCombi> steps封装了一个step的主要内容。

1.1.3. TransMeta

描述了整个Trans的元数据信息。 主要的属性成员有:

    private List<StepMeta>           steps;

private List<TransHopMeta>       hops;

private String              name;

private Result            previousResult;上一个jobentry的执行结果。

    private List<RowMetaAndData> resultRows;这次trans执行后的数据结果。

    private List<ResultFile>     resultFiles;            

resultRows成员将作为result比部分返回多行的元数据和数据(如果有的话)需要返回数据结果时。把resultRows加入Result结果的rows列表,并返回。

1.1.4. StepMetaDataCombi

提取了一个step所需的主要信息。

public class StepMetaDataCombi

{

    public StepMeta stepMeta;

    public String stepname;

    public int    copy;

    public StepInterface     step;

    public StepMetaInterface meta;

    public StepDataInterface data;

}

1.1.5. TransHopMeta

描述hop信息。

1.1.6. StepMeta

描述step的公有基本信息(stepidstepname),对于每一个具体的step,由成员变量StepMetaInterface step来描述。

1.1.7. StepInterface

主要成员函数:

processRow()对一行的数据处理。

putRow()把处理后的数据放入下一个stepinputrowsets中。

1.1.8. StepBase

实现了StepInterface是各step具体实现类的基类。完成了公用的处理函数,如putRow(),但是对于更具体的processRow()StepBase的子类中。StepBase的主要成员有

public ArrayList<RowSet>  inputRowSets,outputRowSets

StepBase的子类每次从inputRowSets中取出一行数据,向outputRowSets中写入一行数据。

1.1.9. StepDataInterface

step相关的数据信息。比如行的元数据信息。StepMetaInterface的实现类是与具体step相关的元数据信息,与StepMeta配合使用,共同描述具体step的元数据信息。

1.1.10. RowSet

 

RowSet类中包含源step,目标step和由源向目标发送的一个rowMeta和一组data。其中data数据是以行为单位的队列(queArray)。一个RowSet作为此源stepoutputrowsets的一部分。同时作为目标stepinputRowsets一部分。源Step每次向队列中写一行数据,目标step每次从队列中读取一行数据。

1.1.11. RowMetaAndData

public class RowMetaAndData implements Cloneable{   

private RowMetaInterface rowMeta;//行的元数据,描述了每行的数据名字,数据类型。

private Object[]         data;//数据

}

1.2. 执行过程概述

Trans的执行机制是搭建一个结构,使得每一个step能够从自己的inputRowsets读,处理一行,将结果输出到自己的outputRowsets中。

 技术分享

 

注意:一个rowset对象既属于前一个step成员outputRowsets的一部分,也属于后一个对象的inputRowsets的一部分。所有的rowset信息都在Trans对象中以List形式维护。

1.3. Trans执行过程时序图

由于trans可以有TransGraph实例化,也可以由JobEntryTrans实例化。但基本过程是一样的,先实例化TransMeta,再实例化Trans,最终调用transstart方法。

TransGraph实例化如下图所示:

 技术分享

JobEntryTrans实例化,如下图所示:

 技术分享

1.4. Trans代码解释

1.4.1. JobEntryTransexecute( )

首先获取元数据,然后以此作为参数实例化trans

TransMeta transMeta = getTransMeta(rep);

……

Trans trans = new Trans(transMeta);

……

trans.execute(args);

1.4.2. Transexecute( )

具体执行前需要进行准备工作

public void execute(String[] arguments) throws KettleException{

         prepareExecution(arguments);

         startThreads();

     }

1.4.3. Trans的prepareExecution()

搭建以下结构结构。

(1)、对每一个step根据hop信息进行找到下一个step或多个step

(2)、对于每一个this stepnex tstep生成一个RowSet对象,作为缓存供this step写,同时供next step读取数据。

(3)、把此RowSet对象加入到TransList<RowSet>成员中保存。

List<StepMeta> hopsteps=transMeta.getTransHopSteps(false);

得到step列表

……

对每一个step进行如下设置

for (int i=0;i<hopsteps.size();i++)

{

StepMeta thisStep=hopsteps.get(i);

……

//找到下一个step的列表

List<StepMeta> nextSteps = transMeta.findNextSteps(thisStep);

int nrTargets = nextSteps.size();

for (int n=0;n<nrTargets;n++)

{

StepMeta nextStep = nextSteps.get(n);

…… 对于每一个hop信息生成RowSet,并设置RowSet,把

 

 int thisCopies = thisStep.getCopies(); 处理源step的次数


        int nextCopies = nextStep.getCopies(); 处理目标step的次数


                   for (int c=0;c<nrCopies;c++)     nrCopies   根据 上面的thiscopies和nextcopies得到
    {
    RowSet rowSet;    //定义rowset  用来存放元数据   对于每一个hop信息生成RowSet,并设置RowSet

    switch(transMeta.getTransformationType()) {
    case Normal:
     // This is a temporary patch until the batching rowset has proven to be working in all situations.
     // Currently there are stalling problems when dealing with small amounts of rows.
     //
     Boolean batchingRowSet = ValueMeta.convertStringToBoolean(System.getProperty(Const.KETTLE_BATCHING_ROWSET));
     if (batchingRowSet!=null && batchingRowSet.booleanValue()) {
       rowSet = new BlockingBatchingRowSet(transMeta.getSizeRowset());       
     } else {
       rowSet = new BlockingRowSet(transMeta.getSizeRowset());
     }
     break;
     
    case SerialSingleThreaded: 
     rowSet = new SingleRowRowSet(); 
     break;
     
              case SingleThreaded: 
                rowSet = new QueueRowSet(); 
                break;
       .................         

rowsets.add(rowSet);                  最后的得到rowset


            } 

……

(4)、根据TransMetastep信息生成相应的StepMetaDataCombi(即steps)信息,加到steps列表中。

StepMetaDataCombi combi = new StepMetaDataCombi();

combi.stepname = stepMeta.getName();

combi.copy     = c;

combi.stepMeta = stepMeta;

combi.meta = stepMeta.getStepMetaInterface();

StepDataInterface data = combi.meta.getStepData();

combi.data = data;

……

StepInterface step=combi.meta.getStep(stepMeta, data, c, transMetathis);

在step初始化时,会把Trans中的List<RowSet>的相应的rowset加入到stepinputRowSets,和outputRowSets中。

combi.step = step;

steps.add(combi);

1.4.4. TransstartThreads( )

打开了所有的step线程,核心代码如下:

     for (int i=0;i<steps.size();i++){

      steps.get(i).step.start(); 

     }

1.5. Step执行

实现StepInterface的不同的step各个功能个不一样,但是它们之间也有一定的规律性。下图只列举了两个step,(TextInput)文本输入和Uniquerow(去重)

 技术分享

1.5.1. 启动

每一个具体的step启动线程时,自动调用run函数,它们统一调用基类的静态方法

public void run(){

     BaseStep.runStepThread(thismetadata);

}

1.5.2. 处理

基类BaseStep采取了统一的处理方式,调用子类processRow以行为单位处理,核心代码如下。

while (stepInterface.processRow(meta, data) && !stepInterface.isStopped());

processRow( )通用过程是:调用基类BaseStep 的getRow( )得到数据,对一行数据处理,处理之后调用基类putRow( )方法数据保存至outputRowSets(即next stepinputRowSets

1.5.3. 元数据与数据关系。

  Trans中的ETL过程(每个step)以行为单位处理,其中行的元数据信息RowMeta和数据信息统一保存在RowSet对象中。

RowSetRowMeta的成员的调试结果如下。可见rowMeta储存了每列数据的名称和类型。第一列列名flag,数据是长度为1String;第二列列名id

 技术分享

 

RowSet的数据信息在queArray队列中,调试结果如下:可以看出第一个数据元素是一个Object包含了3列,数据内容为(N1a…)

 技术分享

kettle 4.4源码分析之Transformation