- skynet 是一个 actor 模型的框架,actor 就是 skynet 服务,对应代码中的结构体 skynet_context
- skynet 中存在若干个 skynet_context ,这些对象通过 handle_storage 来进行管理,根据 handle_storage 可以进行增删查操作
// skynet-src/skynet_handle.c
19 struct handle_storage {
20 struct rwlock lock;
21
22 uint32_t harbor;
23 uint32_t handle_index; // 最后一个服务在数组中的下标
24 int slot_size; // 服务数组的size(最大容量,不足时动态扩展)
25 struct skynet_context ** slot; // 服务的指针数组
26
27 int name_cap;
28 int name_count;
29 struct handle_name *name; // 服务 name 数组,用于根据 name 查询到服务
30 };
- skynet 用结构体 handle_storge 来管理其服务
- 服务的注册接口是 skynet_handle_register ,新创建的服务都是通过该接口加入到管理器中,函数返回服务在 slot 中的下标
// skynet-src/skynet_handle.c
34 uint32_t
35 skynet_handle_register(struct skynet_context *ctx) {
36 struct handle_storage *s = H;
37
38 rwlock_wlock(&s->lock);
39
40 for (;;) {
41 int i;
42 uint32_t handle = s->handle_index;
43 for (i=0;i<s->slot_size;i++,handle++) {
44 if (handle > HANDLE_MASK) {
45 // 0 is reserved
46 handle = 1;
47 }
48 int hash = handle & (s->slot_size-1);
49 if (s->slot[hash] == NULL) {
50 s->slot[hash] = ctx;
51 s->handle_index = handle + 1;
52
53 rwlock_wunlock(&s->lock);
54
55 handle |= s->harbor;
56 return handle;
57 }
58 }
59 assert((s->slot_size*2 - 1) <= HANDLE_MASK);
60 struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *));
61 memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *));
62 for (i=0;i<s->slot_size;i++) {
63 if (s->slot[i]) {
64 int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1);
65 assert(new_slot[hash] == NULL);
66 new_slot[hash] = s->slot[i];
67 }
68 }
69 skynet_free(s->slot);
70 s->slot = new_slot;
71 s->slot_size *= 2;
72 }
73 }
- 如果容量足够,直接加入到 slot 中( 43 ~ 56),并返回其在 slot 中的下标
- 容量不足时动态扩容为原来容量的两倍,将旧数据迁移到新数组中( 60 ~ 71 )
// skynet-src/skynet_handle.c
138 struct skynet_context *
139 skynet_handle_grab(uint32_t handle) {
140 struct handle_storage *s = H;
141 struct skynet_context * result = NULL;
142
143 rwlock_rlock(&s->lock);
144
145 uint32_t hash = handle & (s->slot_size-1);
146 struct skynet_context * ctx = s->slot[hash];
147 if (ctx && skynet_context_handle(ctx) == handle) {
148 result = ctx;
149 skynet_context_grab(result);
150 }
151
152 rwlock_runlock(&s->lock);
153
154 return result;
155 }
- skynet 提供根据 handle 查询到 skynet_context 的接口 skynet_handle_grab
- skynet 提供为服务命名的接口( skynet_handle_namehandle ),该接口将 name 和 handle 建立对应关系,这样就可以凭借 name 查询到对应的服务( skynet_handle_findname ),代码都在文件 skynet-src/skynet_handle.h 中,这里不多赘述
- skynet 启用多线程来分发消息,线程最终都会调用到函数 skynet_context_message_dispatch
// skynet-src/skynet_server.c
297 struct message_queue *-
298 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
299 > if (q == NULL) {
300 > > q = skynet_globalmq_pop();
301 > > if (q==NULL)
302 > > > return NULL;
303 > }
304
305 > uint32_t handle = skynet_mq_handle(q);
306
307 > struct skynet_context * ctx = skynet_handle_grab(handle);
308 > if (ctx == NULL) {
309 > > struct drop_t d = { handle };
310 > > skynet_mq_release(q, drop_message, &d);
311 > > return skynet_globalmq_pop();
312 > }
313
314 > int i,n=1;
315 > struct skynet_message msg;
316
317 > for (i=0;i<n;i++) {
318 > > if (skynet_mq_pop(q,&msg)) {
319 > > > skynet_context_release(ctx);
320 > > > return skynet_globalmq_pop();
321 > > } else if (i==0 && weight >= 0) {
322 > > > n = skynet_mq_length(q);
323 > > > n >>= weight;
324 > > }
325 > > int overload = skynet_mq_overload(q);
326 > > if (overload) {
327 > > > skynet_error(ctx, "May overload, message queue length = %d", overload);
328 > > }
329
330 > > skynet_monitor_trigger(sm, msg.source , handle);
331
332 > > if (ctx->cb == NULL) {
333 > > > skynet_free(msg.data);
334 > > } else {
335 > > > dispatch_message(ctx, &msg);
336 > > }
337
338 > > skynet_monitor_trigger(sm, 0,0);
339 > }
340
341 > assert(q == ctx->queue);
342 > struct message_queue *nq = skynet_globalmq_pop();
343 > if (nq) {
344 > > // If global mq is not empty , push q back, and return next queue (nq)
345 > > // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
346 > > skynet_globalmq_push(q);
347 > > q = nq;
348 > }-
349 > skynet_context_release(ctx);
350
351 > return q;
352 }
- 先在全局队列中取出次级队列 q( 299 ~ 303),根据 q 的 handle 字段查询对应的服务 ctx,根据该线程分配的权重计算出一次处理消息的数量 n (318 ~ 324),调用 dispatch_message 处理单条消息
- 这里先从全局队列中取出次级队列,就表示次级队列不会同时处于二个或多个线程当中,就不存在冲突的问题了
// skynet-src/skynet_server.c
75 int
76 skynet_handle_retire(uint32_t handle) {
77 int ret = 0;
78 struct handle_storage *s = H;
79
80 rwlock_wlock(&s->lock);
81
82 uint32_t hash = handle & (s->slot_size-1);
83 struct skynet_context * ctx = s->slot[hash];
84
85 if (ctx != NULL && skynet_context_handle(ctx) == handle) {
86 s->slot[hash] = NULL;
87 ret = 1;
88 int i;
89 int j=0, n=s->name_count;
90 for (i=0; i<n; ++i) {
91 if (s->name[i].handle == handle) {
92 skynet_free(s->name[i].name);
93 continue;
94 } else if (i!=j) {
95 s->name[j] = s->name[i];
96 }
97 ++j;
98 }
99 s->name_count = j;
100 } else {
101 ctx = NULL;
102 }
103
104 rwlock_wunlock(&s->lock);
105
106 if (ctx) {
107 // release ctx may call skynet_handle_* , so wunlock first.
108 skynet_context_release(ctx);
109 }
110
111 return ret;
112 }
- 根据 handle 找到 ctx,清空该 ctx 的 name 和 handle 的关联,调用 skynet_context_release 释放 ctx