首页 > 代码库 > 基于本博客版本中的循环缓冲的测试(Linux环境)

基于本博客版本中的循环缓冲的测试(Linux环境)

 

#include <stdio.h>
#include <stdlib.h>

#include <pthread.h>
#include <unistd.h>

#include "ringbuffer.h"

/*
 * Run flag for the reader thread: main sets it to 1 before starting the
 * thread and back to 0 to request shutdown; ReadBufferThread polls it.
 * NOTE(review): it is read and written without holding `mutex`, which is
 * formally a data race - consider an atomic type or guarding it with the lock.
 */
static int b_flag = 0;

/* Guards the RingBuffer read/write/empty calls issued by the two threads. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Lock helpers; the argument is parenthesized so any lvalue expression works. */
#define TX_LOCK(lock)   pthread_mutex_lock(&(lock))
#define TX_UNLOCK(lock) pthread_mutex_unlock(&(lock))

/*
 * Consumer thread: repeatedly pulls data out of the shared ring buffer and
 * appends it to the file "file.bak" in the current directory.
 *
 * param  - the RingBuffer * created by the caller (passed via pthread_create).
 * return - always NULL; failures are only reported on stdout/stderr, the
 *          caller is not notified.
 *
 * The loop runs for as long as b_flag is non-zero and sleeps 100 us between
 * polls. Every ring-buffer access is made while holding `mutex`.
 */
void* ReadBufferThread(void* param)
{
    RingBuffer * pstRing = (RingBuffer *)param;
    FILE* fpOut = NULL;
    size_t ret = 0;
    uint8 szBuf[128*1024] = {0};
    int nLen = 0;

    /* "wb": the data is copied byte-for-byte, so no text-mode translation. */
    fpOut = fopen("file.bak", "wb");
    if (fpOut == NULL)
    {
        perror("fopen file.bak");
        return NULL;
    }
    while(b_flag)
    {
        TX_LOCK(mutex);
        nLen = RingBuffer_read(pstRing, szBuf, sizeof(szBuf));
        TX_UNLOCK(mutex);
        /*
         * Presumably RingBuffer_read returns the number of bytes copied
         * (confirm against ringbuffer.h). Only a positive count is written:
         * a negative value converted to size_t would make fwrite read far
         * beyond szBuf.
         */
        if (nLen > 0)
        {
            ret = fwrite(szBuf, sizeof(char), (size_t)nLen, fpOut);
            if (ret != (size_t)nLen)
            {
                /* %zu matches size_t; %d here was undefined behaviour. */
                printf("fwrite failure!, ret = %zu, nLen = %d\n", ret, nLen);
            }
        }

        usleep(100);
    }
    /* fclose flushes the stdio buffer, so a failure here means lost data. */
    if (fclose(fpOut) != 0)
    {
        perror("fclose file.bak");
    }
    return NULL;
}
/*
 * Producer side of the ring-buffer test.
 *
 * Usage: <prog> file_to_read
 *
 * Creates a 2 MiB ring buffer, starts ReadBufferThread as the consumer, then
 * reads the input file in 64 KiB chunks and pushes each chunk into the ring
 * buffer, retrying the unwritten remainder whenever RingBuffer_write accepts
 * fewer bytes than requested. Once the whole file has been pushed it waits
 * until the buffer is reported empty, stops the consumer and releases the
 * buffer.
 *
 * Returns 0 on success, -1 on bad usage or when the ring buffer, the reader
 * thread or the input file cannot be set up.
 *
 * NOTE(review): if the consumer exits early (e.g. it cannot open its output
 * file) nothing drains the buffer and the loops below never finish.
 */
