skynet的服务

  • skynet 可以根据动态库(具有一定的规范)动态的创建出服务
  • skynet启用多线程处理服务处理消息
  • 服务并不会主动执行逻辑,只有当服务接受到消息时才会执行相应的逻辑(包括定时器,skynet 的定时器是以消息的方式通知给服务的)
  • skynet 自带有多个服务如 logger,snlua等,要想比较快的理解 skynet 的服务,可以从比较简单的服务入手(如 logger )
  • skynet 服务对应结构体 skynet_context
// skynet-src/skynet_server.c
struct skynet_context {
	void * instance;                // 服务实例指针
	struct skynet_module * mod;     // 动态库指针
	void * cb_ud;                   // 用于回调的指针
	skynet_cb cb;                   // 回调函数指针
	struct message_queue *queue;    // 服务消息队列
	FILE * logfile;                 // for 服务日志
	uint64_t cpu_cost;
	uint64_t cpu_start;
	char result[32];                // 存放性能指标的查询结果
	uint32_t handle;                // 服务的id
	int session_id;                 // 消息的session id分配器
	int ref;                        // 服务引用计数
	int message_count;              // 已处理过的消息总数
	bool init;                      // 初始化成功的标识
	bool endless;                   // 死循环标识
	bool profile;                   // cpu 性能指标开启开关

	CHECKCALLING_DECL
};
  • skynet 通过接口 skynet_context_new 动态创建服务
// skynet-src/skynet_server.c
124 struct skynet_context *
125 skynet_context_new(const char * name, const char *param) {
126     struct skynet_module * mod = skynet_module_query(name);
127
128     if (mod == NULL)
129         return NULL;
130
131     void *inst = skynet_module_instance_create(mod);
132     if (inst == NULL)
133         return NULL;
134     struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
135     CHECKCALLING_INIT(ctx)
136
137     ctx->mod = mod;
138     ctx->instance = inst;
139     ATOM_INIT(&ctx->ref , 2);
140     ctx->cb = NULL;
141     ctx->cb_ud = NULL;
142     ctx->session_id = 0;
143     ATOM_INIT(&ctx->logfile, (uintptr_t)NULL);
144
145     ctx->init = false;
146     ctx->endless = false;
147
148     ctx->cpu_cost = 0;
149     ctx->cpu_start = 0;
150     ctx->message_count = 0;
151     ctx->profile = G_NODE.profile;
152     // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
153     ctx->handle = 0;
154     ctx->handle = skynet_handle_register(ctx);
155     struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
156     // init function maybe use ctx->handle, so it must init at last
157     context_inc();
158
159     CHECKCALLING_BEGIN(ctx)
160     int r = skynet_module_instance_init(mod, inst, ctx, param);
161     CHECKCALLING_END(ctx)
162     if (r == 0) {
163         struct skynet_context * ret = skynet_context_release(ctx);
164         if (ret) {
165             ctx->init = true;
166         }
167         skynet_globalmq_push(queue);
168         if (ret) {
169             skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
170         }
171         return ret;
172     } else {
173         skynet_error(ctx, "FAILED launch %s", name);
174         uint32_t handle = ctx->handle;
175         skynet_context_release(ctx);
176         skynet_handle_retire(handle);
177         struct drop_t d = { handle };
178         skynet_mq_release(queue, drop_message, &d);
179         return NULL;
180     }
181 }
  • 根据 name 通过接口 skynet_module_query 获取对应动态库句柄 mod( 126 行),mod 中包含四个函数指针(create,init,signal,release)
  • 调用mod 的 create 函数创建实例 inst( 131 行)
  • 创建 skynet_context 实例 ctx,并进行一系列赋值,把 ctx 注册到 handle_storage ( 134 ~ 154)
  • 创建该服务的消息队列 queue,调用 so 库的 init 函数进行初始化( 160 行),将 queue 加入到全局队列中
 // skynet-src/skynet_mq.c
 21 struct message_queue {
 22 >   struct spinlock lock;               // 自旋锁
 23 >   uint32_t handle;                    // 服务id
 24 >   int cap;
 25 >   int head;
 26 >   int tail;
 27 >   int release;
 28 >   int in_global;
 29 >   int overload;
 30 >   int overload_threshold;
 31 >   struct skynet_message *queue;       // 消息队列
 32 >   struct message_queue *next;         // 下个队列的指针
 33 };
  • 每个服务实例有都有一个消息队列 message_queue,服务待处理的消息都放在 queue 中
  • 所有的消息队列连成一个链表( next 指针),全局队列 global_queue 的 head 和 tail 存放这个链表的头指针和尾指针
  • 向服务发送消息,其实就是把消息压入服务的消息队列中
