skynet的网络

  • skynet 创建专门的 socket 线程用于处理 socket 相关的逻辑。skynet 将 socket 相关的操作提炼出 lua 接口,供用户在 lua 层操作 socket
  • lua 层 与 socket 层并不在同一个线程内,lua 层向 socket 线程发送消息是通过管道,而 socket 层向 lua 层发送消息是通过 skynet 的消息系统
  • lua socket 接口代码在文件 lualib-src/lua-socket.c 中
  // lualib-src/lua-socket.c 
  820 LUAMOD_API int
  821 luaopen_skynet_socketdriver(lua_State *L) {
  822 >   luaL_checkversion(L);
  823 >   luaL_Reg l[] = {
  824 >   >   { "buffer", lnewbuffer },
  825 >   >   { "push", lpushbuffer },
  826 >   >   { "pop", lpopbuffer },
  827 >   >   { "drop", ldrop },
  828 >   >   { "readall", lreadall },
  829 >   >   { "clear", lclearbuffer },
  830 >   >   { "readline", lreadline },
  831 >   >   { "str2p", lstr2p },
  832 >   >   { "header", lheader },
  833 >   >   { "info", linfo },
  834
  835 >   >   { "unpack", lunpack },
  836 >   >   { NULL, NULL },
  837 >   };
  838 >   luaL_newlib(L,l);
  839 >   luaL_Reg l2[] = {
  840 >   >   { "connect", lconnect },
  841 >   >   { "close", lclose },
  842 >   >   { "shutdown", lshutdown },
  843 >   >   { "listen", llisten },
  844 >   >   { "send", lsend },
  845 >   >   { "lsend", lsendlow },
  846 >   >   { "bind", lbind },
  847 >   >   { "start", lstart },
  848 >   >   { "pause", lpause },
  849 >   >   { "nodelay", lnodelay },
  850 >   >   { "udp", ludp },
  851 >   >   { "udp_connect", ludp_connect },
  852 >   >   { "udp_send", ludp_send },
  853 >   >   { "udp_address", ludp_address },
  854 >   >   { "resolve", lresolve },
  855 >   >   { NULL, NULL },
  856 >   };
  857 >   lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
  858 >   struct skynet_context *ctx = lua_touserdata(L,-1);
  859 >   if (ctx == NULL) {
  860 >   >   return luaL_error(L, "Init skynet context first");
  861 >   }
  862
  863 >   luaL_setfuncs(L,l2,1);
  864
  865 >   return 1;
  866 }
  • socket 操作为 l2 注册的 c 函数,这些函数最后都会调用 send_request 函数
