/* Note: the code below is only an architectural skeleton; it does not compile as-is and still needs to be modified. */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define BUFFER_LEN 1024
#define MAX_EPOLL_EVENT 1024
/*
 * Signature shared by every reactor callback.
 *   fd    - descriptor the callback is invoked for
 *   event - event value supplied by the dispatcher
 *   arg   - opaque pointer supplied by the dispatcher
 */
typedef int (*NCALLBACK)(int fd, int event, void *arg);

// Event kinds accepted by SetEvent(). They are defined here because the
// callbacks below use them ahead of the identical definitions that appear
// later in the file; C permits redefining a macro with the same replacement.
#define READ_CB 0
#define WRITE_CB 1
#define ACCEPT_CB 2

// Forward declarations for functions that are called before their
// definitions appear in this file.
struct reactor;
struct reactor *getInstance(void);
int SetEvent(int fd, NCALLBACK cb, int events, int *arg);
int DelEvent(int fd, NCALLBACK cb, int events, int *arg);
int write_callback(int fd, int event, void *arg);
/*
 * Per-descriptor state. SetEvent() and the callbacks index the item array
 * by descriptor number.
 */
struct item {
    int fd;      // descriptor this slot belongs to
    int events;  // event field for this descriptor
    void *arg;   // opaque pointer stored by SetEvent()

    // Generic single callback slot, kept so code written against it still builds.
    NCALLBACK callback;
    // SetEvent() and reactor_loop() reference the three members below, so
    // they must be declared unconditionally rather than hidden behind #if.
    NCALLBACK readcb;   // EPOLLIN
    NCALLBACK writecb;  // EPOLLOUT
    NCALLBACK acceptcb; // EPOLLIN

    unsigned char rbuffer[BUFFER_LEN];
    int rlen; // length of the received data
    unsigned char wbuffer[BUFFER_LEN];
    int wlen; // length of the data to send
};
/*
 * One node in a singly linked chain of item arrays. When more than
 * MAX_EPOLL_EVENT descriptors are needed, further itemblocks can be used.
 */
struct itemblock {
    struct itemblock *next;   /* following block in the chain */
    struct item      *items;  /* item array held by this block */
};
/* Reactor state: the epoll descriptor plus the list of item blocks. */
struct reactor {
    int epfd;                // epoll descriptor
    struct itemblock *head;  // set of all I/O items
    // struct itemblock **last;
};
int read_callback(int fd, int event, void* arg){
struct reactor * r = getInstance();
unsigned char * buffer = R->head->items[fd].rbuffer;
int index = 0;
int ret = 0;
while(index < BUFFER_LEN){
ret = recv(fd, buffer+index, BUFFER_LEN-index, 0);
if(ret == -1) break;
else if(ret > 0){
index += ret;
}
else {
break;
}
}
if(index == BUFFER_LEN && ret != -1)
// 需要继续读
SetEvent(fd, read_callback, READ_CB, NULL);
else if(ret == 0){
// 从epoll中清除fd
DelEvent(fd, NULL, 0, NULL);
close(fd);
}
else {
// 读完了,调用写
SetEvent(fd, write_callback, WRITE_CB, NULL);
}
}
int write_callback(int fd, int event, void* arg){
struct reactor * R = getInstance();
unsigned char * buffer = R->head->items[fd].wbuffer;
int len = R->head->items[fd].wlen;
int ret = send(fd, buffer, len, 0);
if(ret < len){
// 如果没有全部发送,继续发送
SetEvent(fd, write_callback, WRITE_CB, NULL);
}
else {
// 发送成功
SetEvent(fd, read_callback, READ_CB, NULL);
}
return 0;
}
/*
 * Accept handler: accepts one connection on the listening socket and
 * registers the new descriptor for reading.
 *
 *   fd    - listening socket
 *   event - unused
 *   arg   - unused
 *
 * Returns 0 on success, -1 if accept() or the registration failed.
 */
