skynet的服务管理

  • skynet 是一个 actor 模型的框架,actor 就是 skynet 服务,对应代码中的结构体 skynet_context
  • skynet 中存在若干个 skynet_context ,这些对象通过 handle_storage 来进行管理,根据 handle_storage 可以进行增删查操作
 // skynet-src/skynet_handle.c
 19 struct handle_storage {
 20     struct rwlock lock;
 21
 22     uint32_t harbor;
 23     uint32_t handle_index;          // 最后一个服务在数组中的下标 
 24     int slot_size;                  // 服务数组的size(最大容量,不足时动态扩展)
 25     struct skynet_context ** slot;  // 服务的指针数组
 26
 27     int name_cap;
 28     int name_count;
 29     struct handle_name *name;       // 服务 name 数组,用于根据 name 查询到服务
 30 };
  • skynet 用结构体 handle_storge 来管理其服务
  • 服务的注册接口是 skynet_handle_register ,新创建的服务都是通过该接口加入到管理器中,函数返回服务在 slot 中的下标
 // skynet-src/skynet_handle.c
 34 uint32_t
 35 skynet_handle_register(struct skynet_context *ctx) {
 36     struct handle_storage *s = H;
 37
 38     rwlock_wlock(&s->lock);
 39
 40     for (;;) {
 41         int i;
 42         uint32_t handle = s->handle_index;
 43         for (i=0;i<s->slot_size;i++,handle++) {
 44             if (handle > HANDLE_MASK) {
 45                 // 0 is reserved
 46                 handle = 1;
 47             }
 48             int hash = handle & (s->slot_size-1);
 49             if (s->slot[hash] == NULL) {
 50                 s->slot[hash] = ctx;
 51                 s->handle_index = handle + 1;
 52
 53                 rwlock_wunlock(&s->lock);
 54
 55                 handle |= s->harbor;
 56                 return handle;
 57             }
 58         }
 59         assert((s->slot_size*2 - 1) <= HANDLE_MASK);
 60         struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *));
 61         memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *));
 62         for (i=0;i<s->slot_size;i++) {
 63             if (s->slot[i]) {
 64                 int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1);
 65                 assert(new_slot[hash] == NULL);
 66                 new_slot[hash] = s->slot[i];
 67             }
 68         }
 69         skynet_free(s->slot);
 70         s->slot = new_slot;
 71         s->slot_size *= 2;
 72     }
 73 }
  • 如果容量足够,直接加入到 slot 中( 43 ~ 56),并返回其在 slot 中的下标
  • 容量不足时动态扩容为原来容量的两倍,将旧数据迁移到新数组中( 60 ~ 71 )
// skynet-src/skynet_handle.c
138 struct skynet_context *
139 skynet_handle_grab(uint32_t handle) {
140     struct handle_storage *s = H;
141     struct skynet_context * result = NULL;
142
143     rwlock_rlock(&s->lock);
144
145     uint32_t hash = handle & (s->slot_size-1);
146     struct skynet_context * ctx = s->slot[hash];
147     if (ctx && skynet_context_handle(ctx) == handle) {
148         result = ctx;
149         skynet_context_grab(result);
150     }
151
152     rwlock_runlock(&s->lock);
153
154     return result;
155 }
  • skynet 提供根据 handle 查询到 skynet_context 的接口 skynet_handle_grab
  • skynet 提供为服务命名的接口( skynet_handle_namehandle ),该接口将 name 和 handle 建立对应关系,这样就可以凭借 name 查询到对应的服务( skynet_handle_findname ),代码都在文件 skynet-src/skynet_handle.h 中,这里不多赘述
  • skynet 启用多线程来分发消息,线程最终都会调用到函数 skynet_context_message_dispatch
// skynet-src/skynet_server.c
297 struct message_queue *-
298 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
299 >   if (q == NULL) {
300 >   >   q = skynet_globalmq_pop();
301 >   >   if (q==NULL)
302 >   >   >   return NULL;
303 >   }
304
305 >   uint32_t handle = skynet_mq_handle(q);
306
307 >   struct skynet_context * ctx = skynet_handle_grab(handle);
308 >   if (ctx == NULL) {
309 >   >   struct drop_t d = { handle };
310 >   >   skynet_mq_release(q, drop_message, &d);
311 >   >   return skynet_globalmq_pop();
312 >   }
313
314 >   int i,n=1;
315 >   struct skynet_message msg;
316
317 >   for (i=0;i<n;i++) {
318 >   >   if (skynet_mq_pop(q,&msg)) {
319 >   >   >   skynet_context_release(ctx);
320 >   >   >   return skynet_globalmq_pop();
321 >   >   } else if (i==0 && weight >= 0) {
322 >   >   >   n = skynet_mq_length(q);
323 >   >   >   n >>= weight;
324 >   >   }
325 >   >   int overload = skynet_mq_overload(q);
326 >   >   if (overload) {
327 >   >   >   skynet_error(ctx, "May overload, message queue length = %d", overload);
328 >   >   }
329
330 >   >   skynet_monitor_trigger(sm, msg.source , handle);
331
332 >   >   if (ctx->cb == NULL) {
333 >   >   >   skynet_free(msg.data);
334 >   >   } else {
335 >   >   >   dispatch_message(ctx, &msg);
336 >   >   }
337
338 >   >   skynet_monitor_trigger(sm, 0,0);
339 >   }
340
341 >   assert(q == ctx->queue);
342 >   struct message_queue *nq = skynet_globalmq_pop();
343 >   if (nq) {
344 >   >   // If global mq is not empty , push q back, and return next queue (nq)
345 >   >   // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
346 >   >   skynet_globalmq_push(q);
347 >   >   q = nq;
348 >   }-
349 >   skynet_context_release(ctx);
350
351 >   return q;
352 }
  • 先在全局队列中取出次级队列 q( 299 ~ 303),根据 q 的 handle 字段查询对应的服务 ctx,根据该线程分配的权重计算出一次处理消息的数量 n (318 ~ 324),调用 dispatch_message 处理单条消息
  • 这里先从全局队列中取出次级队列,就表示次级队列不会同时处于二个或多个线程当中,就不存在冲突的问题了
 // skynet-src/skynet_server.c
 75 int
 76 skynet_handle_retire(uint32_t handle) {
 77     int ret = 0;
 78     struct handle_storage *s = H;
 79
 80     rwlock_wlock(&s->lock);
 81
 82     uint32_t hash = handle & (s->slot_size-1);
 83     struct skynet_context * ctx = s->slot[hash];
 84
 85     if (ctx != NULL && skynet_context_handle(ctx) == handle) {
 86         s->slot[hash] = NULL;
 87         ret = 1;
 88         int i;
 89         int j=0, n=s->name_count;
 90         for (i=0; i<n; ++i) {
 91             if (s->name[i].handle == handle) {
 92                 skynet_free(s->name[i].name);
 93                 continue;
 94             } else if (i!=j) {
 95                 s->name[j] = s->name[i];
 96             }
 97             ++j;
 98         }
 99         s->name_count = j;
100     } else {
101         ctx = NULL;
102     }
103
104     rwlock_wunlock(&s->lock);
105
106     if (ctx) {
107         // release ctx may call skynet_handle_* , so wunlock first.
108         skynet_context_release(ctx);
109     }
110
111     return ret;
112 }
  • 根据 handle 找到 ctx,清空该 ctx 的 name 和 handle 的关联,调用 skynet_context_release 释放 ctx