首页

源码搜藏网

首页 > 开发教程 > 软件工程 >

memcached源码阅读----使用libevent和多线程模型

创建时间:2016-06-12 17:36  

一、libevent的使用

    首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基于linux下epoll事件的异步模型。因此,其基本的思想就是 对可读,可写,超时,出错等事件进行绑定函数,等有其事件发生,对其绑定函数回调。

    

可以减掉了解一下 libevent基本api调用

[cpp] view plain copy  memcached源码阅读----使用libevent和多线程模型memcached源码阅读----使用libevent和多线程模型
  1. struct event_base *base;  
  2. base = event_base_new();//初始化libevent  

event_base_new对比epoll,可以理解为epoll里的epoll_create。

event_base内部有一个循环,循环阻塞在epoll调用上,当有一个事件发生的时候,才会去处理这个事件。其中,这个事件是被绑定在event_base上面的,每一个事件就会对应一个struct event,可以是监听的fd。 

其中struct event 使用event_new 来创建和绑定,使用event_add来启用,例如:

[cpp] view plain copy  memcached源码阅读----使用libevent和多线程模型memcached源码阅读----使用libevent和多线程模型
  1. struct event *listener_event;  
  2. listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);  


参数说明:

base:event_base类型,event_base_new的返回值

listener:监听的fd,listen的fd

EV_READ|EV_PERSIST事件的类型及属性

do_accept:绑定的回调函数

(void*)base:给回调函数的参数

event_add(listener_event, NULL);

对比epoll:

event_new相当于epoll中的epoll_wait,其中的epoll里的while循环,在libevent里使用event_base_dispatch。

event_add相当于epoll中的epoll_ctl,参数是EPOLL_CTL_ADD,添加事件。

注:libevent支持的事件及属性包括(使用bitfield实现,所以要用 | 来让它们合体)
EV_TIMEOUT: 超时
EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发
EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发
EV_SIGNAL: POSIX信号量
EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除
EV_ET: Edge-Trigger边缘触发,相当于EPOLL的ET模式

事件创建添加之后,就可以处理发生的事件了,相当于epoll里的epoll_wait,在libevent里使用event_base_dispatch启动event_base循环,直到不再有需要关注的事件。


有了上面的分析,结合之前做的epoll服务端程序,对于一个服务器程序,流程基本是这样的:

1. 创建socketbindlisten,设置为非阻塞模式

2. 创建一个event_base,即

[cpp] view plaincopy
  1. struct event_base *  event_base_new(void)  

3. 创建一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)

[cpp] view plaincopy
  1. struct event *  event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, shortvoid *), void *arg)  

4. 启用该事件,即

[cpp] view plaincopy
  1. int  event_add(struct event *ev, const struct timeval *tv)  

5.  进入事件循环,即

[cpp] view plaincopy
  1. int  event_base_dispatch(struct event_base *event_base)  


有了上边的基础东西,可以进入memcached的阅读了。
二、memcached源码分析
main函数启动,首先会初始化很多数据,这里我们只涉及大网络这块,其他以后分析,先忽略。 1.首先初始化 主工作线程的的iblievet对象
[cpp] view plain copy  memcached源码阅读----使用libevent和多线程模型memcached源码阅读----使用libevent和多线程模型
  1. /* initialize main thread libevent instance */  
  2.   main_base = event_init();  

最后会调用
[cpp] view plain copy  memcached源码阅读----使用libevent和多线程模型memcached源码阅读----使用libevent和多线程模型
  1. /* enter the event loop */  
  2.  if (event_base_loop(main_base, 0) != 0) {  
  3.      retval = EXIT_FAILURE;  
  4.  }  

在该对象内部循环。不退出。
2.初始化连接的对象
[cpp] view plain copy  memcached源码阅读----使用libevent和多线程模型memcached源码阅读----使用libevent和多线程模型
  1. static void conn_init(void) {  
  2.     freetotal = 200;  
  3.     freecurr = 0;  
  4.     if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {  
  5.         fprintf(stderr, "Failed to allocate connection structures\n");  
  6.     }  
  7.     return;  
  8. }  