// skynet-src/socket_server.c
1786 static void
1787 send_request(struct socket_server *ss, struct request_package *request, char type, int len) {
1788     request->header[6] = (uint8_t)type;
1789     request->header[7] = (uint8_t)len;
1790     const char * req = (const char *)request + offsetof(struct request_package, header[6]);
1791     for (;;) {
1792         ssize_t n = write(ss->sendctrl_fd, req, len+2);
1793         if (n<0) {
1794             if (errno != EINTR) {
1795                 skynet_error(NULL, "socket-server : send ctrl command error %s.", strerror(errno));
1796             }
1797             continue;
1798         }
1799         assert(n == len+2);
1800         return;
1801     }
1802 }
  • send_request 的作用是把对应的操作类型及参数序列化(1 字节类型 + 1 字节长度,后接 len 字节参数,共 len+2 字节)并写入管道的写端 sendctrl_fd 中
  • socket 线程那边通过监听该管道的读端(recvctrl_fd)获取对应 socket 操作类型及参数
  • socket 线程主循环就是不断调用 skynet_socket_poll,主要做两件事
    • 调用 socket_server_poll 处理 socket
    • 调用 forward_message 把处理的结果打包发送给 lua 层
   // skynet-src/skynet_socket.c
   78 int
   79 skynet_socket_poll() {
   80 >   struct socket_server *ss = SOCKET_SERVER;
   81 >   assert(ss);
   82 >   struct socket_message result;
   83 >   int more = 1;
   84 >   int type = socket_server_poll(ss, &result, &more);
   85 >   switch (type) {
   86 >   case SOCKET_EXIT:
   87 >   >   return 0;
   88 >   case SOCKET_DATA:
   89 >   >   forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
   90 >   >   break;
   91 >   case SOCKET_CLOSE:
   92 >   >   forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
   93 >   >   break;
   94 >   case SOCKET_OPEN:
   95 >   >   forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
   96 >   >   break;
   97 >   case SOCKET_ERR:
   98 >   >   forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
   99 >   >   break;
  100 >   case SOCKET_ACCEPT:
  101 >   >   forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
  102 >   >   break;
  103 >   case SOCKET_UDP:
  104 >   >   forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
  105 >   >   break;
  106 >   case SOCKET_WARNING:
  107 >   >   forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
  108 >   >   break;
  109 >   default:
  110 >   >   skynet_error(NULL, "Unknown socket message type %d.",type);
  111 >   >   return -1;
  112 >   }
  113 >   if (more) {
  114 >   >   return -1;
  115 >   }
  116 >   return 1;
  117 }
  • socket_server_poll 主要做两件事情
    • 调用 has_cmd 函数检查控制管道的读端(recvctrl_fd)是否有数据可读,如果有就调用 ctrl_cmd 根据操作类型进行对应处理(如 “B” 对应 bind,“L” 对应 listen,“K” 对应 close 等)
    • 调用 sp_wait(Linux 下是对 epoll_wait 的封装)获取监听 socket 的事件,并对应处理( 1726 ~1771)
