/* DEL command entry point. Forwards to delGenericCommand(), passing the
 * server.lazyfree_lazy_user_del setting as the "lazy" flag. */
void delCommand(client *c) {
    delGenericCommand(c,server.lazyfree_lazy_user_del);
}

/* This command implements DEL and LAZYDEL.
 *
 * Every argument after the command name (argv[1] .. argv[argc-1]) is
 * treated as a key to delete from the client's selected DB.
 *
 * c    - the client issuing the command; the number of keys actually
 *        removed is sent back to it as an integer reply.
 * lazy - when non-zero keys are removed with dbAsyncDelete(), otherwise
 *        with dbSyncDelete(). */
void delGenericCommand(client *c, int lazy) {
    int numdel = 0, j;

    for (j = 1; j < c->argc; j++) {
        /* Let the expire logic look at the key before it is deleted. */
        expireIfNeeded(c->db,c->argv[j]);
        /* Whether DEL runs in lazy mode is decided by the caller, which
         * derives the flag from the configuration. */
        int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                             dbSyncDelete(c->db,c->argv[j]);
        if (deleted) {
            signalModifiedKey(c,c->db,c->argv[j]);
            notifyKeyspaceEvent(NOTIFY_GENERIC,
                "del",c->argv[j],c->db->id);
            /* Count the change once per key that was really removed. */
            server.dirty++;
            numdel++;
        }
    }
    addReplyLongLong(c,numdel);
}
/* Delete a key, value, and associated expiration entry if any, from the DB. * If there are enough allocations to free the value object may be put into * a lazy free list instead of being freed synchronously. The lazy free list * will be reclaimed in a different bio.c thread. */ #define LAZYFREE_THRESHOLD 64 int dbAsyncDelete(redisDb *db, robj *key) { /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ dictEntry *de = dictUnlink(db->dict,key->ptr); if (de) { robj *val = dictGetVal(de); // 计算value的回收收益 size_t free_effort = lazyfreeGetFreeEffort(val); /* If releasing the object is too much work, do it in the background * by adding the object to the lazy free list. * Note that if the object is shared, to reclaim it now it is not * possible. This rarely happens, however sometimes the implementation * of parts of the Redis core may call incrRefCount() to protect * objects, and then call dbDelete(). In this case we'll fall * through and reach the dictFreeUnlinkedEntry() call, that will be * equivalent to just calling decrRefCount(). */ // 只有回收收益超过一定值,才会执行异步删除,否则还是会退化到同步删除 if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); dictSetVal(db->dict,de,NULL); } } /* Release the key-val pair, or just the key if we set the val * field to NULL in order to lazy free it later. */ if (de) { dictFreeUnlinkedEntry(db->dict,de); if (server.cluster_enabled) slotToKeyDel(key->ptr); return 1; } else { return 0; } }
// 3.2.5版本ZSet节点实现,value定义robj *obj /* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { robj *obj; double score; struct zskiplistNode *backward; struct zskiplistLevel { struct zskiplistNode *forward; unsigned int span; } level[]; } zskiplistNode; // 6.0.10版本ZSet节点实现,value定义为sds ele /* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele; double score; struct zskiplistNode *backward; struct zskiplistLevel { struct zskiplistNode *forward; unsigned long span; } level[]; } zskiplistNode;
int handleClientsWithPendingReadsUsingThreads(void) { ... /* Distribute the clients across N different lists. */ listIter li; listNode *ln; listRewind(server.clients_pending_read,&li); int item_id = 0; // 将等待处理的客户端分配给I/O线程 while((ln = listNext(&li))) { client *c = listNodeValue(ln); int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id ; } ... /* Wait for all the other threads to end their work. */ // 轮训等待所有I/O线程处理完 while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j ) pending = io_threads_pending[j]; if (pending == 0) break; } ... return processed; }
/* Main loop of an I/O thread.
 *
 * This is an excerpt: the "..." markers stand for elided code, which is
 * where `id`, `li` and `ln` used below are set up (presumably `id` is
 * derived from `myid` -- confirm against the full source). */
void *IOThreadMain(void *myid) {
    ...

    while(1) {
        ...

        /* The I/O thread performs the read or write work for each client
         * on the list being iterated. */
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            /* io_threads_op tells whether this pass is a write or a read pass. */
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        /* Clear this thread's list first, then reset its pending counter.
         * handleClientsWithPendingReadsUsingThreads() polls these counters
         * and proceeds once they are all zero. */
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
END 十期推荐 【221期】面试官:谈谈内存泄漏和内存溢出的联系与区别 【222期】ZooKeeper 相关面试演练 【223期】面试官:在MySQL查询中,为什么要用小表驱动大表 【224期】MySQL索引相关面试演练 【225期】面试官:公司项目中Java的多线程一般用在哪些场景? 【226期】面试官:内存耗尽后Redis会发生什么 【227期】面试官:说说双重检查加锁单例模式为什么两次判断? 【228期】面试高频:Java常用的八大排序算法一网打尽! 【229期】面试官:怎么解决Eureka某一个服务挂掉的问题? 【230期】面试官:讲讲Bean的加载过程 ? ~