这里是先预先分配200个conn*的内存。等有连接上来,会从freeconns  取。 如下代码:
[cpp] view plain copy  memcached源码阅读----使用libevent和多线程模型memcached源码阅读----使用libevent和多线程模型
  1. /* 
  2.  * Returns a connection from the freelist, if any. 
  3.  */  
  4. conn *conn_from_freelist() {  
  5.     conn *c;  
  6.   
  7.     pthread_mutex_lock(&conn_lock);  
  8.     if (freecurr > 0) {  
  9.         c = freeconns[--freecurr];  
  10.     } else {  
  11.         c = NULL;  
  12.     }  
  13.     pthread_mutex_unlock(&conn_lock);  
  14.   
  15.     return c;  
  16. }  


3.那么conn的结构体内部长什么样子呢?
[cpp] view plain copy  memcached源码阅读----使用libevent和多线程模型memcached源码阅读----使用libevent和多线程模型
  1. typedef struct conn conn;  
  2. struct conn {  
  3.     int    sfd;  
  4.     sasl_conn_t *sasl_conn;  
  5.     enum conn_states  state;  
  6.     enum bin_substates substate;  
  7.     struct event event;  
  8.     short  ev_flags;  
  9.     short  which;   /** which events were just triggered */  
  10.   
  11.     char   *rbuf;   /** buffer to read commands into */  
  12.     char   *rcurr;  /** but if we parsed some already, this is where we stopped */  
  13.     int    rsize;   /** total allocated size of rbuf */  
  14.     int    rbytes;  /** how much data, starting from rcur, do we have unparsed */  
  15.   
  16.     char   *wbuf;  
  17.     char   *wcurr;  
  18.     int    wsize;  
  19.     int    wbytes;  
  20.     /** which state to go into after finishing current write */  
  21.     enum conn_states  write_and_go;  
  22.     void   *write_and_free; /** free this memory after finishing writing */  
  23.   
  24.     char   *ritem;  /** when we read in an item's value, it goes here */  
  25.     int    rlbytes;  
  26.   
  27.     /* data for the nread state */  
  28.   
  29.     /** 
  30.      * item is used to hold an item structure created after reading the command 
  31.      * line of set/add/replace commands, but before we finished reading the actual 
  32.      * data. The data is read into ITEM_data(item) to avoid extra copying. 
  33.      */  
  34.   
  35.     void   *item;     /* for commands set/add/replace  */  
  36.   
  37.     /* data for the swallow state */  
  38.     int    sbytes;    /* how many bytes to swallow */  
  39.   
  40.     /* data for the mwrite state */  
  41.     struct iovec *iov;  
  42.     int    iovsize;   /* number of elements allocated in iov[] */  
  43.     int    iovused;   /* number of elements used in iov[] */  
  44.   
  45.     struct msghdr *msglist;  
  46.     int    msgsize;   /* number of elements allocated in msglist[] */  
  47.     int    msgused;   /* number of elements used in msglist[] */  
  48.     int    msgcurr;   /* element in msglist[] being transmitted now */  
  49.     int    msgbytes;  /* number of bytes in current msg */  
  50.   
  51.     item   **ilist;   /* list of items to write out */  
  52.     int    isize;  
  53.     item   **icurr;  
  54.     int    ileft;  
  55.   
  56.     char   **suffixlist;  
  57.     int    suffixsize;  
  58.     char   **suffixcurr;  
  59.     int    suffixleft;  
  60.   
  61.     enum protocol protocol;   /* which protocol this con<pre name="code" class="cpp">  if (sigignore(SIGPIPE) == -1) {  
  62.         perror("failed to ignore SIGPIPE; sigaction");  
  63.         exit(EX_OSERR);  
  64.     }  

nection speaks */ enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ struct sockaddr request_addr; /* Who sent the most recent request */ socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ struct { char *buffer; size_t size; size_t offset; } stats; /* Binary protocol stuff */ /* This is where the binary header goes */ protocol_binary_request_header binary_header; uint64_t cas; /* the cas to return */ short cmd; /* current command being processed */ int opaque; int keylen; conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection *
上一篇:linux的进程与线程
下一篇:【常用算法思路分析系列】与二分搜索相关高频题

相关内容

热门推荐