本文介绍了使用WindowsIOCompletionPort(IOCP)实现的一个简单服务端与客户端通信的例子,涉及代码结构、子线程与主线程协作、重叠端口的使用以及如何检测客户端断开连接。作者通过实例展示了如何在Winsock中使用IOCP进行异步I/O操作和处理非阻塞IO的特殊情况。 原文链接:https://blog.csdn.net/zlllc/article/details/136481155 服务端代码 代码结构 这里通过主线程进行主要代码管理、各种SOCKET和IOCP对象的创建、WSARECV的投递、退出IOCP等待等操作。 子线程就负责从GetQueuedCompletionStatus()中获取完成的信息。 主要参考以下文章中的模型: IO完成端口(IOCP) - 木文的文章 - 知乎 用户态和内核态的相对关系: 主线程和子线程的工作流程: 图片来源: IO完成端口(IOCP) - 木文的文章 - 知乎 代码 #include <WinSock2.h> #include <iostream> #include <thread> #include <mutex> #pragma comment(lib, "ws2_32.lib") std::mutex mtx; inline void OPT(const char * head, const char* msg) { std::lock_guard<std::mutex> lkg(mtx); std::cout << head << " - " << msg << std::endl; } HANDLE hIOCP = nullptr; struct IOContext { OVERLAPPED overlapped{}; WSABUF wsaBuf{ 1024, buffer }; CHAR buffer[1024]{}; SOCKET socket = INVALID_SOCKET; DWORD nBytes = 0; }; 子线程 void _func() { OPT("child thread","IN"); IOContext* ioContext = nullptr; DWORD lpNumberOfBytesTransferred = 0; void* lpCompletionKey = nullptr; DWORD dwFlags = 0; DWORD nBytes = 1024; while (true) { OPT("child thread", "before GetQueuedCompletionStatus()"); //等待GetQueuedCompletionStatus()返回完成的ioContext。 //这里的阻塞选项选择的INFINITE,就是一直等待。 BOOL bRt = GetQueuedCompletionStatus( hIOCP, &lpNumberOfBytesTransferred, (PULONG_PTR)&lpCompletionKey, (LPOVERLAPPED*)&ioContext, INFINITE); OPT("child thread", "after GetQueuedCompletionStatus()"); if (!bRt) { //根据MSDN的说法, 返回值非0,就是执行失败 OPT("child thread", "GetQueuedCompletionStatus() failed"); continue; } // 收到 PostQueuedCompletionStatus 发出的退出指令 if (lpNumberOfBytesTransferred == -1) { OPT("child thread", "recv quit cmd"); break; } if (lpNumberOfBytesTransferred == 0) { // 通常认为这样相当于recv() == 0, 是断开连接的信号 OPT("child thread", "recv msg size is 0"); { int res = recv(ioContext->socket, ioContext->buffer, 1024, 0); if (res == 0) { OPT("child thread", "conn disconnected"); } else { OPT("child thread", "continue"); } continue; } } // 读到,或者写入的字节总数 ioContext->nBytes = lpNumberOfBytesTransferred; // 处理对应的事件 { // 输出读取到的内容 std::cout << ">>>Recv:" << ioContext->buffer << std::endl; // 继续提交Recv请求 OPT("child thread", "Post RECV again"); int nRt = WSARecv( ioContext->socket, &ioContext->wsaBuf, 1, &nBytes, &dwFlags, &(ioContext->overlapped), nullptr); auto e = WSAGetLastError(); if (SOCKET_ERROR == nRt && e != WSAGetLastError()) { // 读取发生错误,如果发生了 closesocket(ioContext->socket); delete ioContext; ioContext = nullptr; } } } } 主线程 int main() { WSAData wsadata; WSAStartup(MAKEWORD(2,2), &wsadata); //创建IOCP OPT("master thread", "create IOCP"); hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 5); if (INVALID_HANDLE_VALUE != hIOCP) { //创建工作线程 OPT("master thread", "create thread"); std::thread th(_func); //创建listen的SOCKET,注意需要WSA_FLAG_OVERLAPPED声明是重叠IO可以进行异步操作 OPT("master thread", "create Overlapped socket"); //SOCKET listen_sock = socket(AF_INET, SOCK_STREAM, 0); SOCKET listen_sock = WSASocket(AF_INET, SOCK_STREAM, 0, nullptr, 0, WSA_FLAG_OVERLAPPED); if (listen_sock != INVALID_SOCKET) { OPT("master thread", "bind Overlapped socket"); sockaddr_in local_addr; local_addr.sin_family = AF_INET; local_addr.sin_port = htons(9100); local_addr.sin_addr.S_un.S_addr = ADDR_ANY; if (bind(listen_sock, (sockaddr*)&local_addr, sizeof(local_addr)) == 0) { OPT("master thread", "listen Overlapped socket"); if (listen(listen_sock, 5) == 0) { sockaddr_in conn_addr; int sock_addr_len = sizeof(conn_addr); SOCKET conn_sock = accept(listen_sock, (sockaddr*)&conn_addr, &sock_addr_len); //将要绑定到IOCP的套接字设置为非阻塞的 unsigned long ul = 1; if (SOCKET_ERROR != ioctlsocket(conn_sock, FIONBIO, &ul)) { //将要监控的SOCKET绑定到IOCP OPT("master thread", "bind accept socket with Overlapped socket"); if (nullptr != CreateIoCompletionPort((HANDLE)conn_sock, hIOCP, 0, 0)) { DWORD nBytes = 1024; DWORD dwFlags = 0; auto ioContext = new IOContext; ioContext->socket = conn_sock; // 提交一次读取事件 OPT("master thread", "post recv request to Overlapped"); auto rt = WSARecv(ioContext->socket, &ioContext->wsaBuf, 1, &nBytes, &dwFlags, &ioContext->overlapped, nullptr); auto err = WSAGetLastError(); if (SOCKET_ERROR == rt && ERROR_IO_PENDING != err) { // 发生不为 ERROR_IO_PENDING 的错误 //如果发生ERROR_IO_PENDING 错误,认为这是正常的,系统在进行异步处理中 closesocket(ioContext->socket); delete ioContext; } else { getchar(); OPT("master thread", "post exit request to Overlapped"); void* lpCompletionKey = nullptr; PostQueuedCompletionStatus(hIOCP, -1, (ULONG_PTR)lpCompletionKey, nullptr); } } } } else {//略} } else{//略} } else{//略} OPT("master thread", "thread join"); th.join(); } WSACleanup(); return 0; } 客户端模拟 使用NetAssist工具,模拟TCP客户端,进行连接,并发送一条消息"this is a test msg",然后断开连接。 服务端log输出如下。 可以看到,子线程执行以后,就卡在了GetQueuedCompletionStatus(). 直到主线程投递了一个WSARECV,然后客户端发送了msg,子线程才从GetQueuedCompletionStatus()返回。 备注 重叠端口 上面的代码中,listen scoket是重叠端口,accept socket不是,但是依旧可以对accept socket完成异步操作。accept scoket没有声明为重叠端口, 为什么还可以和IOCP绑定呢? 那么如果使用上述函数但是传入一个非阻塞的SOCKET会怎么样呢,这些函数只看是否传入OVERLAPPED结构而不管SOCKET是否是阻塞的,一律按重叠IO的方式来运行。这也就是说,要使用重叠I/O方式来操作SOCKET,那么不一定非要一开初就创建一个重叠I/O方式的SOCKET对象 WinSock 重叠IO模型 如何获取客户端断开连接 返回0字节的判断 if (lpNumberOfBytesTransferred == 0) ... 1 2 通常如果获取到的msg是0,认为客户端断开了连接。 2. 必须投递recv请求否则没反应、 这里要注意的是,如果我们向IOCP绑定了一个accept socket,然后投递了一个WSARECV,然后通过 GetQueuedCompletionStatus()获取到了recv buffer。 此时就要再次WSARECV或者WSASEND,否则就再也无法通过 GetQueuedCompletionStatus()获取这个accept socekt的事件结果了。 3. 客户端异常断开连接 还有一种 GetQueuedCompletionStatus()返回结果对应客户端异常断开连接 // 可能是客户端异常退出了(64) else if (ERROR_NETNAME_DELETED == dwErr) { iocp->OnConnectionError(sockContext, dwErr); // 回收socket iocp->DoClose(sockContext); continue; } 代码来源:WinSock IOCP 模型总结 - 知乎 非阻塞IO WSARECV在配合重叠IO使用时,是非阻塞IO,是会立刻返回的。 |
|Archiver|手机版|小黑屋|软件开发编程门户 ( 陇ICP备2024013992号-1|甘公网安备62090002000130号 )
GMT+8, 2025-1-18 09:53 , Processed in 0.036659 second(s), 16 queries .
Powered by Discuz! X3.5
© 2001-2024 Discuz! Team.