int main(int argc, char**argv)
{
    FILE* fpIn = NULL;
    size_t ret = 0;
    uint8 szBuf[64*1024] = {0};
    int nWriteLen = 0;
    int nOffset = 0;
    int nReReadFlag = 0;
    int nErr = 0;
    pthread_t threadID;
    RingBuffer * pstRing = NULL;

    if (argc < 2)
    {
        printf("usage: %s file_to_read\n", argv[0]);
        return -1;
    }

    pstRing = RingBuffer_create(2*1024*1024);

    if (pstRing == NULL)
    {
        printf("create ring buffer failure!\n");
        return -1;
    }

    b_flag = 1;
    nErr = pthread_create(&threadID, NULL, ReadBufferThread, pstRing);
    if (nErr != 0)
    {
        printf("create reader thread failure! err = %d\n", nErr);
        b_flag = 0;
        RingBuffer_destroy(pstRing);
        return -1;
    }

    /* "rb": the file is copied byte-for-byte. */
    fpIn = fopen(argv[1], "rb");
    if (fpIn == NULL)
    {
        perror(argv[1]);
        /* Let the reader leave its loop by itself, then free the buffer. */
        b_flag = 0;
        pthread_join(threadID,NULL);
        RingBuffer_destroy(pstRing);
        return -1;
    }
    while(1)
    {
        /* nReReadFlag != 0 means part of szBuf is still waiting to be written. */
        if (nReReadFlag == 0)
        {
            ret = fread (szBuf, sizeof(char), sizeof(szBuf), fpIn) ;
            if (ret != sizeof(szBuf))
            {
                /* Short read: last chunk of the file (or a read error). */
                printf("reach to the end of the file ret = %zu, eof = %d!\n",ret,feof(fpIn));
                TX_LOCK(mutex);
                nWriteLen = RingBuffer_write(pstRing, szBuf, ret);
                TX_UNLOCK(mutex);
                if ((size_t)nWriteLen != ret)
                {
                    /* Remainder is flushed by the "write again" loop below. */
                    ret -= (size_t)nWriteLen;
                    nOffset = nWriteLen;
                    nReReadFlag = 1;
                    printf("here still need to write %zu byte to buffer!\n", ret);
                }
                else
                {
                    ret = 0;
                    nWriteLen = 0;
                    nOffset = 0;
                    nReReadFlag = 0;
                }
                break;
            }
        }
        TX_LOCK(mutex);
        nWriteLen = RingBuffer_write(pstRing, &szBuf[nOffset], ret);
        TX_UNLOCK(mutex);
        if ((size_t)nWriteLen != ret)
        {
            /* Buffer accepted only part of the chunk: retry the rest next turn. */
            printf("%d, still need to write %zu, write len = %d, offset = %d\n", __LINE__, ret, nWriteLen, nOffset);
            ret -= (size_t)nWriteLen;
            nOffset += nWriteLen;
            nReReadFlag = 1;
        }
        else
        {
            printf("ret = %zu, nWriteLen = %d\n",ret, nWriteLen);
            nWriteLen = 0;
            nOffset = 0;
            ret = 0;
            nReReadFlag = 0;
        }

        usleep(200);
    }

    fclose(fpIn);

    printf("already finished read the file!\n");

    /* Push whatever is left of the final chunk. */
    while(ret)
    {
        printf("here again write %zu byte...\n", ret);
        TX_LOCK(mutex);
        nWriteLen = RingBuffer_write(pstRing, &szBuf[nOffset], ret);
        TX_UNLOCK(mutex);
        if ((size_t)nWriteLen != ret)
        {
            ret -= (size_t)nWriteLen;
            nOffset += nWriteLen;
        }
        else
        {
            ret = 0;
            nWriteLen = 0;
            nOffset = 0;
        }

        usleep(300);
    }

    /* Wait until the consumer has taken everything out of the buffer. */
    while(1)
    {
        TX_LOCK(mutex);
        ret = RingBuffer_empty(pstRing);
        TX_UNLOCK(mutex);
        if (ret)
        {
            printf("the buffer is empty...\n");
            break;
        }
        usleep(100);
    }

    /*
     * Shutdown order matters:
     *  - no pthread_cancel: the reader may have just drained the buffer and
     *    still be inside fwrite; cancelling there would drop the tail of the
     *    output and skip its fclose. Clearing b_flag lets it finish the
     *    current iteration and close the file itself.
     *  - the buffer is destroyed only after the join, so the reader can
     *    never touch it once it has been released.
     */
    b_flag = 0;
    pthread_join(threadID,NULL);

    RingBuffer_destroy(pstRing);

    return 0;
}