Cwww3's Blog

Record what you think

0%

kqueue

使用 Kqueue 实现简单的 TCP 服务器

项目地址

设计

TCP 服务器由 TCP socket,来自客户端连接的 socket,kqueue 以及轮询 kqueue 的 event loop 组成。

image-20211223235715574

当一个客户端想要连接服务器,连接请求会被放入 TCP 连接队列。内核会将该事件放入 kqueue,然后 event loop 轮询 kqueue 获取该事件,并创建一个新的客户端 socket。

image-20211224000209004

当客户端写入数据,内核会将该事件放入 kqueue,然后 event loop 轮询获取该事件,获取到 socket,并从这个 socket 中读取数据。

实现

  1. Create, bind, and listen on a new socket
  2. Create new kqueue
  3. Subscribe to socket events
  4. Poll for new events in a loop and handle them

socket 模块用于封装 socket 相关的所用功能,kqueue 模块用于封装 event loop 的所用功能。

在 main 模块中调用他们,实现 tcp 服务。

socket

用 Go 类型定义一个 Socket,需要将文件描述符进行保存。

1
2
3
type Socket struct {
FileDescriptor int
}

实现读 io.Reader,写 io.Writer,关闭 io.Closer 等接口。

1
2
3
4
5
6
7
8
9
10
11
func (socket Socket) Read(bytes []byte) (int, error) {
if len(bytes) == 0 {
return 0, nil
}
numBytesRead, err :=
syscall.Read(socket.FileDescriptor, bytes)
if err != nil {
numBytesRead = 0
}
return numBytesRead, err
}
1
2
3
4
5
6
7
8
func (socket Socket) Write(bytes []byte) (int, error) {
numBytesWritten, err :=
syscall.Write(socket.FileDescriptor, bytes)
if err != nil {
numBytesWritten = 0
}
return numBytesWritten, err
}
1
2
3
func (socket *Socket) Close() error {
return syscall.Close(socket.FileDescriptor)
}
1
2
3
func (socket *Socket) String() string {
return strconv.Itoa(socket.FileDescriptor)
}

根据提供的 ip 和端口,创建并监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func Listen(ip string, port int) (*Socket, error) {
// 1. 创建套接字
socket := &Socket{}
// AF_INET 表示IPV4
// SOCK_STREAM 表示有序的、可靠的、基于双向连接的字节流
// 0在SOCK_STREAM套接字中表示TCP
socketFileDescriptor, err :=
syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
if err != nil {
return nil, fmt.Errorf("failed to create socket (%v)", err)
}
socket.FileDescriptor = socketFileDescriptor

// 2. 绑定ip和端口
socketAddress := &syscall.SockaddrInet4{Port: port}
copy(socketAddress.Addr[:], net.ParseIP(ip))
if err = syscall.Bind(socket.FileDescriptor, socketAddress);
err != nil {
return nil, fmt.Errorf("failed to bind socket (%v)", err)
}

// 3. 开始监听,接收连接请求 SOMAXCONN设置最大等待的连接数
if err = syscall.Listen(socket.FileDescriptor, syscall.SOMAXCONN);
err != nil {
return nil, fmt.Errorf("failed to listen on socket (%v)", err)
}

return socket, nil
}

kqueue

定义一个 EventLoop 结构体

分别保存 kqueue 文件描述符以及用于监听客户端连接的套接字文件描述符

1
2
3
4
type EventLoop struct {
KqueueFileDescriptor int
SocketFileDescriptor int
}

定义一个创建 EventLoop 的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func NewEventLoop(s *socket.Socket) (*EventLoop, error) {
// 创建kqueue,返回对应的文件描述符
kQueue, err := syscall.Kqueue()
if err != nil {
return nil,
fmt.Errorf("failed to create kqueue file descriptor (%v)", err)
}
// 定义监听的事件类型以及处理方法
// Ident设置创建的socket文件描述符
// Filter 设置成EVFILT_READ表示对传入的连接事件感兴趣
// Flags 设置相应的响应动作 EV_ADD表示添加到kqueue中,EV_ENABLE表示启用
changeEvent := syscall.Kevent_t{
Ident: uint64(s.FileDescriptor),
Filter: syscall.EVFILT_READ,
Flags: syscall.EV_ADD | syscall.EV_ENABLE,
Fflags: 0,
Data: 0,
Udata: nil,
}

// 将定义的事件类型注册要kqueue上
changeEventRegistered, err := syscall.Kevent(
kQueue,
[]syscall.Kevent_t{changeEvent},
nil,
nil
)
if err != nil || changeEventRegistered == -1 {
return nil,
fmt.Errorf("failed to register change event (%v)", err)
}

return &EventLoop{
KqueueFileDescriptor: kQueue,
SocketFileDescriptor: s.FileDescriptor
}, nil
}

定义函数对 kquue 轮询处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (eventLoop *EventLoop) Handle(handler Handler) {
for {
// 轮询kqueue获取事件的数量
newEvents := make([]syscall.Kevent_t, 10)
numNewEvents, err := syscall.Kevent(
eventLoop.KqueueFileDescriptor,
nil,
newEvents,
nil
)
if err != nil {
continue
}

// 处理每一个事件
for i := 0; i < numNewEvents; i++ {
currentEvent := newEvents[i]
eventFileDescriptor := int(currentEvent.Ident)

if currentEvent.Flags&syscall.EV_EOF != 0 {
// 客户端关闭事件
syscall.Close(eventFileDescriptor)
} else if eventFileDescriptor == eventLoop.SocketFileDescriptor {
// 客户端连接事件
socketConnection, _, err :=
syscall.Accept(eventFileDescriptor)
if err != nil {
continue
}
// 获取客户端连接请求创建对应的套接字和文件描述符
// 并定义订阅读事件,注册到kqueue上
socketEvent := syscall.Kevent_t{
Ident: uint64(socketConnection),
Filter: syscall.EVFILT_READ,
Flags: syscall.EV_ADD,
Fflags: 0,
Data: 0,
Udata: nil,
}
socketEventRegistered, err := syscall.Kevent(
eventLoop.KqueueFileDescriptor,
[]syscall.Kevent_t{socketEvent},
nil,
nil
)
if err != nil || socketEventRegistered == -1 {
continue
}
} else if currentEvent.Filter&syscall.EVFILT_READ != 0 {
// 处理已经建立连接的客户端所发来的数据
handler(&socket.Socket{
FileDescriptor: int(eventFileDescriptor)
})
}

// ignore all other events
}
}
}

main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func main() {
s, err := socket.Listen("127.0.0.1", 8080)
if err != nil {
log.Println("Failed to create Socket:", err)
os.Exit(1)
}

eventLoop, err := kqueue.NewEventLoop(s)
if err != nil {
log.Println("Failed to create event loop:", err)
os.Exit(1)
}

log.Println("Server started. Waiting for incoming connections. ^C to exit.")

// 定义以建立连接的客户端发来数据时的处理函数
eventLoop.Handle(func(s *socket.Socket) {
reader := bufio.NewReader(s)
for {
line, err := reader.ReadString('\n')
if err != nil || strings.TrimSpace(line) == "" {
break
}
s.Write([]byte(line))
}
s.Close()
})
}

模拟连接、发送数据、关闭连接

1
2
3
4
5
6
7
8
9
10
11
func main() {
c,err := net.Dial("tcp","127.0.0.1:8080")
if err!= nil {
log.Fatalln(err)
}
defer c.Close()
_,_ = c.Write([]byte("hello world\n"))
log.Println("send msg")
time.Sleep(time.Second*5)
log.Println("client closed")
}
Donate comment here.