找回密码
 立即注册
搜索

Windows 实现高并发IOCP完成端口例子

2024-11-18 15:40| 发布者: admin| 查看: 59| 评论: 0|来自: CSDN

摘要: 本文介绍了使用WindowsIOCompletionPort(IOCP)实现的一个简单服务端与客户端通信的例子,涉及代码结构、子线程与主线程协作、重叠端口的使用以及如何检测客户端断开连接。作者通过实例展示了如何在Winsock中使用IOCP ...
本文介绍了使用WindowsIOCompletionPort(IOCP)实现的一个简单服务端与客户端通信的例子,涉及代码结构、子线程与主线程协作、重叠端口的使用以及如何检测客户端断开连接。作者通过实例展示了如何在Winsock中使用IOCP进行异步I/O操作和处理非阻塞IO的特殊情况。                
原文链接:https://blog.csdn.net/zlllc/article/details/136481155
代码
服务端代码
代码结构
这里通过主线程进行主要代码管理、各种SOCKET和IOCP对象的创建、WSARECV的投递、退出IOCP等待等操作。
子线程就负责从GetQueuedCompletionStatus()中获取完成的信息。

主要参考以下文章中的模型:
IO完成端口(IOCP) - 木文的文章 - 知乎
用户态和内核态的相对关系:

主线程和子线程的工作流程:

图片来源:
IO完成端口(IOCP) - 木文的文章 - 知乎

代码
#include <WinSock2.h>
#include <iostream>
#include <thread>
#include <mutex>
#pragma comment(lib, "ws2_32.lib")

std::mutex mtx;
inline void OPT(const char * head, const char* msg)
{
std::lock_guard<std::mutex> lkg(mtx);
std::cout << head << " - " << msg << std::endl;
}

HANDLE hIOCP = nullptr;
struct IOContext {
OVERLAPPED overlapped{};
WSABUF wsaBuf{ 1024, buffer };
CHAR buffer[1024]{};
SOCKET socket = INVALID_SOCKET;
DWORD nBytes = 0;
};

子线程

void _func()
{

OPT("child thread","IN");
IOContext* ioContext = nullptr;
DWORD lpNumberOfBytesTransferred = 0;
void* lpCompletionKey = nullptr;

DWORD dwFlags = 0;
DWORD nBytes = 1024;

while (true) {
OPT("child thread", "before GetQueuedCompletionStatus()");
//等待GetQueuedCompletionStatus()返回完成的ioContext。
//这里的阻塞选项选择的INFINITE,就是一直等待。
BOOL bRt = GetQueuedCompletionStatus(
hIOCP,
&lpNumberOfBytesTransferred,
(PULONG_PTR)&lpCompletionKey,
(LPOVERLAPPED*)&ioContext,
INFINITE);
OPT("child thread", "after GetQueuedCompletionStatus()");
if (!bRt)
{ //根据MSDN的说法, 返回值非0,就是执行失败
OPT("child thread", "GetQueuedCompletionStatus() failed");
continue;
}
// 收到 PostQueuedCompletionStatus 发出的退出指令
if (lpNumberOfBytesTransferred == -1)
{
OPT("child thread", "recv quit cmd");
break;
}

if (lpNumberOfBytesTransferred == 0)
{ // 通常认为这样相当于recv() == 0, 是断开连接的信号
OPT("child thread", "recv msg size is 0");
{
int res = recv(ioContext->socket, ioContext->buffer, 1024, 0);
if (res == 0)
{
OPT("child thread", "conn disconnected");
}
else
{
OPT("child thread", "continue");
}
continue;
}
}
// 读到,或者写入的字节总数
ioContext->nBytes = lpNumberOfBytesTransferred;
// 处理对应的事件
{
// 输出读取到的内容
std::cout << ">>>Recv:" << ioContext->buffer << std::endl;
// 继续提交Recv请求
OPT("child thread", "Post RECV again");
int nRt = WSARecv(
ioContext->socket,
&ioContext->wsaBuf,
1,
&nBytes,
&dwFlags,
&(ioContext->overlapped),
nullptr);
auto e = WSAGetLastError();
if (SOCKET_ERROR == nRt && e != WSAGetLastError()) {
// 读取发生错误,如果发生了
closesocket(ioContext->socket);
delete ioContext;
ioContext = nullptr;
}
}
}
}

主线程

