- skynet 可以根据动态库(具有一定的规范)动态的创建出服务
- skynet启用多线程处理服务处理消息
- 服务并不会主动执行逻辑,只有当服务接受到消息时才会执行相应的逻辑(包括定时器,skynet 的定时器是以消息的方式通知给服务的)
- skynet 自带有多个服务如 logger,snlua等,要想比较快的理解 skynet 的服务,可以从比较简单的服务入手(如 logger )
- skynet 服务对应结构体 skynet_context
// skynet-src/skynet_server.c
struct skynet_context {
void * instance; // 服务实例指针
struct skynet_module * mod; // 动态库指针
void * cb_ud; // 用于回调的指针
skynet_cb cb; // 回调函数指针
struct message_queue *queue; // 服务消息队列
FILE * logfile; // for 服务日志
uint64_t cpu_cost;
uint64_t cpu_start;
char result[32]; // 存放性能指标的查询结果
uint32_t handle; // 服务的id
int session_id; // 消息的session id分配器
int ref; // 服务引用计数
int message_count; // 已处理过的消息总数
bool init; // 初始化成功的标识
bool endless; // 死循环标识
bool profile; // cpu 性能指标开启开关
CHECKCALLING_DECL
};
- skynet 通过接口 skynet_context_new 动态创建服务
// skynet-src/skynet_server.c
124 struct skynet_context *
125 skynet_context_new(const char * name, const char *param) {
126 struct skynet_module * mod = skynet_module_query(name);
127
128 if (mod == NULL)
129 return NULL;
130
131 void *inst = skynet_module_instance_create(mod);
132 if (inst == NULL)
133 return NULL;
134 struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
135 CHECKCALLING_INIT(ctx)
136
137 ctx->mod = mod;
138 ctx->instance = inst;
139 ATOM_INIT(&ctx->ref , 2);
140 ctx->cb = NULL;
141 ctx->cb_ud = NULL;
142 ctx->session_id = 0;
143 ATOM_INIT(&ctx->logfile, (uintptr_t)NULL);
144
145 ctx->init = false;
146 ctx->endless = false;
147
148 ctx->cpu_cost = 0;
149 ctx->cpu_start = 0;
150 ctx->message_count = 0;
151 ctx->profile = G_NODE.profile;
152 // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
153 ctx->handle = 0;
154 ctx->handle = skynet_handle_register(ctx);
155 struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
156 // init function maybe use ctx->handle, so it must init at last
157 context_inc();
158
159 CHECKCALLING_BEGIN(ctx)
160 int r = skynet_module_instance_init(mod, inst, ctx, param);
161 CHECKCALLING_END(ctx)
162 if (r == 0) {
163 struct skynet_context * ret = skynet_context_release(ctx);
164 if (ret) {
165 ctx->init = true;
166 }
167 skynet_globalmq_push(queue);
168 if (ret) {
169 skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
170 }
171 return ret;
172 } else {
173 skynet_error(ctx, "FAILED launch %s", name);
174 uint32_t handle = ctx->handle;
175 skynet_context_release(ctx);
176 skynet_handle_retire(handle);
177 struct drop_t d = { handle };
178 skynet_mq_release(queue, drop_message, &d);
179 return NULL;
180 }
181 }
- 根据 name 通过接口 skynet_module_query 获取对应动态库句柄 mod( 126 行),mod 中包含四个函数指针(create,init,signal,release)
- 调用mod 的 create 函数创建实例 inst( 131 行)
- 创建 skynet_context 实例 ctx,并进行一系列赋值,把 ctx 注册到 handle_storage ( 134 ~ 154)
- 创建该服务的消息队列 queue,调用 so 库的 init 函数进行初始化( 160 行),将 queue 加入到全局队列中
// skynet-src/skynet_mq.c
21 struct message_queue {
22 > struct spinlock lock; // 自旋锁
23 > uint32_t handle; // 服务id
24 > int cap;
25 > int head;
26 > int tail;
27 > int release;
28 > int in_global;
29 > int overload;
30 > int overload_threshold;
31 > struct skynet_message *queue; // 消息队列
32 > struct message_queue *next; // 下个队列的指针
33 };
- 每个服务实例有都有一个消息队列 message_queue,服务待处理的消息都放在 queue 中
- 所有的消息队列连成一个链表( next 指针),全局队列 global_queue 的 head 和 tail 存放这个链表的头指针和尾指针
- 向服务发送消息,其实就是把消息压入服务的消息队列中
700 int
701 skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz ) {
702 > if ((sz & MESSAGE_TYPE_MASK) != sz) {
703 > > skynet_error(context, "The message to %x is too large", destination);
704 > > if (type & PTYPE_TAG_DONTCOPY) {
705 > > > skynet_free(data);
706 > > }
707 > > return -2;
708 > }
709 > _filter_args(context, type, &session, (void **)&data, &sz);
710
711 > if (source == 0) {
712 > > source = context->handle;
713 > }
714
715 > if (destination == 0) {
716 > > if (data) {
717 > > > skynet_error(context, "Destination address can't be 0");
718 > > > skynet_free(data);
719 > > > return -1;
720 > > }
721
722 > > return session;
723 > }
724 > if (skynet_harbor_message_isremote(destination)) {
725 > > struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
726 > > rmsg->destination.handle = destination;
727 > > rmsg->message = data;
728 > > rmsg->sz = sz & MESSAGE_TYPE_MASK;
729 > > rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
730 > > skynet_harbor_send(rmsg, source, session);
731 > } else {
732 > > struct skynet_message smsg; // 通用的消息结构
733 > > smsg.source = source; // 消息的来源,既发送消息的服务id
734 > > smsg.session = session; // 消息的session
735 > > smsg.data = data; // 消息的数据
736 > > smsg.sz = sz; // 消息类型和消息数据的长度
737
738 > > if (skynet_context_push(destination, &smsg)) {
739 > > > skynet_free(data);
740 > > > return -1;
741 > > }
742 > }
743 > return session;
744 }
- 服务通过接口 skynet_send 向其他服务发送消息,实际上是把信息打包为 skynet 的通用消息 skynet_message,通过 skynet_context_push 接口将其 push 到全局队列中去
- skynet 启用多个线程来跑函数 skynet_context_message_dispatch
// skynet-src/skynet_server.c
297 struct message_queue *-
298 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
299 > if (q == NULL) {
300 > > q = skynet_globalmq_pop();
301 > > if (q==NULL)
302 > > > return NULL;
303 > }
304
305 > uint32_t handle = skynet_mq_handle(q);
306
307 > struct skynet_context * ctx = skynet_handle_grab(handle);
308 > if (ctx == NULL) {
309 > > struct drop_t d = { handle };
310 > > skynet_mq_release(q, drop_message, &d);
311 > > return skynet_globalmq_pop();
312 > }
313
314 > int i,n=1;
315 > struct skynet_message msg;
316
317 > for (i=0;i<n;i++) {
318 > > if (skynet_mq_pop(q,&msg)) {
319 > > > skynet_context_release(ctx);
320 > > > return skynet_globalmq_pop();
321 > > } else if (i==0 && weight >= 0) {
322 > > > n = skynet_mq_length(q);
323 > > > n >>= weight;
324 > > }
325 > > int overload = skynet_mq_overload(q);
326 > > if (overload) {
327 > > > skynet_error(ctx, "May overload, message queue length = %d", overload);
328 > > }
329
330 > > skynet_monitor_trigger(sm, msg.source , handle);
331
332 > > if (ctx->cb == NULL) {
333 > > > skynet_free(msg.data);
334 > > } else {
335 > > > dispatch_message(ctx, &msg);
336 > > }
337
338 > > skynet_monitor_trigger(sm, 0,0);
339 > }
340
341 > assert(q == ctx->queue);
342 > struct message_queue *nq = skynet_globalmq_pop();
343 > if (nq) {
344 > > // If global mq is not empty , push q back, and return next queue (nq)
345 > > // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
346 > > skynet_globalmq_push(q);
347 > > q = nq;
348 > }-
349 > skynet_context_release(ctx);
350
351 > return q;
352 }
- 先全局消息队列中弹出一个次级队列,根据权重获取本次处理消息的数量,每次循环从次级队列中弹出消息,调用 dispatch_message 处理消息,最后如果队列中还有消息就放回全局队列中(服务每次处理若干条消息,而不是一次处理完全部,这样做是防止其他服务饥饿)
// skynet-src/skynet_server.c
260 static void
261 dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
262 > assert(ctx->init);
263 > CHECKCALLING_BEGIN(ctx)
264 > pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
265 > int type = msg->sz >> MESSAGE_TYPE_SHIFT;
266 > size_t sz = msg->sz & MESSAGE_TYPE_MASK;
267 > FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
268 > if (f) {
269 > > skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
270 > }
271 > ++ctx->message_count;
272 > int reserve_msg;
273 > if (ctx->profile) {
274 > > ctx->cpu_start = skynet_thread_time();
275 > > reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
276 > > uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
277 > > ctx->cpu_cost += cost_time;
278 > } else {
279 > > reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
280 > }
281 > if (!reserve_msg) {
282 > > skynet_free(msg->data);
283 > }
284 > CHECKCALLING_END(ctx)
285 }
- 解包消息 msg,并调用 ctx->cb ,最后根据返回值 reserve_msg 来决定是否释放 msg->data 的空间
- 用户通过接口 skynet_callback 来设置 ctx->cb,通常是在动态库句柄的 init中,如 logger 服务的 callback 就是在 logger_init 中