首页 > 代码库 > 【笔记】【Informatica】自定义序列生成器组件
【笔记】【Informatica】自定义序列生成器组件
/************************************************************************** * * Copyright (c) 2003 Informatica Corporation. This file contains * material proprietary to Informatica Corporation and may not be copied * or distributed in any form without the written permission of Informatica * Corporation * **************************************************************************//************************************************************************** * Custom Transformation p_GetSeqVal Procedure File * * This file contains code that functions that will be called by the main * server executable. * * For more information on these files, * see $(PM_HOME)/ExtProc/include/Readme.txt **************************************************************************//************************************************************************** Includes **************************************************************************/#include "p_GetSeqVal.h"#include <stdio.h>#include <string.h>#include <stdlib.h>#include <windows.h>#include <stdarg.h>#include <process.h>/************************************************************************** UserDefineMacro **************************************************************************/#define DLLNAME "SeqGenerator.dll"#define PMDTM "pmdtm.exe"#define EXTPROCDIR "ExtProc\\"#define SEQFILE "SeqGenerator.tsv"#define LOCKFILE "SeqGenerator.lock"#define SEQNAMELEN 100#define MAXSEQNUM 10000typedef unsigned long int seqval_t;#define MSGCHARS 1000#define MAX_LINE_SIZE 1000#define MSGMARK ">>>>>>>>>>>>>>>"#define PROCNAME "SeqGenerator"/************************************************************************** SharedVariables **************************************************************************/struct seq_t{ char chSeqName[SEQNAMELEN+1]; seqval_t nSeqVal;};struct seqlist_t{ struct seq_t arrSequences[MAXSEQNUM]; int nSeqNums;};#pragma data_seg("SeqGeneratorShared")static int nInitFlag=0;static int nRefCount=0;static struct seqlist_t strSeqList={0};#pragma data_seg()#pragma comment(linker,"/section:SeqGeneratorShared,rws") /************************************************************************** Debug switch **************************************************************************///#define DEBUG_MODE#ifdef DEBUG_MODE //LogMessage("%s DebugLog:",MSGMARK);#endif/************************************************************************** Global Variables **************************************************************************/char chLogMsg[MSGCHARS],chMutexName[MSGCHARS];TCHAR szPath[MAX_PATH],dllDir[MAX_PATH],SeqFilePath[MAX_PATH],LockFilePath[MAX_PATH];FILE *fpSeqFile,*fpLockFile;char *rowSeqName;char chSeqName[SEQNAMELEN+1];seqval_t nSeqVal;struct seqcache_t{ char chSeqName[SEQNAMELEN+1]; seqval_t *ptrSeqVal;};struct seqlistcache_t{ struct seqcache_t arrSequences[MAXSEQNUM]; int nSeqNums;};struct seqlistcache_t strSeqListCache={0};//HANDLE hMutex;/************************************************************************** Functions **************************************************************************/int LogMessage(char *fmt,...){ va_list ptrArg; va_start(ptrArg,fmt); vsprintf(chLogMsg,fmt,ptrArg); va_end(ptrArg); INFA_CTLogMessageM( eESL_LOG,chLogMsg); return 0;}int GetFileNumRows(FILE* fp){ int i = 0; char strLine[MAX_LINE_SIZE]; fseek(fp,0,SEEK_SET); while (fgets(strLine, MAX_LINE_SIZE, fp)) i++; fseek(fp,0,SEEK_SET); return i;}char * left(char *dst,char *src, int n){ char *p = src; char *q = dst; int len = strlen(src); if(n>len) n = len; while(n--) *(q++) = *(p++); *(q++)=‘\0‘; return dst;}char * mid(char *dst,char *src, int n,int m) { char *p = src; char *q = dst; int len = strlen(src); if(m>len) m = len-n; if(n<0) n=0; if(n>len) return NULL; p += n; while(m--) *(q++) = *(p++); *(q++)=‘\0‘; return dst;}/*INFA_STATUS mGetMutex(){ while(1){ hMutex=CreateMutex(NULL,FALSE,PROCNAME); if(hMutex&&GetLastError()==ERROR_ALREADY_EXISTS){ if(hMutex!=NULL){ CloseHandle(hMutex); } Sleep(1000); continue; } else if(hMutex!=NULL){ break; } else { return INFA_FAILURE; } } return INFA_SUCCESS;}*//************************************************************************** Function: p_GetSeqVal_procInit Description: Initialization for the procedure. Returns INFA_SUCCESS if procedure initialization succeeds, else return INFA_FAILURE. Input: procedure - the handle for the procedure Output: None Remarks: This function will get called once for the session at initialization time. It will be called after the moduleInit function. **************************************************************************/INFA_STATUS p_GetSeqVal_procInit( INFA_CT_PROCEDURE_HANDLE procedure){ //Sleep(10000); INFA_CTChangeStringMode( procedure, eASM_MBCS ); if( !GetModuleFileName( NULL, szPath, MAX_PATH ) ){ LogMessage("GetModuleFileName failed (%d)\n", GetLastError()); return INFA_FAILURE; } else { LogMessage("ModuleFileName is : %s\n", szPath); mid(dllDir,szPath,0,strlen(szPath)-strlen(PMDTM)); strcat(dllDir,EXTPROCDIR); LogMessage("ModuleDirectory is : %s\n", dllDir); strcpy(SeqFilePath,dllDir); strcat(SeqFilePath,SEQFILE); LogMessage("Sequence File is : %s\n", SeqFilePath); strcpy(LockFilePath,dllDir); strcat(LockFilePath,LOCKFILE); LogMessage("Lock File is : %s\n", LockFilePath); } //CreateMutex(NULL,FALSE,PROCNAME); //mGetMutex(); //WaitForSingleObject(hMutex, INFINITE); fpLockFile=fopen(LockFilePath,"w"); while(LockFile(fpLockFile,1,1,1,1)!=0){ Sleep(2000); } if(1==++nRefCount){ int nFileNumRows,i; LogMessage("%s Loading Sequence File\n",MSGMARK); fpSeqFile=fopen(SeqFilePath,"a+"); nFileNumRows=GetFileNumRows(fpSeqFile); LogMessage("%s Sequence Objects Nums: %d\n",MSGMARK,nFileNumRows); strSeqList.nSeqNums=0; for(i=0;i<nFileNumRows;i++){ fscanf(fpSeqFile,"%s\t%lu\n",chSeqName,&nSeqVal); strSeqList.nSeqNums++; strcpy(strSeqList.arrSequences[i].chSeqName,chSeqName); strSeqList.arrSequences[i].nSeqVal=nSeqVal; } if(EOF==fclose(fpSeqFile)){ LogMessage("Close Sequence File Failed!\n" ); return INFA_FAILURE; } nInitFlag=1; //CloseHandle(hMutex); LogMessage("%s Finish loading Sequence File",MSGMARK); } else { LogMessage("%s Wait for loading Sequence File",MSGMARK); while(1!=nInitFlag){ } LogMessage("%s Finish loading Sequence File",MSGMARK); } UnlockFile(fpLockFile,1,1,1,1); if(EOF==fclose(fpLockFile)){ LogMessage("Close Lock File Failed!\n" ); return INFA_FAILURE; } //ReleaseMutex(hMutex); return INFA_SUCCESS;}/************************************************************************** Function: p_GetSeqVal_procDeinit Description: Deinitialization for the procedure. Returns INFA_SUCCESS if procedure deinitialization succeeds, else return INFA_FAILURE. Input: procedure - the handle for the procedure Output: None Remarks: This function will get called once for the session at deinitialization time. It will be called before the moduleDeinit function. **************************************************************************/INFA_STATUS p_GetSeqVal_procDeinit( INFA_CT_PROCEDURE_HANDLE procedure, INFA_STATUS sessionStatus ){ //mGetMutex(); //WaitForSingleObject(hMutex, INFINITE); fpLockFile=fopen(LockFilePath,"w"); while(LockFile(fpLockFile,1,1,1,1)!=0){ Sleep(2000); } if(0==--nRefCount){ int i; LogMessage("%s Writing Sequence File",MSGMARK); fpSeqFile=fopen(SeqFilePath,"w"); for(i=0;i<strSeqList.nSeqNums;i++){ fprintf(fpSeqFile,"%s\t%lu\n",strSeqList.arrSequences[i].chSeqName,strSeqList.arrSequences[i].nSeqVal); } if(EOF==fclose(fpSeqFile)){ LogMessage("Close Sequence File Failed!\n" ); return INFA_FAILURE; } LogMessage("%s Finish Writing Sequence File",MSGMARK); } UnlockFile(fpLockFile,1,1,1,1); if(EOF==fclose(fpLockFile)){ LogMessage("Close Lock File Failed!\n" ); return INFA_FAILURE; } //ReleaseMutex(hMutex); //CloseHandle(hMutex); return INFA_SUCCESS;}/************************************************************************** Function: p_GetSeqVal_partitionInit Description: Initialization for the partition. Returns INFA_SUCCESS if partition deinitialization succeeds, else return INFA_FAILURE. Input: partition - the handle for the partition Output: None Remarks: This function will get called once for each partition for each transformation in the session. **************************************************************************/INFA_STATUS p_GetSeqVal_partitionInit( INFA_CT_PARTITION_HANDLE partition ){ /*TODO: fill in code here*/ return INFA_SUCCESS;}/************************************************************************** Function: p_GetSeqVal_partitionDeinit Description: Deinitialization for the partition. Returns INFA_SUCCESS if partition deinitialization succeeds, else return INFA_FAILURE. Input: partition - the handle for the partition Output: None Remarks: This function will get called once for each partition for each transformation in the session. **************************************************************************/INFA_STATUS p_GetSeqVal_partitionDeinit( INFA_CT_PARTITION_HANDLE partition ){ /*TODO: fill in code here*/ return INFA_SUCCESS;}/************************************************************************** Function: p_GetSeqVal_inputRowNotification Description: Notification that a row needs to be processed for an input group in a transformation for the given partition. Returns INFA_ROWSUCCESS if the input row was processed successfully, INFA_ROWERROR if the input row was not processed successfully and INFA_FATALERROR if the input row causes the session to fail. Input: partition - the handle for the partition for the given row group - the handle for the input group for the given row Output: None Remarks: This function is probably where the meat of your code will go, as it is called for every row that gets sent into your transformation. **************************************************************************/INFA_ROWSTATUS p_GetSeqVal_inputRowNotification( INFA_CT_PARTITION_HANDLE partition, INFA_CT_INPUTGROUP_HANDLE inputGroup ){ const INFA_CT_OUTPUTGROUP_HANDLE* outputGroups = NULL; const INFA_CT_INPUTPORT_HANDLE* inputGroupPorts = NULL; const INFA_CT_OUTPUTPORT_HANDLE* outputGroupPorts = NULL; size_t nNumInputPorts = 0, nNumOutputGroups = 0, nNumPortsInOutputGroup = 0; int i,j; outputGroups = INFA_CTGetChildrenHandles(partition,&nNumOutputGroups,OUTPUTGROUPTYPE); outputGroupPorts = INFA_CTGetChildrenHandles(outputGroups[0],&nNumPortsInOutputGroup,OUTPUTPORTTYPE); inputGroupPorts = INFA_CTGetChildrenHandles(inputGroup,&nNumInputPorts,INPUTPORTTYPE); rowSeqName=(char*)INFA_CTGetDataVoid(inputGroupPorts[0]); for(j=0;j<strSeqListCache.nSeqNums;j++){ if(strcmp(rowSeqName,strSeqListCache.arrSequences[j].chSeqName)==0) break; } if(j==strSeqListCache.nSeqNums){ for(i=0;i<strSeqList.nSeqNums;i++){ if(strcmp(rowSeqName,strSeqList.arrSequences[i].chSeqName)==0) break; } if(i==strSeqList.nSeqNums){ strcpy(strSeqList.arrSequences[i].chSeqName,rowSeqName); strSeqList.nSeqNums++; strcpy(strSeqListCache.arrSequences[j].chSeqName,rowSeqName); strSeqListCache.arrSequences[j].ptrSeqVal=&(strSeqList.arrSequences[i].nSeqVal); strSeqListCache.nSeqNums++; } nSeqVal=++(strSeqList.arrSequences[i].nSeqVal); strcpy(strSeqListCache.arrSequences[j].chSeqName,rowSeqName); strSeqListCache.arrSequences[j].ptrSeqVal=&(strSeqList.arrSequences[i].nSeqVal); strSeqListCache.nSeqNums++; } else { nSeqVal=++*(strSeqListCache.arrSequences[j].ptrSeqVal); } INFA_CTSetData(outputGroupPorts[0], &nSeqVal); INFA_CTSetIndicator(outputGroupPorts[0],INFA_DATA_VALID); return INFA_CTOutputNotification(outputGroups[0]);}/************************************************************************** Function: p_GetSeqVal_eofNotification Description: Notification that the last row for an input group has already been seen. Return INFA_FAILURE if the session should fail as a result of seeing this notification, INFA_SUCCESS otherwise. Input: partition - the handle for the partition for the notification group - the handle for the input group for the notification Output: None **************************************************************************/INFA_STATUS p_GetSeqVal_eofNotification( INFA_CT_PARTITION_HANDLE partition, INFA_CT_INPUTGROUP_HANDLE group){ /*TODO: fill in code here*/ return INFA_SUCCESS;}/************************************************************************** Function: p_GetSeqVal_dataBdryNotification Description: Notification that a transaction has ended. The data boundary type can either be commit or rollback. Return INFA_FAILURE if the session should fail as a result of seeing this notification, INFA_SUCCESS otherwise. Input: partition - the handle for the partition for the notification transactionType - commit or rollback Output: None **************************************************************************/INFA_STATUS p_GetSeqVal_dataBdryNotification ( INFA_CT_PARTITION_HANDLE partition, INFA_CT_DATABDRY_TYPE transactionType){ /*TODO: fill in code here*/ return INFA_SUCCESS;}
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。