首页 > 代码库 > 【笔记】【Informatica】自定义序列生成器组件

【笔记】【Informatica】自定义序列生成器组件

/************************************************************************** * * Copyright (c) 2003 Informatica Corporation.  This file contains * material proprietary to Informatica Corporation and may not be copied * or distributed in any form without the written permission of Informatica * Corporation * **************************************************************************//************************************************************************** * Custom Transformation p_GetSeqVal Procedure File * * This file contains code that functions that will be called by the main * server executable. * * For more information on these files, * see $(PM_HOME)/ExtProc/include/Readme.txt **************************************************************************//**************************************************************************                               Includes **************************************************************************/#include "p_GetSeqVal.h"#include <stdio.h>#include <string.h>#include <stdlib.h>#include <windows.h>#include <stdarg.h>#include <process.h>/**************************************************************************                               UserDefineMacro **************************************************************************/#define DLLNAME "SeqGenerator.dll"#define PMDTM "pmdtm.exe"#define EXTPROCDIR "ExtProc\\"#define SEQFILE "SeqGenerator.tsv"#define LOCKFILE "SeqGenerator.lock"#define SEQNAMELEN 100#define MAXSEQNUM 10000typedef unsigned long int seqval_t;#define MSGCHARS 1000#define MAX_LINE_SIZE 1000#define MSGMARK ">>>>>>>>>>>>>>>"#define PROCNAME "SeqGenerator"/**************************************************************************                               SharedVariables **************************************************************************/struct seq_t{	char chSeqName[SEQNAMELEN+1];	seqval_t nSeqVal;};struct seqlist_t{	struct seq_t arrSequences[MAXSEQNUM];	int nSeqNums;};#pragma data_seg("SeqGeneratorShared")static int nInitFlag=0;static int nRefCount=0;static struct seqlist_t strSeqList={0};#pragma data_seg()#pragma comment(linker,"/section:SeqGeneratorShared,rws") /**************************************************************************                               Debug switch **************************************************************************///#define DEBUG_MODE#ifdef DEBUG_MODE    //LogMessage("%s DebugLog:",MSGMARK);#endif/**************************************************************************                               Global Variables **************************************************************************/char chLogMsg[MSGCHARS],chMutexName[MSGCHARS];TCHAR szPath[MAX_PATH],dllDir[MAX_PATH],SeqFilePath[MAX_PATH],LockFilePath[MAX_PATH];FILE *fpSeqFile,*fpLockFile;char *rowSeqName;char chSeqName[SEQNAMELEN+1];seqval_t nSeqVal;struct seqcache_t{	char chSeqName[SEQNAMELEN+1];	seqval_t *ptrSeqVal;};struct seqlistcache_t{	struct seqcache_t arrSequences[MAXSEQNUM];	int nSeqNums;};struct seqlistcache_t strSeqListCache={0};//HANDLE hMutex;/**************************************************************************                               Functions **************************************************************************/int LogMessage(char *fmt,...){    va_list ptrArg;	va_start(ptrArg,fmt);	vsprintf(chLogMsg,fmt,ptrArg);	va_end(ptrArg);	INFA_CTLogMessageM( eESL_LOG,chLogMsg);	return 0;}int GetFileNumRows(FILE* fp){       int i = 0;    char strLine[MAX_LINE_SIZE];    fseek(fp,0,SEEK_SET);    while (fgets(strLine, MAX_LINE_SIZE, fp))        i++;    fseek(fp,0,SEEK_SET);    return i;}char * left(char *dst,char *src, int n){    char *p = src;    char *q = dst;    int len = strlen(src);    if(n>len) n = len;    while(n--) *(q++) = *(p++);    *(q++)=‘\0‘;     return dst;}char * mid(char *dst,char *src, int n,int m) {    char *p = src;    char *q = dst;    int len = strlen(src);    if(m>len) m = len-n;     if(n<0) n=0;      if(n>len) return NULL;    p += n;    while(m--) *(q++) = *(p++);    *(q++)=‘\0‘;     return dst;}/*INFA_STATUS mGetMutex(){    while(1){	    hMutex=CreateMutex(NULL,FALSE,PROCNAME);		if(hMutex&&GetLastError()==ERROR_ALREADY_EXISTS){		    if(hMutex!=NULL){			    CloseHandle(hMutex);			}			Sleep(1000);			continue;		} 		else if(hMutex!=NULL){		    break;		}		else {			return INFA_FAILURE;		}	}	return INFA_SUCCESS;}*//**************************************************************************   Function: p_GetSeqVal_procInit Description: Initialization for the procedure.  Returns INFA_SUCCESS if  procedure initialization succeeds, else return INFA_FAILURE. Input: procedure - the handle for the procedure Output: None Remarks: This function will get called once for the session at initialization time.  It will be called after the moduleInit function. **************************************************************************/INFA_STATUS p_GetSeqVal_procInit( INFA_CT_PROCEDURE_HANDLE procedure){	//Sleep(10000);    INFA_CTChangeStringMode( procedure, eASM_MBCS );	if( !GetModuleFileName( NULL, szPath, MAX_PATH ) ){        LogMessage("GetModuleFileName failed (%d)\n", GetLastError());        return INFA_FAILURE;	} else {		LogMessage("ModuleFileName is : %s\n", szPath);		mid(dllDir,szPath,0,strlen(szPath)-strlen(PMDTM));		strcat(dllDir,EXTPROCDIR);		LogMessage("ModuleDirectory is : %s\n", dllDir);		strcpy(SeqFilePath,dllDir);		strcat(SeqFilePath,SEQFILE);		LogMessage("Sequence File is : %s\n", SeqFilePath);		strcpy(LockFilePath,dllDir);		strcat(LockFilePath,LOCKFILE);		LogMessage("Lock File is : %s\n", LockFilePath);	}	//CreateMutex(NULL,FALSE,PROCNAME);	//mGetMutex();	//WaitForSingleObject(hMutex, INFINITE);	fpLockFile=fopen(LockFilePath,"w");	while(LockFile(fpLockFile,1,1,1,1)!=0){		Sleep(2000);	}	if(1==++nRefCount){		int nFileNumRows,i;        LogMessage("%s Loading Sequence File\n",MSGMARK);        fpSeqFile=fopen(SeqFilePath,"a+");        nFileNumRows=GetFileNumRows(fpSeqFile);		LogMessage("%s Sequence Objects Nums: %d\n",MSGMARK,nFileNumRows);		strSeqList.nSeqNums=0;		for(i=0;i<nFileNumRows;i++){			fscanf(fpSeqFile,"%s\t%lu\n",chSeqName,&nSeqVal);			strSeqList.nSeqNums++;			strcpy(strSeqList.arrSequences[i].chSeqName,chSeqName);			strSeqList.arrSequences[i].nSeqVal=nSeqVal;		}		if(EOF==fclose(fpSeqFile)){		    LogMessage("Close Sequence File Failed!\n" );		    return INFA_FAILURE;	    }		nInitFlag=1;		//CloseHandle(hMutex);		LogMessage("%s Finish loading Sequence File",MSGMARK);	} else {		LogMessage("%s Wait for loading Sequence File",MSGMARK);		while(1!=nInitFlag){		}		LogMessage("%s Finish loading Sequence File",MSGMARK);	}	UnlockFile(fpLockFile,1,1,1,1);	if(EOF==fclose(fpLockFile)){        LogMessage("Close Lock File Failed!\n" );		return INFA_FAILURE;	}	//ReleaseMutex(hMutex);	    return INFA_SUCCESS;}/**************************************************************************   Function: p_GetSeqVal_procDeinit Description: Deinitialization for the procedure.  Returns INFA_SUCCESS if  procedure deinitialization succeeds, else return INFA_FAILURE. Input: procedure - the handle for the procedure Output: None Remarks: This function will get called once for the session at deinitialization time.  It will be called before the moduleDeinit function. **************************************************************************/INFA_STATUS p_GetSeqVal_procDeinit( INFA_CT_PROCEDURE_HANDLE procedure, INFA_STATUS sessionStatus ){	//mGetMutex();	//WaitForSingleObject(hMutex, INFINITE);	fpLockFile=fopen(LockFilePath,"w");	while(LockFile(fpLockFile,1,1,1,1)!=0){		Sleep(2000);	}	if(0==--nRefCount){		int i;		LogMessage("%s Writing Sequence File",MSGMARK);		fpSeqFile=fopen(SeqFilePath,"w");		for(i=0;i<strSeqList.nSeqNums;i++){			fprintf(fpSeqFile,"%s\t%lu\n",strSeqList.arrSequences[i].chSeqName,strSeqList.arrSequences[i].nSeqVal);		}		if(EOF==fclose(fpSeqFile)){			LogMessage("Close Sequence File Failed!\n" );			return INFA_FAILURE;		}		LogMessage("%s Finish Writing Sequence File",MSGMARK);	}    UnlockFile(fpLockFile,1,1,1,1);	if(EOF==fclose(fpLockFile)){        LogMessage("Close Lock File Failed!\n" );		return INFA_FAILURE;	}	//ReleaseMutex(hMutex);	//CloseHandle(hMutex);    return INFA_SUCCESS;}/**************************************************************************   Function: p_GetSeqVal_partitionInit Description: Initialization for the partition.  Returns INFA_SUCCESS if  partition deinitialization succeeds, else return INFA_FAILURE. Input: partition - the handle for the partition Output: None Remarks: This function will get called once for each partition for each transformation in the session. **************************************************************************/INFA_STATUS p_GetSeqVal_partitionInit( INFA_CT_PARTITION_HANDLE partition ){    /*TODO: fill in code here*/    return INFA_SUCCESS;}/**************************************************************************   Function: p_GetSeqVal_partitionDeinit Description: Deinitialization for the partition.  Returns INFA_SUCCESS if  partition deinitialization succeeds, else return INFA_FAILURE. Input: partition - the handle for the partition Output: None Remarks: This function will get called once for each partition for each transformation in the session. **************************************************************************/INFA_STATUS p_GetSeqVal_partitionDeinit( INFA_CT_PARTITION_HANDLE partition ){    /*TODO: fill in code here*/    return INFA_SUCCESS;}/**************************************************************************   Function: p_GetSeqVal_inputRowNotification Description: Notification that a row needs to be processed for an input group in a transformation for the given partition.  Returns INFA_ROWSUCCESS if the input row was processed successfully, INFA_ROWERROR if the input row was not processed successfully and INFA_FATALERROR if the input row causes the session to fail. Input: partition - the handle for the partition for the given row        group - the handle for the input group for the given row Output: None Remarks: This function is probably where the meat of your code will go, as it is called for every row that gets sent into your transformation.  **************************************************************************/INFA_ROWSTATUS p_GetSeqVal_inputRowNotification( INFA_CT_PARTITION_HANDLE partition,                                    INFA_CT_INPUTGROUP_HANDLE inputGroup ){    const INFA_CT_OUTPUTGROUP_HANDLE* outputGroups = NULL;    const INFA_CT_INPUTPORT_HANDLE* inputGroupPorts = NULL;    const INFA_CT_OUTPUTPORT_HANDLE* outputGroupPorts = NULL;    size_t nNumInputPorts = 0, nNumOutputGroups = 0, nNumPortsInOutputGroup = 0;	int i,j;	outputGroups = INFA_CTGetChildrenHandles(partition,&nNumOutputGroups,OUTPUTGROUPTYPE);	outputGroupPorts = INFA_CTGetChildrenHandles(outputGroups[0],&nNumPortsInOutputGroup,OUTPUTPORTTYPE);	inputGroupPorts = INFA_CTGetChildrenHandles(inputGroup,&nNumInputPorts,INPUTPORTTYPE);    rowSeqName=(char*)INFA_CTGetDataVoid(inputGroupPorts[0]);	for(j=0;j<strSeqListCache.nSeqNums;j++){        if(strcmp(rowSeqName,strSeqListCache.arrSequences[j].chSeqName)==0)			break;	}	if(j==strSeqListCache.nSeqNums){		for(i=0;i<strSeqList.nSeqNums;i++){			if(strcmp(rowSeqName,strSeqList.arrSequences[i].chSeqName)==0)				break;		}		if(i==strSeqList.nSeqNums){			strcpy(strSeqList.arrSequences[i].chSeqName,rowSeqName);			strSeqList.nSeqNums++;			strcpy(strSeqListCache.arrSequences[j].chSeqName,rowSeqName);		    strSeqListCache.arrSequences[j].ptrSeqVal=&(strSeqList.arrSequences[i].nSeqVal);		    strSeqListCache.nSeqNums++;		}		nSeqVal=++(strSeqList.arrSequences[i].nSeqVal);		strcpy(strSeqListCache.arrSequences[j].chSeqName,rowSeqName);		strSeqListCache.arrSequences[j].ptrSeqVal=&(strSeqList.arrSequences[i].nSeqVal);		strSeqListCache.nSeqNums++;	} else {		nSeqVal=++*(strSeqListCache.arrSequences[j].ptrSeqVal);	}	    INFA_CTSetData(outputGroupPorts[0], &nSeqVal);	INFA_CTSetIndicator(outputGroupPorts[0],INFA_DATA_VALID);    return INFA_CTOutputNotification(outputGroups[0]);}/**************************************************************************   Function: p_GetSeqVal_eofNotification Description: Notification that the last row for an input group has already been seen.  Return INFA_FAILURE if the session should fail as a result of seeing this notification, INFA_SUCCESS otherwise. Input: partition - the handle for the partition for the notification        group - the handle for the input group for the notification Output: None **************************************************************************/INFA_STATUS p_GetSeqVal_eofNotification( INFA_CT_PARTITION_HANDLE partition,                            INFA_CT_INPUTGROUP_HANDLE group){    /*TODO: fill in code here*/    return INFA_SUCCESS;}/**************************************************************************   Function: p_GetSeqVal_dataBdryNotification Description: Notification that a transaction has ended.  The data boundary type can either be commit or rollback. Return INFA_FAILURE if the session should fail as a result of seeing this notification, INFA_SUCCESS otherwise. Input: partition - the handle for the partition for the notification        transactionType - commit or rollback  Output: None **************************************************************************/INFA_STATUS p_GetSeqVal_dataBdryNotification (                        INFA_CT_PARTITION_HANDLE partition,                       INFA_CT_DATABDRY_TYPE transactionType){    /*TODO: fill in code here*/    return INFA_SUCCESS;}