您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息
免费发信息
三六零分类信息网 > 黄南分类信息网,免费分类信息发布

Redis源码整体运行流程详解

2024/3/10 10:08:04发布25次查看
redis server端处理client请求的流程图 main函数 main函数主要的功能为:调用initserverconfig函数,进行默认的redisserver数据结构的参数初始化;调用daemonize函数,为服务器开始守护进程,对于守护进行相关详细信息见http://blog.csdn.net/acceptedxukai/
redis server端处理client请求的流程图
main函数main函数主要的功能为:调用initserverconfig函数,进行默认的redisserver数据结构的参数初始化;调用daemonize函数,为服务器开始守护进程,对于守护进行相关详细信息见http://blog.csdn.net/acceptedxukai/article/details/8743189;调用initserver函数,初始化服务器;调用loadserverconfig函数,读取redis的配置文件,使用配置文件中的参数替换默认的参数值;调用aemain函数,开启事件循环,整个服务器开始工作。
initserver函数该函数主要为初始化服务器,需要初始化的内容比较多,主要有:
1、创建事件循环
server.el = aecreateeventloop(server.maxclients+redis_eventloop_fdset_incr);
2、创建tcp与udp server,启动服务器,完成bind与listen/* open the tcp listening socket for the user commands. */ //server.ipfd是个int数组,启动服务器,完成bind,listen if (listentoport(server.port,server.ipfd,&server.ipfd_count) == redis_err) exit(1); /* open the listening unix domain socket. */ if (server.unixsocket != null) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetunixserver(server.neterr,server.unixsocket,server.unixsocketperm); if (server.sofd == anet_err) { redislog(redis_warning, opening socket: %s, server.neterr); exit(1); } }
redis2.8.2 tcp同时支持ipv4与ipv6,同时与之前版本的redis不同,此版本支持多个tcp服务器,listentoport函数主要还是调用anettcpserver函数,完成socket()-->bind()-->listen(),下面详细查看下tcpserver的创建,udp直接忽略吧,我也不知道udp具体用在哪。static int anetlisten(char *err, int s, struct sockaddr *sa, socklen_t len) { //绑定bind if (bind(s,sa,len) == -1) { anetseterror(err, bind: %s, strerror(errno)); close(s); return anet_err; } /* use a backlog of 512 entries. we pass 511 to the listen() call because * the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1); * which will thus give us a backlog of 512 entries */ //监听 if (listen(s, 511) == -1) { anetseterror(err, listen: %s, strerror(errno)); close(s); return anet_err; } return anet_ok;}static int _anettcpserver(char *err, int port, char *bindaddr, int af){ int s, rv; char _port[6]; /* strlen(65535) */ struct addrinfo hints, *servinfo, *p; snprintf(_port,6,%d,port); memset(&hints,0,sizeof(hints)); hints.ai_family = af; hints.ai_socktype = sock_stream; //套接字地址用于监听绑定 hints.ai_flags = ai_passive; /* no effect if bindaddr != null */ //可以加上hints.ai_protocol = ipproto_tcp; /**getaddrinfo(const char *hostname, const char *servicename, const struct addrinfo *hint,struct addrinfo **res); hostname:主机名 servicename: 服务名 hint: 用于过滤的模板,仅能使用ai_family, ai_flags, ai_protocol, ai_socktype,其余字段为0 res:得到所有可用的地址 */ if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) { anetseterror(err, %s, gai_strerror(rv)); return anet_err; } //轮流尝试多个地址,找到一个允许连接到服务器的地址时便停止 for (p = servinfo; p != null; p = p->ai_next) { if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1) continue; if (af == af_inet6 && anetv6only(err,s) == anet_err) goto error; //设置套接字选项setsockopt,采用地址复用 if (anetsetreuseaddr(err,s) == anet_err) goto error; //bind, listen if (anetlisten(err,s,p->ai_addr,p->ai_addrlen) == anet_err) goto error; goto end; } if (p == null) { anetseterror(err, unable to bind socket); goto error; }error: s = anet_err;end: freeaddrinfo(servinfo); return s;}//if server.ipfd_count = 0, bindaddr = nullint anettcpserver(char *err, int port, char *bindaddr){ return _anettcpserver(err, port, bindaddr, af_inet);}
3、将listen的端口加入到事件监听中,进行监听,由aecreatefileevent函数完成,其注册的listen端口可读事件处理函数为accepttcphandler,这样在listen端口有新连接的时候会调用accepttcphandler,后者在accept这个新连接,然后就可以处理后续跟这个客户端连接相关的事件了。/* create an event handler for accepting new connections in tcp and unix * domain sockets. */ //文件事件,用于处理响应外界的操作请求,事件处理函数为accepttcphandler/acceptunixhandler //在networking.c for (j = 0; j 0 && aecreatefileevent(server.el,server.sofd,ae_readable, acceptunixhandler,null) == ae_err) redispanic(unrecoverable error creating server.sofd file event.);
accepttcphandler函数上面介绍了,initserver完成listen端口后,会加入到事件循环中,该事件为可读事件,并记录处理函数为fe->rfileproc = accepttcphandler;该函数分两步操作:用accepttcphandler接受这个客户端连接;然第二部初始化这个客户端连接的相关数据,将clientfd加入事件里面,设置的可读事件处理函数为readqueryfromclient,也就是读取客户端请求的函数。
void accepttcphandler(aeeventloop *el, int fd, void *privdata, int mask) { int cport, cfd; char cip[redis_ip_str_len]; redis_notused(el);//无意义 redis_notused(mask); redis_notused(privdata); //cfd为accept函数返回的客户端文件描述符,accept使服务器完成一个客户端的链接 cfd = anettcpaccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ae_err) { redislog(redis_warning,accepting client connection: %s, server.neterr); return; } redislog(redis_verbose,accepted %s:%d, cip, cport); //将cfd加入事件循环并设置回调函数为readqueryfromclient,并初始化redisclient acceptcommonhandler(cfd,0);}
第一步很简单即完成accept,主要关注第二步acceptcommonhandler函数
static void acceptcommonhandler(int fd, int flags) { redisclient *c; if ((c = createclient(fd)) == null) {//创建新的客户端 redislog(redis_warning, error registering fd event for the new client: %s (fd=%d), strerror(errno),fd); close(fd); /* may be already closed, just ignore errors */ return; } /* if maxclient directive is set and this is one client more... close the * connection. note that we create the client instead to check before * for this condition, since now the socket is already set in non-blocking * mode and we can send an error for free using the kernel i/o */ //当前连接的客户端数目大于服务器最大运行的连接数,则拒绝连接 if (listlength(server.clients) > server.maxclients) { char *err = -err max number of clients reached\r\n; /* that's a best effort error message, don't check write errors */ if (write(c->fd,err,strlen(err)) == -1) { /* nothing to do, just to avoid the warning... */ } server.stat_rejected_conn++; freeclient(c); return; } server.stat_numconnections++; c->flags |= flags;}
createclient函数 此函数用来为新连接的客户端初始化一个redisclient数据结构,该数据结构有比较多的参数,详见redis.h。该函数完成两个操作,第一、为客户端创建事件处理函数readqueryfromclient专门接收客户端发来的指令,第二、初始化redisclient数据结构相关参数。redisclient *createclient(int fd) { redisclient *c = zmalloc(sizeof(redisclient)); /* passing -1 as fd it is possible to create a non connected client. * this is useful since all the redis commands needs to be executed * in the context of a client. when commands are executed in other * contexts (for instance a lua script) we need a non connected client. */ /** 因为 redis 命令总在客户端的上下文中执行, 有时候为了在服务器内部执行命令,需要使用伪客户端来执行命令 在 fd == -1 时,创建的客户端为伪终端 */ if (fd != -1) { //下面三个都是设置socket属性 anetnonblock(null,fd);//非阻塞 anetenabletcpnodelay(null,fd);//no delay if (server.tcpkeepalive) anetkeepalive(null,fd,server.tcpkeepalive);//keep alive //创建一个accept fd的fileevent事件,事件的处理函数是readqueryfromclient if (aecreatefileevent(server.el,fd,ae_readable, readqueryfromclient, c) == ae_err) { close(fd); zfree(c); return null; } } selectdb(c,0);//默认选择第0个db, db.c c->fd = fd;//文件描述符 c->name = null; c->bufpos = 0;//将指令结果发送给客户端的字符串长度 c->querybuf = sdsempty();//请求字符串初始化 c->querybuf_peak = 0;//请求字符串顶峰时的长度值 c->reqtype = 0;//请求类型 c->argc = 0;//参数个数 c->argv = null;//参数内容 c->cmd = c->lastcmd = null;//操作指令 c->multibulklen = 0;//块个数 c->bulklen = -1;//每个块的长度 c->sentlen = 0; c->flags = 0;//客户类型的标记,比较重要 c->ctime = c->lastinteraction = server.unixtime; c->authenticated = 0; c->replstate = redis_repl_none; c->reploff = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; c->slave_listening_port = 0; c->reply = listcreate();//存放服务器应答的数据 c->reply_bytes = 0; c->obuf_soft_limit_reached_time = 0; listsetfreemethod(c->reply,decrrefcountvoid); listsetdupmethod(c->reply,dupclientreplyvalue); c->bpop.keys = dictcreate(&setdicttype,null);//下面三个参数在list数据阻塞操作时使用 c->bpop.timeout = 0; c->bpop.target = null; c->io_keys = listcreate(); c->watched_keys = listcreate();//事务命令cas中使用 listsetfreemethod(c->io_keys,decrrefcountvoid); c->pubsub_channels = dictcreate(&setdicttype,null); c->pubsub_patterns = listcreate(); listsetfreemethod(c->pubsub_patterns,decrrefcountvoid); listsetmatchmethod(c->pubsub_patterns,listmatchobjects); // 如果不是伪客户端,那么将客户端加入到服务器客户端列表中 if (fd != -1) listaddnodetail(server.clients,c);//添加到server的clients链表 initclientmultistate(c);//初始化事务指令状态 return c;}
客户端的请求指令字符串始终存放在querybuf中,在对querybuf解析后,将指令参数的个数存放在argc中,具体的指令参数存放在argv中;但是服务器应答的结果有两种存储方式:buf字符串、reply列表。readqueryfromclient函数readqueryfromclient函数用来读取客户端的请求命令行数据,并调用processinputbuffer函数依照redis通讯协议对数据进行解析。服务器使用最原始的read函数来读取客户端发送来的请求命令,并将字符串存储在querybuf中,根据需要对querybuf进行扩展。
void readqueryfromclient(aeeventloop *el, int fd, void *privdata, int mask) { redisclient *c = (redisclient*) privdata; int nread, readlen; size_t qblen; redis_notused(el); redis_notused(mask); server.current_client = c; readlen = redis_iobuf_len; //1024 * 16 /* if this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the sds string representing the object, even * at the risk of requiring more read(2) calls. this way the function * processmultibulkbuffer() can avoid copying buffers to create the * redis object representing the argument. */ if (c->reqtype == redis_req_multibulk && c->multibulklen && c->bulklen != -1 && c->bulklen >= redis_mbulk_big_arg) { int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf); if (remaining querybuf); if (c->querybuf_peak querybuf_peak = qblen; //对querybuf的空间进行扩展 c->querybuf = sdsmakeroomfor(c->querybuf, readlen); //读取客户端发来的操作指令 nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { if (errno == eagain) { nread = 0; } else { redislog(redis_verbose, reading from client: %s,strerror(errno)); freeclient(c); return; } } else if (nread == 0) { redislog(redis_verbose, client closed connection); freeclient(c); return; } if (nread) { //改变querybuf的实际长度和空闲长度,len += nread, free -= nread; sdsincrlen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & redis_master) c->reploff += nread; } else { server.current_client = null; return; } //客户端请求的字符串长度大于服务器最大的请求长度值 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = getclientinfostring(c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); redislog(redis_warning,closing client that reached max query buffer length: %s (qbuf initial bytes: %s), ci, bytes); sdsfree(ci); sdsfree(bytes); freeclient(c); return; } //解析请求 processinputbuffer(c); server.current_client = null;}
processinputbuffer函数主要用来处理请求的解析工作,redis有两种解析方式;行指令解析与多重指令解析,行指令解析直接忽略,下面详解多重指令解析。void processinputbuffer(redisclient *c) { /* keep processing while there is something in the input buffer */ while(sdslen(c->querybuf)) { /* immediately abort if the client is in the middle of something. */ if (c->flags & redis_blocked) return; /* redis_close_after_reply closes the connection once the reply is * written to the client. make sure to not let the reply grow after * this flag has been set (i.e. don't process more commands). */ if (c->flags & redis_close_after_reply) return; /* determine request type when unknown. */ //当请求类型未知时,先确定属于哪种请求 if (!c->reqtype) { if (c->querybuf[0] == '*') { c->reqtype = redis_req_multibulk;//多重指令解析 } else { c->reqtype = redis_req_inline;//按行解析 } } if (c->reqtype == redis_req_inline) { if (processinlinebuffer(c) != redis_ok) break; } else if (c->reqtype == redis_req_multibulk) { if (processmultibulkbuffer(c) != redis_ok) break; } else { redispanic(unknown request type); } /* multibulk processing could see a argc == 0) { resetclient(c); } else { /* only reset the client when the command was executed. */ //执行相应指令 if (processcommand(c) == redis_ok) resetclient(c); } }}
多重指令解析的处理函数为processmultibulkbuffer,下面先简单介绍下redis的通讯协议:以下是这个协议的一般形式:* cr lf$ cr lf cr lf...$ cr lf cr lf举个例子,以下是一个命令协议的打印版本:*3$3set$3foo$3bar这个命令的实际协议值如下:*3\r\n$3\r\nset\r\n$3\r\foo\r\n$3\r\bar\r\n
/** 例:querybuf = *3\r\n$3\r\nset\r\n$3\r\nfoo\r\n$3\r\nbar\r\n*/int processmultibulkbuffer(redisclient *c) { char *newline = null; int pos = 0, ok; long long ll; if (c->multibulklen == 0) {//参数数目为0,表示这是新的请求指令 /* the client should have been reset */ redisassertwithinfo(c,null,c->argc == 0); /* multi bulk length cannot be read without a \r\n */ newline = strchr(c->querybuf,'\r'); if (newline == null) { if (sdslen(c->querybuf) > redis_inline_max_size) { addreplyerror(c,protocol error: too big mbulk count string); setprotocolerror(c,0); } return redis_err; } /* buffer should also contain \n */ if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) return redis_err; /* we know for sure there is a whole line since newline != null, * so go ahead and find out the multi bulk length. */ redisassertwithinfo(c,null,c->querybuf[0] == '*'); //将字符串转为long long整数,转换得到的结果存到ll中,ll就是后面参数的个数 ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); if (!ok || ll > 1024*1024) { addreplyerror(c,protocol error: invalid multibulk length); setprotocolerror(c,pos); return redis_err; } pos = (newline-c->querybuf)+2;//跳过\r\n if (ll ello world */ sdsrange(c->querybuf,pos,-1);//querybuf=$3\r\nset\r\n$3\r\nfoo\r\n$3\r\nbar\r\n return redis_ok; } c->multibulklen = ll;//得到指令参数个数 /* setup argv array on client structure */ if (c->argv) zfree(c->argv); c->argv = zmalloc(sizeof(robj*) * c->multibulklen);//申请参数内存空间 } redisassertwithinfo(c,null,c->multibulklen > 0); /** 开始抽取字符串 querybuf = *3\r\n$3\r\nset\r\n$3\r\nfoo\r\n$3\r\nbar\r\n pos = 4 */ while(c->multibulklen) { /* read bulk length if unknown */ if (c->bulklen == -1) {//参数的长度为-1,这里用来处理每个参数的字符串长度值 /**newline = \r\nset\r\n$3\r\nfoo\r\n$3\r\nbar\r\n*/ newline = strchr(c->querybuf+pos,'\r'); if (newline == null) { if (sdslen(c->querybuf) > redis_inline_max_size) { addreplyerror(c,protocol error: too big bulk count string); setprotocolerror(c,0); } break; } /* buffer should also contain \n */ if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) break; //每个字符串以$开头,后面的数字表示其长度 if (c->querybuf[pos] != '$') { addreplyerrorformat(c, protocol error: expected '$', got '%c', c->querybuf[pos]); setprotocolerror(c,pos); return redis_err; } //得到字符串的长度值,ll ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); if (!ok || ll 512*1024*1024) { addreplyerror(c,protocol error: invalid bulk length); setprotocolerror(c,pos); return redis_err; } //pos = 8 pos += newline-(c->querybuf+pos)+2;//跳过\r\n set\r\n$3\r\nfoo\r\n$3\r\nbar\r if (ll >= redis_mbulk_big_arg) {//字符串长度超过1024*32,需要扩展 size_t qblen; /* if we are going to read a large object from network * try to make it likely that it will start at c->querybuf * boundary so that we can optimize object creation * avoiding a large copy of data. */ /** sdsrange(querybuf,pos,-1)是将[pos,len-1]之间的字符串使用memmove前移, 然后后面的直接截断 */ sdsrange(c->querybuf,pos,-1);//set\r\n$3\r\nfoo\r\n$3\r\nbar\r pos = 0; qblen = sdslen(c->querybuf); /* hint the sds library about the amount of bytes this string is * going to contain. */ if (qblen querybuf = sdsmakeroomfor(c->querybuf,ll+2-qblen); } c->bulklen = ll; } /* read bulk argument */ //读取参数,没有\r\n表示数据不全,也就是说服务器接收到的数据不完整 if (sdslen(c->querybuf)-pos bulklen+2)) { /* not enough data (+2 == trailing \r\n) */ break; } else {//数据完整 /* optimization: if the buffer contains just our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ if (pos == 0 && c->bulklen >= redis_mbulk_big_arg && (signed) sdslen(c->querybuf) == c->bulklen+2) {//数据刚好完整,那么就直接使用c->querybuf,然后清空querybuf,注意这里只可能在最后一个字符串才可能出现 c->argv[c->argc++] = createobject(redis_string,c->querybuf); sdsincrlen(c->querybuf,-2); /* remove crlf */ c->querybuf = sdsempty(); /* assume that if we saw a fat argument we'll see another one * likely... */ c->querybuf = sdsmakeroomfor(c->querybuf,c->bulklen+2); pos = 0; } else { //抽取出具体的字符串,比如set,建立一个stringobject c->argv[c->argc++] = createstringobject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2;//跳过\r\n } c->bulklen = -1; c->multibulklen--; } } /** 由于采用的是非阻塞读取客户端数据的方式,那么如果c->multibulklen != 0,那么就表示 数据没有接收完全,首先需要将当前的querybuf数据截断 */ /* trim to pos */ if (pos) sdsrange(c->querybuf,pos,-1); /* we're done when c->multibulk == 0 */ if (c->multibulklen == 0) return redis_ok; /* still not read to process the command */ return redis_err;}
processcommand与call函数客户端指令解析完之后,需要执行该指令,执行指令的两个函数为processcommand与call函数,这两个函数除了单纯的执行指令外,还做了许多其他的工作,这里不详解,看代码仅仅找到指令如何执行还是很简单的。
指令执行完之后,需要将得到的结果集返回给客户端,这部分是如何工作的,下面开始分析。
在networking.c中可以发现许多以addrelpy为前缀的函数名,这些函数都是用来处理各种不同类型的结果的,我们以典型的addreply函数为例,进行分析。
addreply函数
该函数第一步工作就是调用prepareclienttowrite函数为客户端创建一个写文件事件,事件的处理函数即将结果集发送给客户端的函数为sendreplytoclient.
int prepareclienttowrite(redisclient *c) { if (c->flags & redis_lua_client) return redis_ok; if ((c->flags & redis_master) && !(c->flags & redis_master_force_reply)) return redis_err; if (c->fd bufpos == 0 && listlength(c->reply) == 0 && (c->replstate == redis_repl_none || c->replstate == redis_repl_online) && aecreatefileevent(server.el, c->fd, ae_writable, sendreplytoclient, c) == ae_err) return redis_err; return redis_ok;}
第二步,就是根据相应的条件,将得到的结果rboj数据存储到buf中或者reply链表中。对于存储的策略:redis优先将数据存储在固定大小的buf中,也就是redisclient结构体buf[redis_reply_chunk_bytes]里,默认大小为16k。如果有数据没有发送完或c->buf空间不足,就会放到c->reply链表里面,链表每个节点都是内存buf,后来的数据放入最后面。具体的处理函数为_addreplytobuffer和_addreplystringtolist两个函数。void addreply(redisclient *c, robj *obj) { if (prepareclienttowrite(c) != redis_ok) return; /* this is an important place where we can avoid copy-on-write * when there is a saving child running, avoiding touching the * refcount field of the object if it's not needed. * * if the encoding is raw and there is room in the static buffer * we'll be able to send the object to the client without * messing with its page. */ if (obj->encoding == redis_encoding_raw) {//字符串类型 //是否能将数据追加到c->buf中 if (_addreplytobuffer(c,obj->ptr,sdslen(obj->ptr)) != redis_ok) _addreplyobjecttolist(c,obj);//添加到c->reply链表中 } else if (obj->encoding == redis_encoding_int) {//整数类型 /* optimization: if there is room in the static buffer for 32 bytes * (more than the max chars a 64 bit integer can take as string) we * avoid decoding the object and go for the lower level approach. */ //追加到c->buf中 if (listlength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) { char buf[32]; int len; len = ll2string(buf,sizeof(buf),(long)obj->ptr);//整型转string if (_addreplytobuffer(c,buf,len) == redis_ok) return; /* else... continue with the normal code path, but should never * happen actually since we verified there is room. */ } obj = getdecodedobject(obj);//64位整数,先转换为字符串 if (_addreplytobuffer(c,obj->ptr,sdslen(obj->ptr)) != redis_ok) _addreplyobjecttolist(c,obj); decrrefcount(obj); } else { redispanic(wrong obj->encoding in addreply()); }}
/** server将数据发送给client,有两种存储数据的缓冲形式,具体参见redisclient结构体 1、response buffer int bufpos; //回复 char buf[redis_reply_chunk_bytes]; //长度为16 * 1024 2、list *reply; unsigned long reply_bytes; tot bytes of objects in reply list int sentlen; 已发送的字节数 如果已经使用reply的形式或者buf已经不够存储,那么就将数据添加到list *reply中 否则将数据添加到buf中*/int _addreplytobuffer(redisclient *c, char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos;//计算出c->buf的剩余长度 if (c->flags & redis_close_after_reply) return redis_ok; /* if there already are entries in the reply list, we cannot * add anything more to the static buffer. */ if (listlength(c->reply) > 0) return redis_err; /* check that the buffer has enough space available for this string. */ if (len > available) return redis_err; //回复数据追加到buf中 memcpy(c->buf+c->bufpos,s,len); c->bufpos+=len; return redis_ok;}/** 1、如果链表长度为0: 新建一个节点并直接将robj追加到链表的尾部 2、链表长度不为0: 首先取出链表的尾部节点 1)、尾部节点的字符串长度 + robj中ptr字符串的长度 ptr追加到尾节点的tail->ptr后面 2)、反之: 新建一个节点并直接将robj追加到链表的尾部*/void _addreplyobjecttolist(redisclient *c, robj *o) { robj *tail; if (c->flags & redis_close_after_reply) return; //链表长度为0 if (listlength(c->reply) == 0) { incrrefcount(o);//增加引用次数 listaddnodetail(c->reply,o);//添加到链表末尾 c->reply_bytes += zmalloc_size_sds(o->ptr); //计算o->ptr的占用内存大小 } else { //取出链表尾中的数据 tail = listnodevalue(listlast(c->reply)); /* append to this object when possible. */ // 如果最后一个节点所保存的回复加上新回复内容总长度小于等于 redis_reply_chunk_bytes // 那么将新回复追加到节点回复当中。 if (tail->ptr != null && sdslen(tail->ptr)+sdslen(o->ptr) reply_bytes -= zmalloc_size_sds(tail->ptr); tail = duplastobjectifneeded(c->reply); tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); c->reply_bytes += zmalloc_size_sds(tail->ptr); } else {//为新回复单独创建一个节点 incrrefcount(o); listaddnodetail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); } } // 如果突破了客户端的最大缓存限制,那么关闭客户端 asynccloseclientonoutputbufferlimitreached(c);}
sendreplytoclient函数终于到了最后一步,把c->buf与c->reply中的数据发送给客户端即可,发送同样使用的是最原始的write函数。发送完成之后,redis将当前客户端释放,并且删除写事件,代码比较简单,不详细解释。
小结本文粗略的介绍了redis整体运行的流程,从服务器的角度,介绍redis是如何初始化,创建socket,接收客户端请求,解析请求及指令的执行,反馈执行的结果集给客户端等。如果读者想更深入的了解redis的运行机制,需要亲自阅读源码,本文可以用作参考。同时也是学习linux socket编程的好工具,原本简简单单的socket->bind->listen->accept->read->write也可以用来做许多高效的业务,是linux socket学习的不二选择。
黄南分类信息网,免费分类信息发布

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录