使用 Kqueue 实现简单的 TCP 服务器
项目地址
设计
TCP 服务器由 TCP socket,来自客户端连接的 socket,kqueue 以及轮询 kqueue 的 event loop 组成。
当一个客户端想要连接服务器,连接请求会被放入 TCP 连接队列。内核会将该事件放入 kqueue,然后 event loop 轮询 kqueue 获取该事件,并创建一个新的客户端 socket。
当客户端写入数据,内核会将该事件放入 kqueue,然后 event loop 轮询获取该事件,获取到 socket,并从这个 socket 中读取数据。
实现
- Create, bind, and listen on a new socket
- Create new kqueue
- Subscribe to socket events
- Poll for new events in a loop and handle them
socket 模块用于封装 socket 相关的所用功能,kqueue 模块用于封装 event loop 的所用功能。
在 main 模块中调用他们,实现 tcp 服务。
socket
用 Go 类型定义一个 Socket,需要将文件描述符进行保存。
1 2 3
| type Socket struct { FileDescriptor int }
|
实现读 io.Reader,写 io.Writer,关闭 io.Closer 等接口。
1 2 3 4 5 6 7 8 9 10 11
| func (socket Socket) Read(bytes []byte) (int, error) { if len(bytes) == 0 { return 0, nil } numBytesRead, err := syscall.Read(socket.FileDescriptor, bytes) if err != nil { numBytesRead = 0 } return numBytesRead, err }
|
1 2 3 4 5 6 7 8
| func (socket Socket) Write(bytes []byte) (int, error) { numBytesWritten, err := syscall.Write(socket.FileDescriptor, bytes) if err != nil { numBytesWritten = 0 } return numBytesWritten, err }
|
1 2 3
| func (socket *Socket) Close() error { return syscall.Close(socket.FileDescriptor) }
|
1 2 3
| func (socket *Socket) String() string { return strconv.Itoa(socket.FileDescriptor) }
|
根据提供的 ip 和端口,创建并监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func Listen(ip string, port int) (*Socket, error) { socket := &Socket{} socketFileDescriptor, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0) if err != nil { return nil, fmt.Errorf("failed to create socket (%v)", err) } socket.FileDescriptor = socketFileDescriptor
socketAddress := &syscall.SockaddrInet4{Port: port} copy(socketAddress.Addr[:], net.ParseIP(ip)) if err = syscall.Bind(socket.FileDescriptor, socketAddress); err != nil { return nil, fmt.Errorf("failed to bind socket (%v)", err) }
if err = syscall.Listen(socket.FileDescriptor, syscall.SOMAXCONN); err != nil { return nil, fmt.Errorf("failed to listen on socket (%v)", err) }
return socket, nil }
|
kqueue
定义一个 EventLoop 结构体
分别保存 kqueue 文件描述符以及用于监听客户端连接的套接字文件描述符
1 2 3 4
| type EventLoop struct { KqueueFileDescriptor int SocketFileDescriptor int }
|
定义一个创建 EventLoop 的函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func NewEventLoop(s *socket.Socket) (*EventLoop, error) { kQueue, err := syscall.Kqueue() if err != nil { return nil, fmt.Errorf("failed to create kqueue file descriptor (%v)", err) } changeEvent := syscall.Kevent_t{ Ident: uint64(s.FileDescriptor), Filter: syscall.EVFILT_READ, Flags: syscall.EV_ADD | syscall.EV_ENABLE, Fflags: 0, Data: 0, Udata: nil, }
changeEventRegistered, err := syscall.Kevent( kQueue, []syscall.Kevent_t{changeEvent}, nil, nil ) if err != nil || changeEventRegistered == -1 { return nil, fmt.Errorf("failed to register change event (%v)", err) }
return &EventLoop{ KqueueFileDescriptor: kQueue, SocketFileDescriptor: s.FileDescriptor }, nil }
|
定义函数对 kquue 轮询处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| func (eventLoop *EventLoop) Handle(handler Handler) { for { newEvents := make([]syscall.Kevent_t, 10) numNewEvents, err := syscall.Kevent( eventLoop.KqueueFileDescriptor, nil, newEvents, nil ) if err != nil { continue }
for i := 0; i < numNewEvents; i++ { currentEvent := newEvents[i] eventFileDescriptor := int(currentEvent.Ident)
if currentEvent.Flags&syscall.EV_EOF != 0 { syscall.Close(eventFileDescriptor) } else if eventFileDescriptor == eventLoop.SocketFileDescriptor { socketConnection, _, err := syscall.Accept(eventFileDescriptor) if err != nil { continue } socketEvent := syscall.Kevent_t{ Ident: uint64(socketConnection), Filter: syscall.EVFILT_READ, Flags: syscall.EV_ADD, Fflags: 0, Data: 0, Udata: nil, } socketEventRegistered, err := syscall.Kevent( eventLoop.KqueueFileDescriptor, []syscall.Kevent_t{socketEvent}, nil, nil ) if err != nil || socketEventRegistered == -1 { continue } } else if currentEvent.Filter&syscall.EVFILT_READ != 0 { handler(&socket.Socket{ FileDescriptor: int(eventFileDescriptor) }) }
} } }
|
main
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func main() { s, err := socket.Listen("127.0.0.1", 8080) if err != nil { log.Println("Failed to create Socket:", err) os.Exit(1) }
eventLoop, err := kqueue.NewEventLoop(s) if err != nil { log.Println("Failed to create event loop:", err) os.Exit(1) }
log.Println("Server started. Waiting for incoming connections. ^C to exit.")
eventLoop.Handle(func(s *socket.Socket) { reader := bufio.NewReader(s) for { line, err := reader.ReadString('\n') if err != nil || strings.TrimSpace(line) == "" { break } s.Write([]byte(line)) } s.Close() }) }
|
模拟连接、发送数据、关闭连接
1 2 3 4 5 6 7 8 9 10 11
| func main() { c,err := net.Dial("tcp","127.0.0.1:8080") if err!= nil { log.Fatalln(err) } defer c.Close() _,_ = c.Write([]byte("hello world\n")) log.Println("send msg") time.Sleep(time.Second*5) log.Println("client closed") }
|