- skynet 创建专门的 socket 线程用于处理 socket 相关的逻辑。skynet 将 socket 相关的操作提炼出 lua 接口供用于在 lua 层操作 socket
- lua 层 与 socket 层并不在同一个线程内,lua 层向 socket 线程发送消息是通过管道,而 socket 层向 lua 层发送消息是通过 skynet 的消息系统
- lua socket 接口代码在文件 lualib-src/lua-socket.c 中
// lualib-src/lua-socket.c
820 LUAMOD_API int
821 luaopen_skynet_socketdriver(lua_State *L) {
822 > luaL_checkversion(L);
823 > luaL_Reg l[] = {
824 > > { "buffer", lnewbuffer },
825 > > { "push", lpushbuffer },
826 > > { "pop", lpopbuffer },
827 > > { "drop", ldrop },
828 > > { "readall", lreadall },
829 > > { "clear", lclearbuffer },
830 > > { "readline", lreadline },
831 > > { "str2p", lstr2p },
832 > > { "header", lheader },
833 > > { "info", linfo },
834
835 > > { "unpack", lunpack },
836 > > { NULL, NULL },
837 > };
838 > luaL_newlib(L,l);
839 > luaL_Reg l2[] = {
840 > > { "connect", lconnect },
841 > > { "close", lclose },
842 > > { "shutdown", lshutdown },
843 > > { "listen", llisten },
844 > > { "send", lsend },
845 > > { "lsend", lsendlow },
846 > > { "bind", lbind },
847 > > { "start", lstart },
848 > > { "pause", lpause },
849 > > { "nodelay", lnodelay },
850 > > { "udp", ludp },
851 > > { "udp_connect", ludp_connect },
852 > > { "udp_send", ludp_send },
853 > > { "udp_address", ludp_address },
854 > > { "resolve", lresolve },
855 > > { NULL, NULL },
856 > };
857 > lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
858 > struct skynet_context *ctx = lua_touserdata(L,-1);
859 > if (ctx == NULL) {
860 > > return luaL_error(L, "Init skynet context first");
861 > }
862
863 > luaL_setfuncs(L,l2,1);
864
865 > return 1;
866 }
- socket 操作为 l2 注册的 c 函数,这些函数最后都会调用 send_request 函数
// skynet-src/socket_server.c
1786 static void
1787 send_request(struct socket_server *ss, struct request_package *request, char type, int len) {
1788 request->header[6] = (uint8_t)type;
1789 request->header[7] = (uint8_t)len;
1790 const char * req = (const char *)request + offsetof(struct request_package, header[6]);
1791 for (;;) {
1792 ssize_t n = write(ss->sendctrl_fd, req, len+2);
1793 if (n<0) {
1794 if (errno != EINTR) {
1795 skynet_error(NULL, "socket-server : send ctrl command error %s.", strerror(errno));
1796 }
1797 continue;
1798 }
1799 assert(n == len+2);
1800 return;
1801 }
1802 }
- send_request 的作用是把对应的操作及参数序列化并写入 socket sendctrl_fd 中
- socket 线程那边通过监听 sendctrl_fd 获取对应 socket 操作类型及参数
- socket 线程主循环就是不断调用 socket_server_poll,主要做两件事
- 调用 socket_server_poll 处理 socket
- 调用 forward_message 把处理的结果打包发送给 lua 层
// skynet-src/skynet_socket.c
78 int-
79 skynet_socket_poll() {
80 > struct socket_server *ss = SOCKET_SERVER;
81 > assert(ss);
82 > struct socket_message result;
83 > int more = 1;
84 > int type = socket_server_poll(ss, &result, &more);
85 > switch (type) {
86 > case SOCKET_EXIT:
87 > > return 0;
88 > case SOCKET_DATA:
89 > > forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
90 > > break;
91 > case SOCKET_CLOSE:
92 > > forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
93 > > break;
94 > case SOCKET_OPEN:
95 > > forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
96 > > break;
97 > case SOCKET_ERR:
98 > > forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
99 > > break;
100 > case SOCKET_ACCEPT:
101 > > forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
102 > > break;
103 > case SOCKET_UDP:
104 > > forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
105 > > break;
106 > case SOCKET_WARNING:
107 > > forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
108 > > break;
109 > default:
110 > > skynet_error(NULL, "Unknown socket message type %d.",type);
111 > > return -1;
112 > }
113 > if (more) {
114 > > return -1;
115 > }
116 > return 1;
117 }
- socket_server_pool 主要做两件事情
- 调用 has_cmd 函数检查 sendctrl_fd 是否有事件,如果有就调用 ctrl_cmd 根据操作类型进行对应处理(如 “B” 对应 bind,“L” 对应 listen, “K” 对应 close 等)
- 调用 epoll_wait 获取监听 socket 的事件,并对应处理( 1726 ~1771)
1671 int-
1672 socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
1673 > for (;;) {
1674 > > if (ss->checkctrl) {
1675 > > > if (has_cmd(ss)) {
1676 > > > > int type = ctrl_cmd(ss, result);
1677 > > > > if (type != -1) {
1678 > > > > > clear_closed_event(ss, result, type);
1679 > > > > > return type;
1680 > > > > } else
1681 > > > > > continue;
1682 > > > } else {
1683 > > > > ss->checkctrl = 0;
1684 > > > }
1685 > > }
1686 > > if (ss->event_index == ss->event_n) {
1687 > > > ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
1688 > > > ss->checkctrl = 1;
1689 > > > if (more) {
1690 > > > > *more = 0;
1691 > > > }
1692 > > > ss->event_index = 0;
1693 > > > if (ss->event_n <= 0) {
1694 > > > > ss->event_n = 0;
1695 > > > > int err = errno;
1696 > > > > if (err != EINTR) {
1697 > > > > > skynet_error(NULL, "socket-server: %s", strerror(err));
1698 > > > > }
1699 > > > > continue;
1700 > > > }
1701 > > }
...
1726 > > default:
1727 > > > if (e->read) {
1728 > > > > int type;
1729 > > > > if (s->protocol == PROTOCOL_TCP) {
1730 > > > > > type = forward_message_tcp(ss, s, &l, result);
1731 > > > > > if (type == SOCKET_MORE) {
1732 > > > > > > --ss->event_index;
1733 > > > > > > return SOCKET_DATA;
1734 > > > > > }
1735 > > > > } else {
1736 > > > > > type = forward_message_udp(ss, s, &l, result);
1737 > > > > > if (type == SOCKET_UDP) {
1738 > > > > > > // try read again
1739 > > > > > > --ss->event_index;
1740 > > > > > > return SOCKET_UDP;
1741 > > > > > }
1742 > > > > }
1743 > > > > if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
1744 > > > > > // Try to dispatch write message next step if write flag set.
1745 > > > > > e->read = false;
1746 > > > > > --ss->event_index;
1747 > > > > }
1748 > > > > if (type == -1)
1749 > > > > > break;> > > >
1750 > > > > return type;
1751 > > > }
1752 > > > if (e->write) {
1753 > > > > int type = send_buffer(ss, s, &l, result);
1754 > > > > if (type == -1)
1755 > > > > > break;
1756 > > > > return type;
1757 > > > }
1758 > > > if (e->error) {
1759 > > > > int error;
1760 > > > > socklen_t len = sizeof(error);--
1761 > > > > int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);--
1762 > > > > const char * err = NULL;
1763 > > > > if (code < 0) {
1764 > > > > > err = strerror(errno);
1765 > > > > } else if (error != 0) {
1766 > > > > > err = strerror(error);
1767 > > > > } else {
1768 > > > > > err = "Unknown error";
1769 > > > > }
1770 > > > > return report_error(s, result, err);
1771 > > > }
1772 > > > if (e->eof) {
1773 > > > > // For epoll (at least), FIN packets are exchanged both ways.
1774 > > > > // See: https://stackoverflow.com/questions/52976152/tcp-when-is-epollhup-generated
1775 > > > > int halfclose = halfclose_read(s);
1776 > > > > force_close(ss, s, &l, result);
1777 > > > > if (!halfclose) {
1778 > > > > > return SOCKET_CLOSE;
1779 > > > > }
1780 > > > }
1781 > > > break;
1782 > > }
1783 > }
1784 }
- 最后把操作的结果打包通过 skyet 的消息系统发送给 lua 层
// skynet-src/skynet_socket.c
38 static void
39 forward_message(int type, bool padding, struct socket_message * result) {
40 > struct skynet_socket_message *sm;
41 > size_t sz = sizeof(*sm);
42 > if (padding) {
43 > > if (result->data) {
44 > > > size_t msg_sz = strlen(result->data);
45 > > > if (msg_sz > 128) {
46 > > > > msg_sz = 128;
47 > > > }
48 > > > sz += msg_sz;
49 > > } else {
50 > > > result->data = "";
51 > > }
52 > }
53 > sm = (struct skynet_socket_message *)skynet_malloc(sz);
54 > sm->type = type;
55 > sm->id = result->id;
56 > sm->ud = result->ud;
57 > if (padding) {
58 > > sm->buffer = NULL;
59 > > memcpy(sm+1, result->data, sz - sizeof(*sm));
60 > } else {
61 > > sm->buffer = result->data;
62 > }
63
64 > struct skynet_message message;
65 > message.source = 0;
66 > message.session = 0;
67 > message.data = sm;
68 > message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
69 >
70 > if (skynet_context_push((uint32_t)result->opaque, &message)) {
71 > > // todo: report somewhere to close socket
72 > > // don't call skynet_socket_close here (It will block mainloop)
73 > > skynet_free(sm->buffer);
74 > > skynet_free(sm);
75 > }
76 }