700 int
701 skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz    ) {
702 >   if ((sz & MESSAGE_TYPE_MASK) != sz) {
703 >   >   skynet_error(context, "The message to %x is too large", destination);
704 >   >   if (type & PTYPE_TAG_DONTCOPY) {
705 >   >   >   skynet_free(data);
706 >   >   }
707 >   >   return -2;
708 >   }
709 >   _filter_args(context, type, &session, (void **)&data, &sz);
710
711 >   if (source == 0) {
712 >   >   source = context->handle;
713 >   }
714
715 >   if (destination == 0) {
716 >   >   if (data) {
717 >   >   >   skynet_error(context, "Destination address can't be 0");
718 >   >   >   skynet_free(data);
719 >   >   >   return -1;
720 >   >   }
721
722 >   >   return session;
723 >   }
724 >   if (skynet_harbor_message_isremote(destination)) {
725 >   >   struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
726 >   >   rmsg->destination.handle = destination;
727 >   >   rmsg->message = data;
728 >   >   rmsg->sz = sz & MESSAGE_TYPE_MASK;
729 >   >   rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
730 >   >   skynet_harbor_send(rmsg, source, session);
731 >   } else {
732 >   >   struct skynet_message smsg;     // 通用的消息结构
733 >   >   smsg.source = source;           // 消息的来源,既发送消息的服务id
734 >   >   smsg.session = session;         // 消息的session
735 >   >   smsg.data = data;               // 消息的数据
736 >   >   smsg.sz = sz;                   // 消息类型和消息数据的长度
737
738 >   >   if (skynet_context_push(destination, &smsg)) {
739 >   >   >   skynet_free(data);
740 >   >   >   return -1;
741 >   >   }
742 >   }
743 >   return session;
744 }
  • 服务通过接口 skynet_send 向其他服务发送消息,实际上是把信息打包为 skynet 的通用消息 skynet_message,通过 skynet_context_push 接口将其 push 到全局队列中去
  • skynet 启用多个线程来跑函数 skynet_context_message_dispatch
// skynet-src/skynet_server.c
297 struct message_queue *-
298 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
299 >   if (q == NULL) {
300 >   >   q = skynet_globalmq_pop();
301 >   >   if (q==NULL)
302 >   >   >   return NULL;
303 >   }
304
305 >   uint32_t handle = skynet_mq_handle(q);
306
307 >   struct skynet_context * ctx = skynet_handle_grab(handle);
308 >   if (ctx == NULL) {
309 >   >   struct drop_t d = { handle };
310 >   >   skynet_mq_release(q, drop_message, &d);
311 >   >   return skynet_globalmq_pop();
312 >   }
313
314 >   int i,n=1;
315 >   struct skynet_message msg;
316
317 >   for (i=0;i<n;i++) {
318 >   >   if (skynet_mq_pop(q,&msg)) {
319 >   >   >   skynet_context_release(ctx);
320 >   >   >   return skynet_globalmq_pop();
321 >   >   } else if (i==0 && weight >= 0) {
322 >   >   >   n = skynet_mq_length(q);
323 >   >   >   n >>= weight;
324 >   >   }
325 >   >   int overload = skynet_mq_overload(q);
326 >   >   if (overload) {
327 >   >   >   skynet_error(ctx, "May overload, message queue length = %d", overload);
328 >   >   }
329
330 >   >   skynet_monitor_trigger(sm, msg.source , handle);
331
332 >   >   if (ctx->cb == NULL) {
333 >   >   >   skynet_free(msg.data);
334 >   >   } else {
335 >   >   >   dispatch_message(ctx, &msg);
336 >   >   }
337
338 >   >   skynet_monitor_trigger(sm, 0,0);
339 >   }
340
341 >   assert(q == ctx->queue);
342 >   struct message_queue *nq = skynet_globalmq_pop();
343 >   if (nq) {
344 >   >   // If global mq is not empty , push q back, and return next queue (nq)
345 >   >   // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
346 >   >   skynet_globalmq_push(q);
347 >   >   q = nq;
348 >   }-
349 >   skynet_context_release(ctx);
350
351 >   return q;
352 }
  • 先全局消息队列中弹出一个次级队列,根据权重获取本次处理消息的数量,每次循环从次级队列中弹出消息,调用 dispatch_message 处理消息,最后如果队列中还有消息就放回全局队列中(服务每次处理若干条消息,而不是一次处理完全部,这样做是防止其他服务饥饿)
// skynet-src/skynet_server.c
260 static void
261 dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
262 >   assert(ctx->init);
263 >   CHECKCALLING_BEGIN(ctx)
264 >   pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
265 >   int type = msg->sz >> MESSAGE_TYPE_SHIFT;
266 >   size_t sz = msg->sz & MESSAGE_TYPE_MASK;
267 >   FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
268 >   if (f) {
269 >   >   skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
270 >   }
271 >   ++ctx->message_count;
272 >   int reserve_msg;
273 >   if (ctx->profile) {
274 >   >   ctx->cpu_start = skynet_thread_time();
275 >   >   reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
276 >   >   uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
277 >   >   ctx->cpu_cost += cost_time;
278 >   } else {
279 >   >   reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
280 >   }
281 >   if (!reserve_msg) {
282 >   >   skynet_free(msg->data);
283 >   }
284 >   CHECKCALLING_END(ctx)
285 }
  • 解包消息 msg,并调用 ctx->cb ,最后根据返回值 reserve_msg 来决定是否释放 msg->data 的空间
  • 用户通过接口 skynet_callback 来设置 ctx->cb,通常是在动态库句柄的 init中,如 logger 服务的 callback 就是在 logger_init 中