前言
如果想兼顾开发效率,又能保证高并发,协程就是最好的选择。它可以在保持异步化运行机制的同时,用同步方式写代码(goroutine-per-connection),这在实现高并发的同时,缩短了开发周期,是高性能服务未来的发展方向。
整体理念
Go语言TCP Socket编程从tcp socket诞生后,网络编程架构模型也几经演化,大致是:“每进程一个连接” –> “每线程一个连接” –> “Non-Block + I/O多路复用(linux epoll/windows iocp/freebsd darwin kqueue/solaris Event Port)”。伴随着模型的演化,服务程序愈加强大,可以支持更多的连接,获得更好的处理性能。不过I/O多路复用也给使用者带来了不小的复杂度,以至于后续出现了许多高性能的I/O多路复用框架, 比如libevent、libev、libuv等,以帮助开发者简化开发复杂性,降低心智负担。不过Go的设计者似乎认为I/O多路复用的这种通过回调机制割裂控制流的方式依旧复杂,且有悖于“一般逻辑”设计,为此Go语言将该“复杂性”隐藏在Runtime中了:Go开发者无需关注socket是否是 non-block的,也无需亲自注册文件描述符的回调,只需在每个连接对应的goroutine中以“block I/O”的方式对待socket处理即可。
The Go netpollerIn Go, all I/O is blocking. The Go ecosystem is built around the idea that you write against a blocking interface and then handle concurrency through goroutines and channels rather than callbacks and futures.An example is the HTTP server in the “net/http” package. Whenever it accepts a connection, it will create a new goroutine to handle all the requests that will happen on that connection. This construct means that the request handler can be written in a very straightforward manner. First do this, then do that. Unfortunately, using the blocking I/O provided by the operating system isn’t suitable for constructing our own blocking I/O interface.
netty 在屏蔽java nio底层细节方面做得不错, 但因为java/jvm的限制,“回调机制割裂控制流”的问题依然无法避免。
原理
- 多路复用 有赖于 linux 的epoll 机制,具体的说 是 epoll_create/epoll_ctl/epoll_wait 三个函数
- epoll 机制包含 两个fd: epfd 和 待读写数据的fd(比如socket)。先创建efpd,然后向epfd 注册fd事件, 之后触发epoll_wait 轮询注册在epfd 的fd 事件发生了没有。
- netpoller 负责将 操作系统 提供的nio 转换为 goroutine 支持的blocking io。为屏蔽linux、windows 等底层nio 接口的差异,netpoller 定义一个 虚拟接口来封装底层接口。
func netpollinit() func netpollopen(fd uintptr, pd *pollDesc) int32 func netpoll(delta int64) gList func netpollBreak() func netpollIsPollDescriptor(fd uintptr) bool
本文 主要讲 netpoller 基于 linux 的epoll 接口 的实现 Go netpoller 网络模型之源码全面解析
Goroutine 让出线程并等待读写事件:当我们在文件描述符上执行读写操作时,如果文件描述符不可读或者不可写,当前 Goroutine 就会执行 runtime.poll_runtime_pollWait
检查 runtime.pollDesc
的状态并调用 runtime.netpollblock
等待文件描述符的可读或者可写。runtime.netpollblock
会使用运行时提供的 runtime.gopark
让出当前线程,将 Goroutine 转换到休眠状态并等待运行时的唤醒。
I/O 多路复用需要使用特定的系统调用/select,java 语言需要显式调用select ,而golang 则通过netpoller 组件将select调用 重新隐藏了。
多路复用等待读写事件的发生并返回:netpoller并不是由runtime中的某一个线程独立运行的,runtime中的调度和系统调用会通过 runtime.netpoll 与网络轮询器交换消息,获取待执行的 Goroutine 列表,恢复Goroutine 为运行状态,并将待执行的 Goroutine 加入运行队列等待处理。
io 前后的GPM
G1 正在 M 上执行,还有 3 个 Goroutine 在 LRQ 上等待执行。网络轮询器空闲着,什么都没干。
G1 想要进行网络系统调用,因此它被移动到网络轮询器并且处理异步网络系统调用。然后,M 可以从LRQ 执行另外的 Goroutine。此时,G2 就被上下文切换到 M 上了。
异步网络系统调用由网络轮询器完成,G1 被移回到 P 的 LRQ 中。一旦 G1 可以在 M 上进行上下文切换,它负责的 Go 相关代码就可以再次执行。
执行网络系统调用不需要额外的 M。网络轮询器使用系统线程,它时刻处理一个有效的事件循环/eventloop。
实现
核心数据结构
connect/accept/read/write 都会 转换为 pollDesc 操作。
调用 internal/poll.pollDesc.init
初始化文件描述符时不止会初始化网络轮询器,会通过 runtime.poll_runtime_pollOpen
函数重置轮询信息 runtime.pollDesc
并调用 runtime.netpollopen
初始化轮询事件。runtime.netpollopen
会调用 epollctl 向全局的轮询文件描述符 epfd 中加入新的轮询事件监听文件描述符的可读和可写状态
轮询 以获取 可执行的Goroutine
这里类似 netty 的eventloop
// src/runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
// 根据传入的 delay 计算 epoll 系统调用需要等待的时间;
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
waitms = 1e9
}
var events [128]epollevent
retry:
// 调用 epollwait 等待可读或者可写事件的发生;
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if waitms > 0 {
return gList{}
}
goto retry
}
// 在循环中依次处理 epollevent 事件;
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
...
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
...
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
netpollready(&toRun, pd, mode)
}
}
return toRun
计算了需要等待的时间之后,runtime.netpoll 会执行 epollwait 等待文件描述符转换成可读或者可写。当 epollwait 函数返回的值大于 0 时,就意味着被监控的文件描述符出现了待处理的事件。处理的事件总共包含两种,一种是调用 runtime.netpollBreak
函数触发的事件,该函数的作用是中断网络轮询器;另一种是其他文件描述符的正常读写事件,对于这些事件,我们会交给 runtime.netpollready
处理
各个场景下的代码示例
tcp 代码示例
个典型的Go server端程序大致如下:
func handleConn(c net.Conn) {
defer c.Close()
for {
// read from the connection
// ... ...
// write to the connection
//... ...
}
}
func main() {
l, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error:", err)
return
}
for {
c, err := l.Accept()
if err != nil {
fmt.Println("accept error:", err)
break
}
// start a new goroutine to handle
// the new connection.
go handleConn(c)
}
}
http 代码示例
func helloHandler(w http.ResponseWriter, req *http.Request) {
io.WriteString(w, "hello, world!\n")
}
func main() {
http.HandleFunc("/", helloHandler)
http.ListenAndServe(":12345", nil)
}
grpc 代码示例
demo helloworld helloworld.proto helloworld.pb.go ## 基于helloworld.proto 生成 server main.go client main.go
服务端main.go 示例
package main
const (
port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.Name)
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
helloworld.RegisterGreeterServer(s, &server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
helloworld.pb.go 中定义了RegisterGreeterServer 方法,除传入grpc.Server外,第二个参数是定义好的 GreeterServer interface。 由此可见,grpc 与java thrift 异曲同工
- 定义thrift 文件
- thrift 命令基于thrift 文件生成 对应语言的 代码文件,包含了服务 接口
- 开发者提供 接口实现类