首页 > 代码库 > 阻塞与非阻塞那些事

阻塞与非阻塞那些事

日常开发中,经常碰到处理程序阻塞的情况:

1.select函数

select函数用于确定一个或多个套接口的状态,对每一个套接口,调用者可查询它的可读性、可写性及错误状态信息,用fd_set结构来表示一组等待检查的套接口,在调用返回时,这个结构存有满足一定条件的套接口组的子集,并且select()返回满足条件的套接口的数目。有一组宏可用于对fd_set的操作,这些宏与Berkeley Unix软件中的兼容,但内部的表达是完全不同的。

头文件:

#include <winsock.h>

原型:

int PASCAL FAR select(         int nfds,                //整数值,是指集合中所有文件描述符的范围,即所有文件描述符的最大值加1;windows中可省略;        fd_set FAR* readfds,     //(可选)指向一组等待可读性检查的套接口;        fd_set FAR* writefds,    //(可选)指向一组等待可写性检查的套接口;        fd_set FAR* exceptfds, //(可选)指向一组等待错误检查的套接口;        const struct timeval FAR* timeout    //select()最多等待时间,对阻塞操作则为NULL;        );

参数解释:

readfds:标识等待可读性检查的套接口
如果该套接口正处于监听listen()状态,则若有连接请求到达,该套接口便被标识为可读,这样一个accept()调用保证可以无阻塞完成;
对其他套接口而言,可读性意味着有排队数据供读取。对于SOCK_STREAM类型套接口,便是recv()或recvfrom()操作均能无阻塞完成;

writefds:标识等待可写性检查的套接口
如果一个套接口正在connect()连接(非阻塞),可写性意味着连接顺利建立。
如果套接口并未处于connect()调用中,可写性意味着send()和sendto()调用将无阻塞完成。

exceptfds:标识等待带外数据存在性或意味错误条件检查的套接口

timeout:控制select完成的时间 若为空指针,则select将一直阻塞到有一个描述字满足条件; 若为{0,0},则select立即返回; 否则,指向一个timeval结构,其中指定了select调用在返回前等待多长时间;

MARK:

在winsock.h头文件中共定义了四个宏来操作描述字集。FD_SETSIZE变量用于确定一个集合中最多有多少描述字(FD_SETSIZE缺省值为64,可在包含winsock.h前用#define FD_SETSIZE来改变该值)。对于内部表示,fd_set被表示成一个套接口的队列,最后一个有效元素的后续元素为INVAL_SOCKET。
1、FD_CLR(s,*set):从集合set中删除描述字s。
2、FD_ISSET(s,*set):若s为集合中一员,非零;否则为零。
3、FD_SET(s,*set):向集合添加描述字s。
4、FD_ZERO(*set):将set初始化为空集NULL。

示例代码:

//////////////////////////////////////////////////////////////////////////*创建两个socket,循环检测socket有请求到来时,分别进行处理;*/int _tmain(int argc, _TCHAR* argv[])       {    struct sockaddr_in servaddr1, clientaddr1;    struct sockaddr_in servaddr2, clientaddr2;    int listenfd1, connfd1 = 0;    int listenfd2, connfd2 = 0;    int clientlen = 0;    WSADATA wsaData;    int ServerPort1;    int ServerPort2;    HANDLE hThread;    fd_set set;    int maxfd;    if(WSAStartup( MAKEWORD( 2, 2 ), &wsaData ))    {        printf("WSAStartup Error\n");        return 1;    }    servaddr1.sin_family = AF_INET;    servaddr1.sin_port = htons(PORT1);    servaddr1.sin_addr.s_addr = INADDR_ANY;    servaddr2.sin_family = AF_INET;    servaddr2.sin_port = htons(PORT2);    servaddr2.sin_addr.s_addr = INADDR_ANY;    clientlen = sizeof(struct sockaddr);    listenfd1 = socket(AF_INET, SOCK_STREAM, 0);    listenfd2 = socket(AF_INET, SOCK_STREAM, 0);    if((listenfd1<0) || (listenfd2<0))    {        printf("listenfd Error\n");        return 1;    }    if( bind(listenfd1, (struct sockaddr *)&servaddr1, sizeof(servaddr1)) < 0         ||bind(listenfd2, (struct sockaddr *)&servaddr2, sizeof(servaddr2)) < 0)    {        printf("Server Bind Port Error\n");        return 1;    }        if( listen(listenfd1, 5) < 0        ||listen(listenfd2, 5) < 0)     {        printf("listen Error\n");        return 1;    }    maxfd = __max(listenfd1, listenfd2);    for(;;)    {        FD_ZERO(&set);        FD_SET((unsigned int)listenfd1, &set);        FD_SET((unsigned int)listenfd2, &set);        if( select(maxfd+1, &set, NULL, NULL, 0) < 0 )         {            printf("select errorErrorcode :%d ",WSAGetLastError());            return 1;        }        if( FD_ISSET(listenfd1, &set) )//port1有数据到来        {            connfd1 = accept( listenfd1, (struct sockaddr *)&clientaddr1, &clientlen );            if(connfd1 < 0)            {                printf("accept error.errcode:%d\n",WSAGetLastError());                continue;            }            hThread = CreateThread(                 NULL,                                        0,                                   (LPTHREAD_START_ROUTINE)DoProxy1,                      NULL,                             0,                                NULL);        }        if( FD_ISSET(listenfd2, &set) )//port2有数据到来        {            connfd2 = accept( listenfd2, (struct sockaddr *)&clientaddr2, &clientlen );            if(connfd2 < 0)            {                printf("accept error.errcode:%d\n",WSAGetLastError());                continue;            }            hThread = CreateThread(                 NULL,                                    0,                                       (LPTHREAD_START_ROUTINE)DoProxy2,                          &parm,                                          0,                                           NULL);        }        }//end for    return 0;}
View Code

 

2、PeekNamedPipe函数

从命名管道/匿名管道中拷贝数据到一个指定缓冲区,原管道中的数据任保留;

原型:

BOOL WINAPI PeekNamedPipe(    __in       HANDLE hNamedPipe,    //命名\匿名管道 句柄;    __out_opt  LPVOID lpBuffer,        //接收从管道中读取的数据,可以为空;    __in       DWORD nBufferSize,    //指定lpBuffer的大小;    __out_opt  LPDWORD lpBytesRead,    //实际接收的数据大小;    __out_opt  LPDWORD lpTotalBytesAvail,        //管道中所有可读数据的大小;    __out_opt  LPDWORD lpBytesLeftThisMessage    //当前消息中剩余的字节数;    );


示例代码:

 1 ////////////////////////////////////////////////////////////////////////// 2 //获取目标控制台程序的输出,并显示在编辑框中; 3 //实际运行发现目标控制台进程结束后,才会将数据一次性写入管道中; 4 //才能从读取到数据,否则PeekNamedPipe任会返回1,但lpTotalBytesAvail却为0; 5 //可以在控制台程序中每次printf后添加fflush(STDOUT);解决; 6 ///////////////////////////////////////////////////////////////////////// 7  int CEntreVisionDlg::Start(CEntreVisionDlg *pDlg) 8 { 9     SECURITY_ATTRIBUTES sa;10     HANDLE hRead, hWrite;11     sa.nLength = sizeof(SECURITY_ATTRIBUTES);12     sa.lpSecurityDescriptor = NULL;13     sa.bInheritHandle = TRUE;14 15     if(!CreatePipe(&hRead, &hWrite, &sa, 0))16     {17         CloseHandle(hRead);18         CloseHandle(hWrite);19         return 0;20     }21 22     STARTUPINFO si;23     PROCESS_INFORMATION pi;24     si.cb = sizeof(STARTUPINFO);25     GetStartupInfo(&si);26     si.hStdOutput = hWrite;27     si.hStdOutput = hWrite;28     si.wShowWindow = SW_HIDE;29     si.dwFlags = STARTF_USESHOWWINDOW |  STARTF_USESTDHANDLES;30 31     if (!CreateProcess(NULL, "test.exe", 32         NULL, NULL, TRUE, NULL, NULL, NULL, &si, &pi))33     {34         CloseHandle(hRead);35         CloseHandle(hWrite);36         return 0;37     }38     g_hProcess = pi.hProcess;39 40     char buffer[512] = {0};41     DWORD bytesRead = 0, dwTotalArrival = 0;42     BOOL bRet = FALSE;43 44     while(1)45     {46         while(pDlg->m_bListen)47         {48             dwTotalArrival = -1;49             bRet = PeekNamedPipe(hRead, NULL, 0, NULL, &dwTotalArrival, NULL);50             if (bRet && (dwTotalArrival > 0))51             {52                 bRet = ReadFile(hRead, buffer, 512, &bytesRead, NULL);53                 pDlg->m_edit.SetSel(-1);54                 pDlg->m_edit.ReplaceSel(buffer);55                 /*pDlg->SetDlgItemText(IDC_EDIT1, buffer);*/56                 memset(buffer, 0, 512);57             }58     59         }60         61     }62 63     return 1;64 }
PeekNamedPipe

 3、WaitForSingleObject 与 WaitForMultipleObjects函数

  3.1 WaitForSingleObject

 等待直到指定的对象处于有信号状态或超时;

 原型: 

DWORD WINAPI WaitForSingleObject(    __in  HANDLE hHandle,        //被等待的对象句柄;    __in  DWORD dwMilliseconds    //超时时间;    );

参数:

hHandle:被等待的对象句柄 被等待的对象类型可以是更改通知(Change notification)、控制台输入(Console input)、事件(Event)、内存资源通知(Memory resource notification)、  互斥体(Mutex)、进程(Process)、信号量(Semaphore)、线程(Thread)、可等待计时器(Waitable timer);

dwMilliseconds:超时时间,可以设置为以下值  为0,函数不进入等待状态,立即返回;  INFINITE,一直等待直到被等待对象变为有信号状态时才返回;  其它DWORD值,函数等待直到被等待对象变为有信号或者等待时间间隔到期;

返回值:

指示导致函数从等待状态返回的原因,可以是以下值:
 WAIT_ABANDONED (0x00000080L) 指定对象是一个互斥体,拥有该互斥体的线程在终止时没有释放该互斥体对象,此时该互斥体对象的所有权被授予给调用线程,并被设置为无信号状态;
 WAIT_OBJECT_0 (0x00000000L) 指定对象变为有信号状态;
 WAIT_TIMEOUT (0x00000102L) 等待时间耗尽且指定对象仍为无信号状态;
 WAIT_FAILED  (0xFFFFFFFF) 函数运行不成功,调用CallLastError获得错误信息;

示例代码:

 1 #include <windows.h> 2 #include <stdio.h> 3  4 #define THREADCOUNT 2 5  6 HANDLE ghMutex;  7  8 DWORD WINAPI WriteToDatabase( LPVOID ); 9 10 void main()11 {12     HANDLE aThread[THREADCOUNT];13     DWORD ThreadID;14     int i;15 16     // Create a mutex with no initial owner17 18     ghMutex = CreateMutex( 19         NULL,              // default security attributes20         FALSE,             // initially not owned21         NULL);             // unnamed mutex22 23     if (ghMutex == NULL) 24     {25         printf("CreateMutex error: %d\n", GetLastError());26         return;27     }28 29     // Create worker threads30 31     for( i=0; i < THREADCOUNT; i++ )32     {33         aThread[i] = CreateThread( 34                      NULL,       // default security attributes35                      0,          // default stack size36                      (LPTHREAD_START_ROUTINE) WriteToDatabase, 37                      NULL,       // no thread function arguments38                      0,          // default creation flags39                      &ThreadID); // receive thread identifier40 41         if( aThread[i] == NULL )42         {43             printf("CreateThread error: %d\n", GetLastError());44             return;45         }46     }47 48     // Wait for all threads to terminate49 50     WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);51 52     // Close thread and mutex handles53 54     for( i=0; i < THREADCOUNT; i++ )55         CloseHandle(aThread[i]);56 57     CloseHandle(ghMutex);58 }59 60 DWORD WINAPI WriteToDatabase( LPVOID lpParam )61 { 62     DWORD dwCount=0, dwWaitResult; 63 64     // Request ownership of mutex.65 66     while( dwCount < 20 )67     { 68         dwWaitResult = WaitForSingleObject( 69             ghMutex,    // handle to mutex70             INFINITE);  // no time-out interval71  72         switch (dwWaitResult) 73         {74             // The thread got ownership of the mutex75             case WAIT_OBJECT_0: 76                 __try { 77                     // TODO: Write to the database78                     printf("Thread %d writing to database...\n", 79                             GetCurrentThreadId());80                     dwCount++;81                 } 82 83                 __finally { 84                     // Release ownership of the mutex object85                     if (! ReleaseMutex(ghMutex)) 86                     { 87                         // Handle error.88                     } 89                 } 90                 break; 91 92             // The thread got ownership of an abandoned mutex93             // The database is in an indeterminate state94             case WAIT_ABANDONED: 95                 return FALSE; 96         }97     }98     return TRUE; 99 }
WaitForSingleObject

 

3.2 WaitForMultipleObjects

等待直到一个或所有指定的对象处于有信号状态或超时;

原型:

DWORD WINAPI WaitForMultipleObjects(    __in  DWORD nCount,                //表示lpHandles指向数组中对象句柄的数量,最大值为 MAXIMUM_WAIT_OBJECTS;    __in  const HANDLE *lpHandles,    //指向包含一组句柄对象的数组;                                __in  BOOL bWaitAll,            //为TRUE,表示数组lpHandles中所有对象变为有信号状态才返回;为FALSE,表示任何一个对象变为有信号状态即返回;    __in  DWORD dwMilliseconds        //意义通WaitForSingleObject;    );

参数:

lpHandles:
   数组中不可以包含相同的对象句柄;
   如果函数为返回之前,数组中便有一个或多个句柄被释放,此时函数的行为未被定义,是不可被判断的;
   数组中句柄必需都有SYNCHRONIZE 权限;

返回值:

WAIT_OBJECT_0 to (WAIT_OBJECT_0 + nCount– 1): 如果bWaitAll是TRUE,表示所有等待对象句柄均变为有信号;如果bWaitAll设为FALSE,返回值减WAIT_OBJECT_0的值便是lpHanddles数组中满足函数返回的对象的下标(若同时有多个对象变为了有有信号状态,返回值减WAIT_OBJECT_0的值 表示的是变为有信号状态对象中最小的下标);

WAIT_ABANDONED_0 to (WAIT_ABANDONED_0 + nCount– 1):如果bWaitAll是TRUE,表示所有等待对象句柄均变为有信号,且至少有一个对象是被抛弃的互斥体对象;如果bWaitAll设为FALSE,返回值减WAIT_OBJECT_0的值 便是lpHanddles数组中被抛弃的互斥体对象; (互斥体对象的被抛弃的处理,同WaitForSingleObject中WAIT_ABANDONED返回值)。

WAIT_TIMEOUT 和 WAIT_FAILED 同 WaitForSingleObject 中对应返回值;

示例代码:

 1 #include <windows.h> 2 #include <stdio.h> 3  4 HANDLE ghEvents[2]; 5  6 DWORD WINAPI ThreadProc( LPVOID ); 7  8 void main() 9 {10     HANDLE hThread; 11     DWORD i, dwEvent, dwThreadID; 12 13     // Create two event objects14 15     for (i = 0; i < 2; i++) 16     { 17         ghEvents[i] = CreateEvent( 18             NULL,   // default security attributes19             FALSE,  // auto-reset event object20             FALSE,  // initial state is nonsignaled21             NULL);  // unnamed object22 23         if (ghEvents[i] == NULL) 24         { 25             printf("CreateEvent error: %d\n", GetLastError() ); 26             ExitProcess(0); 27         } 28     } 29 30     // Create a thread31 32     hThread = CreateThread( 33                  NULL,         // default security attributes34                  0,            // default stack size35                  (LPTHREAD_START_ROUTINE) ThreadProc, 36                  NULL,         // no thread function arguments37                  0,            // default creation flags38                  &dwThreadID); // receive thread identifier39 40     if( hThread == NULL )41     {42         printf("CreateThread error: %d\n", GetLastError());43         return;44     }45 46     // Wait for the thread to signal one of the event objects47 48     dwEvent = WaitForMultipleObjects( 49         2,           // number of objects in array50         ghEvents,     // array of objects51         FALSE,       // wait for any object52         5000);       // five-second wait53 54     // The return value indicates which event is signaled55 56     switch (dwEvent) 57     { 58         // ghEvents[0] was signaled59         case WAIT_OBJECT_0 + 0: 60             // TODO: Perform tasks required by this event61             printf("First event was signaled.\n");62             break; 63 64         // ghEvents[1] was signaled65         case WAIT_OBJECT_0 + 1: 66             // TODO: Perform tasks required by this event67             printf("Second event was signaled.\n");68             break; 69 70         case WAIT_TIMEOUT:71             printf("Wait timed out.\n");72             break;73 74         // Return value is invalid.75         default: 76             printf("Wait error: %d\n", GetLastError()); 77             ExitProcess(0); 78     }79 80     // Close event handles81 82     for (i = 0; i < 2; i++) 83         CloseHandle(ghEvents[i]);    84 }85 86 DWORD WINAPI ThreadProc( LPVOID lpParam )87 {88     // Set one event to the signaled state89 90     if ( !SetEvent(ghEvents[0]) ) 91     {92         printf("SetEvent failed (%d)\n", GetLastError());93         return -1;94     }95     return 1;96 }
WaitForMultipleObjects

 

阻塞与非阻塞那些事