int main()
{
WSAData wsadata;
WSAStartup(MAKEWORD(2,2), &wsadata);

//创建IOCP 
OPT("master thread", "create IOCP");
hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 5);
if (INVALID_HANDLE_VALUE != hIOCP) {

//创建工作线程
OPT("master thread", "create thread");
std::thread th(_func);
//创建listen的SOCKET,注意需要WSA_FLAG_OVERLAPPED声明是重叠IO可以进行异步操作
OPT("master thread", "create Overlapped socket");
//SOCKET listen_sock = socket(AF_INET, SOCK_STREAM, 0);
SOCKET listen_sock = WSASocket(AF_INET, SOCK_STREAM, 0, nullptr, 0, WSA_FLAG_OVERLAPPED);

if (listen_sock != INVALID_SOCKET)
{
OPT("master thread", "bind Overlapped socket");
sockaddr_in local_addr;
local_addr.sin_family = AF_INET;
local_addr.sin_port = htons(9100);
local_addr.sin_addr.S_un.S_addr = ADDR_ANY;
if (bind(listen_sock, (sockaddr*)&local_addr, sizeof(local_addr)) == 0)
{
OPT("master thread", "listen Overlapped socket");
if (listen(listen_sock, 5) == 0)
{
sockaddr_in conn_addr;
int sock_addr_len = sizeof(conn_addr);
SOCKET conn_sock = accept(listen_sock, (sockaddr*)&conn_addr, &sock_addr_len);

//将要绑定到IOCP的套接字设置为非阻塞的
unsigned long ul = 1;
if (SOCKET_ERROR != ioctlsocket(conn_sock, FIONBIO, &ul)) {

//将要监控的SOCKET绑定到IOCP 
OPT("master thread", "bind accept socket with Overlapped socket");
if (nullptr != CreateIoCompletionPort((HANDLE)conn_sock, hIOCP, 0, 0))
{
DWORD nBytes = 1024;
DWORD dwFlags = 0;
auto ioContext = new IOContext;
ioContext->socket = conn_sock;
// 提交一次读取事件
OPT("master thread", "post recv request to Overlapped");
auto rt = WSARecv(ioContext->socket, &ioContext->wsaBuf, 1, &nBytes, &dwFlags, &ioContext->overlapped, nullptr);
auto err = WSAGetLastError();
if (SOCKET_ERROR == rt && ERROR_IO_PENDING != err) {
// 发生不为 ERROR_IO_PENDING 的错误
//如果发生ERROR_IO_PENDING 错误,认为这是正常的,系统在进行异步处理中
closesocket(ioContext->socket);
delete ioContext;
}
else
{
getchar();
OPT("master thread", "post exit request to Overlapped");
void* lpCompletionKey = nullptr;
PostQueuedCompletionStatus(hIOCP, -1, (ULONG_PTR)lpCompletionKey, nullptr);
}
}
}

}
else {//略}
}
else{//略}
}
else{//略}
OPT("master thread", "thread join");
th.join();
}
WSACleanup();
return 0;
}

客户端模拟
使用NetAssist工具,模拟TCP客户端,进行连接,并发送一条消息"this is a test msg",然后断开连接。


服务端log输出如下。

可以看到,子线程执行以后,就卡在了GetQueuedCompletionStatus().
直到主线程投递了一个WSARECV,然后客户端发送了msg,子线程才从GetQueuedCompletionStatus()返回。

备注
重叠端口
上面的代码中,listen scoket是重叠端口,accept socket不是,但是依旧可以对accept socket完成异步操作。accept scoket没有声明为重叠端口, 为什么还可以和IOCP绑定呢?

那么如果使用上述函数但是传入一个非阻塞的SOCKET会怎么样呢,这些函数只看是否传入OVERLAPPED结构而不管SOCKET是否是阻塞的,一律按重叠IO的方式来运行。这也就是说,要使用重叠I/O方式来操作SOCKET,那么不一定非要一开初就创建一个重叠I/O方式的SOCKET对象
WinSock 重叠IO模型

如何获取客户端断开连接
返回0字节的判断
if (lpNumberOfBytesTransferred == 0)
...
1
2
通常如果获取到的msg是0,认为客户端断开了连接。
2. 必须投递recv请求否则没反应、
这里要注意的是,如果我们向IOCP绑定了一个accept socket,然后投递了一个WSARECV,然后通过 GetQueuedCompletionStatus()获取到了recv buffer。
此时就要再次WSARECV或者WSASEND,否则就再也无法通过 GetQueuedCompletionStatus()获取这个accept socekt的事件结果了。
3. 客户端异常断开连接
还有一种 GetQueuedCompletionStatus()返回结果对应客户端异常断开连接

 // 可能是客户端异常退出了(64)
            else if (ERROR_NETNAME_DELETED == dwErr)
            {
                iocp->OnConnectionError(sockContext, dwErr);

                // 回收socket
                iocp->DoClose(sockContext);
                continue;
            }

代码来源:WinSock IOCP 模型总结 - 知乎

非阻塞IO
WSARECV在配合重叠IO使用时,是非阻塞IO,是会立刻返回的。


路过

雷人

握手

鲜花

鸡蛋

QQ|Archiver|手机版|小黑屋|软件开发编程门户 ( 陇ICP备2024013992号-1|甘公网安备62090002000130号 )

GMT+8, 2024-11-21 14:26 , Processed in 0.036626 second(s), 16 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

返回顶部