1671 int
1672 socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
1673 >   for (;;) {
1674 >   >   if (ss->checkctrl) {
1675 >   >   >   if (has_cmd(ss)) {
1676 >   >   >   >   int type = ctrl_cmd(ss, result);
1677 >   >   >   >   if (type != -1) {
1678 >   >   >   >   >   clear_closed_event(ss, result, type);
1679 >   >   >   >   >   return type;
1680 >   >   >   >   } else
1681 >   >   >   >   >   continue;
1682 >   >   >   } else {
1683 >   >   >   >   ss->checkctrl = 0;
1684 >   >   >   }
1685 >   >   }
1686 >   >   if (ss->event_index == ss->event_n) {
1687 >   >   >   ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
1688 >   >   >   ss->checkctrl = 1;
1689 >   >   >   if (more) {
1690 >   >   >   >   *more = 0;
1691 >   >   >   }
1692 >   >   >   ss->event_index = 0;
1693 >   >   >   if (ss->event_n <= 0) {
1694 >   >   >   >   ss->event_n = 0;
1695 >   >   >   >   int err = errno;
1696 >   >   >   >   if (err != EINTR) {
1697 >   >   >   >   >   skynet_error(NULL, "socket-server: %s", strerror(err));
1698 >   >   >   >   }
1699 >   >   >   >   continue;
1700 >   >   >   }
1701 >   >   }
...
1726 >   >   default:
1727 >   >   >   if (e->read) {
1728 >   >   >   >   int type;
1729 >   >   >   >   if (s->protocol == PROTOCOL_TCP) {
1730 >   >   >   >   >   type = forward_message_tcp(ss, s, &l, result);
1731 >   >   >   >   >   if (type == SOCKET_MORE) {
1732 >   >   >   >   >   >   --ss->event_index;
1733 >   >   >   >   >   >   return SOCKET_DATA;
1734 >   >   >   >   >   }
1735 >   >   >   >   } else {
1736 >   >   >   >   >   type = forward_message_udp(ss, s, &l, result);
1737 >   >   >   >   >   if (type == SOCKET_UDP) {
1738 >   >   >   >   >   >   // try read again
1739 >   >   >   >   >   >   --ss->event_index;
1740 >   >   >   >   >   >   return SOCKET_UDP;
1741 >   >   >   >   >   }
1742 >   >   >   >   }
1743 >   >   >   >   if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
1744 >   >   >   >   >   // Try to dispatch write message next step if write flag set.
1745 >   >   >   >   >   e->read = false;
1746 >   >   >   >   >   --ss->event_index;
1747 >   >   >   >   }
1748 >   >   >   >   if (type == -1)
1749 >   >   >   >   >   break;
1750 >   >   >   >   return type;
1751 >   >   >   }
1752 >   >   >   if (e->write) {
1753 >   >   >   >   int type = send_buffer(ss, s, &l, result);
1754 >   >   >   >   if (type == -1)
1755 >   >   >   >   >   break;
1756 >   >   >   >   return type;
1757 >   >   >   }
1758 >   >   >   if (e->error) {
1759 >   >   >   >   int error;
1760 >   >   >   >   socklen_t len = sizeof(error);
1761 >   >   >   >   int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);
1762 >   >   >   >   const char * err = NULL;
1763 >   >   >   >   if (code < 0) {
1764 >   >   >   >   >   err = strerror(errno);
1765 >   >   >   >   } else if (error != 0) {
1766 >   >   >   >   >   err = strerror(error);
1767 >   >   >   >   } else {
1768 >   >   >   >   >   err = "Unknown error";
1769 >   >   >   >   }
1770 >   >   >   >   return report_error(s, result, err);
1771 >   >   >   }
1772 >   >   >   if (e->eof) {
1773 >   >   >   >   // For epoll (at least), FIN packets are exchanged both ways.
1774 >   >   >   >   // See: https://stackoverflow.com/questions/52976152/tcp-when-is-epollhup-generated
1775 >   >   >   >   int halfclose = halfclose_read(s);
1776 >   >   >   >   force_close(ss, s, &l, result);
1777 >   >   >   >   if (!halfclose) {
1778 >   >   >   >   >   return SOCKET_CLOSE;
1779 >   >   >   >   }
1780 >   >   >   }
1781 >   >   >   break;
1782 >   >   }
1783 >   }
1784 }
  • 最后把操作的结果打包通过 skynet 的消息系统发送给 lua 层
 // skynet-src/skynet_socket.c
 38 static void
 39 forward_message(int type, bool padding, struct socket_message * result) {
 40 >   struct skynet_socket_message *sm;
 41 >   size_t sz = sizeof(*sm);
 42 >   if (padding) {
 43 >   >   if (result->data) {
 44 >   >   >   size_t msg_sz = strlen(result->data);
 45 >   >   >   if (msg_sz > 128) {
 46 >   >   >   >   msg_sz = 128;
 47 >   >   >   }
 48 >   >   >   sz += msg_sz;
 49 >   >   } else {
 50 >   >   >   result->data = "";
 51 >   >   }
 52 >   }
 53 >   sm = (struct skynet_socket_message *)skynet_malloc(sz);
 54 >   sm->type = type;
 55 >   sm->id = result->id;
 56 >   sm->ud = result->ud;
 57 >   if (padding) {
 58 >   >   sm->buffer = NULL;
 59 >   >   memcpy(sm+1, result->data, sz - sizeof(*sm));
 60 >   } else {
 61 >   >   sm->buffer = result->data;
 62 >   }
 63
 64 >   struct skynet_message message;
 65 >   message.source = 0;
 66 >   message.session = 0;
 67 >   message.data = sm;
 68 >   message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
 69 >
 70 >   if (skynet_context_push((uint32_t)result->opaque, &message)) {
 71 >   >   // todo: report somewhere to close socket
 72 >   >   // don't call skynet_socket_close here (It will block mainloop)
 73 >   >   skynet_free(sm->buffer);
 74 >   >   skynet_free(sm);
 75 >   }
 76 }