reactor的实现

备注:以下代码只是架构,具体编译不能通过,仍需修改

#define BUFFER_LEN 1024
#define MAX_EPOLL_EVENT 1024

typedef int (*NCALLBACK)(int fd, int event, void * arg)

struct item{
	int fd;
  int events;
  void * arg;
  
#if 1
  NCALLBACK callback;
#else
   NCALLBACK readcb; // EPOLLIN
  NCALLBACK writecb; // EPOLLOUT
  NCALLBACK acceptcb; //EPOLLIN
 #endif
 
	unsigned char  rbuffer[BUFFER_LEN]; 
  int rlen; // recv的长度
  unsigned char  wbuffer[BUFFER_LEN];
  int wlen;//send的长度
};

// 如果超过MAX_EPOLL_EVENT,可以使用itemblock
struct itemblock{
	struct itemblock *next;
  struct item * items;
};

struct reactor{
	int epfd;
 
  struct itemblock *head; // 所有io集合
  // struct itemblock **last; 
}

 int read_callback(int fd, int event, void* arg){
 struct reactor * r = getInstance();
   unsigned char * buffer = R->head->items[fd].rbuffer;
   
   int index = 0;
   int ret = 0;
   while(index < BUFFER_LEN){
   	 ret = recv(fd, buffer+index, BUFFER_LEN-index, 0);
     
     if(ret == -1) break;
     else if(ret > 0){
     	index += ret;

     }
     else {
     	break;
     }
   }
   
   if(index == BUFFER_LEN && ret != -1)
     // 需要继续读
          SetEvent(fd, read_callback, READ_CB, NULL);
   else if(ret == 0){
     // 从epoll中清除fd
     DelEvent(fd, NULL, 0, NULL);
        	close(fd);
   }
   else {
     // 读完了,调用写
   	SetEvent(fd, write_callback, WRITE_CB, NULL);
   }
 }

 int write_callback(int fd, int event, void* arg){
 	struct reactor * R = getInstance();
   unsigned char * buffer = R->head->items[fd].wbuffer;
   int len = R->head->items[fd].wlen;
   
   int ret =   send(fd, buffer, len, 0);
   if(ret < len){
     // 如果没有全部发送,继续发送
   	SetEvent(fd, write_callback, WRITE_CB, NULL);
   }
   else {
     // 发送成功
   		SetEvent(fd, read_callback, READ_CB, NULL);
   }

   return 0;
 }


 int accept_callback(int fd, int event, void* arg){
 
   int connfd;
   struct sockaddr_in client;
   socklen_t len = sizeof(client);
   if((connfd = accept(fd, (struct sockaddr *)&client, &len))  == -1){
        printf("accept socket error :%s(errno:%d)
", strerror(errno()));
   
   return -1;
   }

   SetEvent(connfd, read_callback, READ_CB, NULL);
 }


int InitReactor(struct reactor * r){
	if(!r) return -1;
  
  int epfd = epoll_create(1); 
  r->epfd = epfd;
  
  
  // fd->item
  r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
  
  if(r->head == NULL){
  	close(epfd);
    return -2;
  }
  
  // 运行久了可能产生脏数据
  memset(r->head, 0, sizeof(struct itemblock));
  
  r->head->items = malloc(MAX_EPOLL_EVENT * sizeof(struct item));
  if(r->head->items == NULL){
   free(r->head);
    close(epfd);
    return -2;
  }
  
  memset(r->head->items, 0, MAX_EPOLL_EVENT * sizeof(struct item));
  
  r->head->next = NULL;
  
  return 0;
}

#define READ_CB 0
#define WRITE_CB 1
#define ACCEPT_CB 2
int SetEvent(int fd, NCALLBACK cb, int events, int * arg){
  struct reactor * r = getInstance();
  
  struct epoll_event ev = {0};
  ev.data.ptr = arg;
  
		switch(event)
    {
      case READ_CB:
        r->head->items[fd].fd = fd;
        r->head->items[fd].readcb = cb;
        r->head->items[fd].arg = arg;
        
  			ev.events = EPOLLIN;
       
        break;
      case WRITE_CB:
         r->head->items[fd].fd = fd;
        r->head->items[fd].writecb = cb;
        r->head->items[fd].arg = arg;
        
        ev.events = EPOLLOUT;
        break;
      case ACCEPT_CB:
         r->head->items[fd].fd = fd;
        r->head->items[fd].acceptcb = cb;
        r->head->items[fd].arg = arg;
        
        ev.events = EPOLLIN;
        break;
    }
  
    epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev);
  
  return 0;
}

int DelEvent(int fd, NCALLBACK cb, int events, int * arg){
 struct reactor * r = getInstance();
  
  struct epoll_event ev = {0};
  ev.data.ptr = arg;
  
  epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
}

struct reactor * instance = NULL;
struct reactor * getInstance(void){
                          if(instance  ==  NULL){
	instance = malloc(sizeof(struct reactor));
  if(instance == NULL) return NULL;
  
  memset(instance, 0, sizeof(struct reactor));
  
  if(0>InitReactor(instance)){
    return NULL;
  }
}
                          }
                          
 void reactor_loop(){
   while(true){
  	int nReady = epoll_wait(epfd, events, POLL_SIZE, 5);
    if(nReady == -1) continue;
    
    for(int i = 0 ; i < nReady; i++){
    int clientfd = events[i].data.fd;
      if(clientfd == listenfd){
      R->head->items[listenfd].acceptcb(listenfd, 0, NULL);
      }
      
      if(events[i].events & EPOLLIN){
      	R->head->items[listenfd].readcb(clientfd, 0, NULL);
      }
      
            if(events[i].events & EPOLLOUT){
      	R->head->items[listenfd].writecb(clientfd, 0, NULL);
      }
    }
  }
 }


void main()
{
int listenfd = initServer(9999);
  SetEvent(listenfd, accept_callback, ACCEPT_CB, NULL);
  
reactor_loop();
  
}
展开阅读全文

页面更新:2024-05-24

标签:备注   架构   长度   代码   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top