int accept_callback(int fd, int event, void *arg) {
    (void)event;
    (void)arg;

    struct sockaddr_in client;
    socklen_t len = sizeof(client);

    int connfd = accept(fd, (struct sockaddr *)&client, &len);
    if (connfd == -1) {
        int err = errno; // saved first: printf itself may change errno
        printf("accept socket error :%s(errno:%d)\n", strerror(err), err);
        return -1;
    }

    if (SetEvent(connfd, read_callback, READ_CB, NULL) != 0) {
        // Not registered with the reactor, so nobody would ever close it.
        close(connfd);
        return -1;
    }
    return 0;
}
int InitReactor(struct reactor * r){
if(!r) return -1;
int epfd = epoll_create(1);
r->epfd = epfd;
// fd->item
r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
if(r->head == NULL){
close(epfd);
return -2;
}
// 运行久了可能产生脏数据
memset(r->head, 0, sizeof(struct itemblock));
r->head->items = malloc(MAX_EPOLL_EVENT * sizeof(struct item));
if(r->head->items == NULL){
free(r->head);
close(epfd);
return -2;
}
memset(r->head->items, 0, MAX_EPOLL_EVENT * sizeof(struct item));
r->head->next = NULL;
return 0;
}
// Event kinds passed as the `events` argument of SetEvent().
// READ_CB: store the callback as readcb and wait for EPOLLIN.
#define READ_CB 0
// WRITE_CB: store the callback as writecb and wait for EPOLLOUT.
#define WRITE_CB 1
// ACCEPT_CB: store the callback as acceptcb and wait for EPOLLIN.
#define ACCEPT_CB 2
/*
 * Registers (or re-registers) fd with the reactor's epoll instance and
 * stores the callback for the requested event kind.
 *
 *   fd     - descriptor to watch; must be in [0, MAX_EPOLL_EVENT)
 *   cb     - callback stored for this event kind
 *   events - READ_CB, WRITE_CB or ACCEPT_CB
 *   arg    - opaque pointer stored in the item
 *
 * Returns 0 on success, -1 if the reactor is unavailable, fd is out of
 * range, events is not a known kind, or epoll_ctl() fails.
 */
int SetEvent(int fd, NCALLBACK cb, int events, int *arg) {
    struct reactor *r = getInstance();
    if (r == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT) {
        // The item table has MAX_EPOLL_EVENT slots indexed by descriptor.
        return -1;
    }

    struct item *slot = &r->head->items[fd];
    struct epoll_event ev = {0};
    // epoll_event.data is a union: reactor_loop() reads data.fd, so the
    // descriptor is stored there; arg is kept in the item instead.
    ev.data.fd = fd;

    switch (events) {
    case READ_CB:
        slot->readcb = cb;
        ev.events = EPOLLIN;
        break;
    case WRITE_CB:
        slot->writecb = cb;
        ev.events = EPOLLOUT;
        break;
    case ACCEPT_CB:
        slot->acceptcb = cb;
        ev.events = EPOLLIN;
        break;
    default:
        return -1;
    }
    slot->fd = fd;
    slot->arg = arg;

    if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
        // Already registered (e.g. switching between read and write):
        // change the interest set instead of adding the descriptor again.
        if (errno != EEXIST || epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) == -1) {
            return -1;
        }
    }
    return 0;
}
/*
 * Removes fd from the reactor's epoll instance.
 *
 *   fd     - descriptor to remove
 *   cb     - unused
 *   events - unused
 *   arg    - unused
 *
 * Returns the result of epoll_ctl() (0 on success, -1 on failure), or -1
 * if the reactor is unavailable.
 */
int DelEvent(int fd, NCALLBACK cb, int events, int *arg) {
    (void)cb;
    (void)events;
    (void)arg;

    struct reactor *r = getInstance();
    if (r == NULL) {
        return -1;
    }

    // The event argument is ignored for EPOLL_CTL_DEL, but a non-NULL
    // pointer is passed because old kernels required one.
    struct epoll_event ev = {0};
    return epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
}
// Process-wide reactor, created on the first successful getInstance() call.
struct reactor *instance = NULL;

/*
 * Returns the shared reactor, creating and initialising it on first use.
 * Returns NULL if allocation or InitReactor() fails; nothing is cached in
 * that case, so a later call tries again.
 */
struct reactor *getInstance(void) {
    if (instance != NULL) {
        return instance;
    }

    struct reactor *fresh = calloc(1, sizeof *fresh);
    if (fresh == NULL) {
        return NULL;
    }

    if (InitReactor(fresh) < 0) {
        // Do not leak, and do not keep, a half-initialised reactor.
        free(fresh);
        return NULL;
    }

    instance = fresh;
    return instance;
}
void reactor_loop(){
while(true){
int nReady = epoll_wait(epfd, events, POLL_SIZE, 5);
if(nReady == -1) continue;
for(int i = 0 ; i < nReady; i++){
int clientfd = events[i].data.fd;
if(clientfd == listenfd){
R->head->items[listenfd].acceptcb(listenfd, 0, NULL);
}
if(events[i].events & EPOLLIN){
R->head->items[listenfd].readcb(clientfd, 0, NULL);
}
if(events[i].events & EPOLLOUT){
R->head->items[listenfd].writecb(clientfd, 0, NULL);
}
}
}
}
/*
 * Program entry point: creates the listening socket on port 9999,
 * registers it for accept events and runs the reactor loop.
 *
 * Returns 0 when the loop ends, 1 if setup fails.
 */
int main(void)
{
    // NOTE(review): initServer is not defined in this file; a negative
    // return is assumed to mean failure - confirm against its definition.
    int listenfd = initServer(9999);
    if (listenfd < 0) {
        return 1;
    }

    if (SetEvent(listenfd, accept_callback, ACCEPT_CB, NULL) != 0) {
        return 1;
    }

    reactor_loop();
    return 0;
}
页面更新:2024-05-24
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号