which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir) // 检查存储数据的目录, 返回数据存储的类型是成员还是 proxy 等 if which != dirEmpty { // 节点目录不为空,证明不是第一次使用,恢复之前的配置 ... switch which { case dirMember: // 如果是成员类型,需要开启一个 etcd 服务 stopped, errc, err = startEtcd(&cfg.ec) case dirProxy: // 如果是proxy, 则开启一个 proxy 服务 err = startProxy(cfg) default: ... } } else { // 如果为空,则根据参数启动服务 shouldProxy := cfg.isProxy() if !shouldProxy { // 如果不是 proxy, 则启动一个正常的 server stopped, errc, err = startEtcd(&cfg.ec) if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == v2discovery.ErrFullCluster { if cfg.shouldFallbackToProxy() { ... shouldProxy = true } } ... if shouldProxy { // 如果是 proxy ,则启动一个 proxy 服务 err = startProxy(cfg) } } ... osutil.HandleInterrupts(lg) // 接收外界信号
// At this point, the initialization of etcd is done. // The listeners are listening on the TCP ports and ready // for accepting connections. The etcd instance should be // joined with the cluster and ready to serve incoming // connections. notifySystemd(lg) // 把信号发送给正在运行的 etcd 守护进程
select { // 进入阻塞状态,除非出现错误或者服务关闭 case lerr := <-errc: // fatal out on listener errors if lg != nil { lg.Fatal("listener failed", zap.Error(lerr)) } else { plog.Fatal(lerr) } case <-stopped: } osutil.Exit(0) }
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd. funcstartEtcd(cfg *embed.Config)(<-chanstruct{}, <-chan error, error) { e, err := embed.StartEtcd(cfg) // 根据配置开启一个 etcd server if err != nil { returnnil, nil, err } osutil.RegisterInterruptHandler(e.Close) // 注册通过信号关闭时的回调函数 select { // 进入阻塞,除非接收到下面的信号 case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster 阻塞,直到当前 server 注册到了集群中 case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped' 注册失败 } return e.Server.StopNotify(), e.Err(), nil }
// StartEtcd 回开启一个 etcd server, 并且接收 HTTP 请求,但是这个函数并不会保证加入到了集群中 // 加入集群是由 Etcd.Server.ReadyNotify() 来实现的 // StartEtcd launches the etcd server and HTTP handlers for client/server communication. // The returned Etcd.Server is not guaranteed to have joined the cluster. Wait // on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use. funcStartEtcd(inCfg *Config)(e *Etcd, err error) { if err = inCfg.Validate(); err != nil { // 检查一些参数是否合法 returnnil, err } serving := false e = &Etcd{cfg: *inCfg, stopc: make(chanstruct{})} cfg := &e.cfg deferfunc() { if e == nil || err == nil { return } if !serving { // errored before starting gRPC server for serveCtx.serversC for _, sctx := range e.sctxs { close(sctx.serversC) } } e.Close() e = nil }() // 开启 peer server, 默认端口 2380 if e.Peers, err = configurePeerListeners(cfg); err != nil { // 节点数据赋值 return e, err } // 开启 client server, 默认端口 2379,并且支持多协议(grpc,http, https) if e.sctxs, err = configureClientListeners(cfg); err != nil { // client 数据赋值 return e, err }
for _, sctx := range e.sctxs { // 当前 server ctx 记录 e.Clients = append(e.Clients, sctx.l) }
// 注册 token memberInitialized := true if !isMemberInitialized(cfg) { memberInitialized = false urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd") if err != nil { return e, fmt.Errorf("error setting up initial cluster: %v", err) } }
// AutoCompactionRetention defaults to "0" if not set. iflen(cfg.AutoCompactionRetention) == 0 { cfg.AutoCompactionRetention = "0" } autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention) if err != nil { return e, err }
backendFreelistType := parseBackendFreelistType(cfg.ExperimentalBackendFreelistType) // 根据配置新建一个 server 对象 if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { return e, err }
// buffer channel 保证服务关闭的时候不会阻塞 // buffer channel so goroutines on closed connections won't wait forever e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// newly started member ("memberInitialized==false") // does not need corruption check if memberInitialized { if err = e.Server.CheckInitialHashKV(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) e.Server = nil return e, err } } e.Server.Start() // 开启一个 etcd server
// 与每个 peer 保持通信 if err = e.servePeers(); err != nil { return e, err } // 开启 server 与每个 client 保持通信, 可以同时支持多重协议 if err = e.serveClients(); err != nil { return e, err } // 与每个 metrics 保持通信 if err = e.serveMetrics(); err != nil { return e, err } ... serving = true return e, nil }
// Start performs any initialization of the Server necessary for it to // begin serving requests. It must be called before Do or Process. // Start must be non-blocking; any long-running server functionality // should be implemented in goroutines. func(s *EtcdServer)Start() { s.start() // 下面的函数将在 server 关闭后等执行完成 s.goAttach(func() { s.adjustTicks() }) s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) s.goAttach(s.purgeFile) s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) }) s.goAttach(s.monitorVersions) s.goAttach(s.linearizableReadLoop) s.goAttach(s.monitorKVHash) }
// start prepares and starts server in a new goroutine. It is no longer safe to // modify a server's fields after it has been sent to Start. // This function is just used for testing. func(s *EtcdServer)start() { ... // 通过一个 goroutine 开启服务 // TODO: if this is an empty log, writes all peer infos // into the first entry go s.run() }
func(s *EtcdServer)run() { lg := s.getLogger() sn, err := s.r.raftStorage.Snapshot()
deferfunc() { s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping close(s.stopping) s.wgMu.Unlock() s.cancel()
sched.Stop()
// wait for gouroutines before closing raft so wal stays open s.wg.Wait()
s.SyncTicker.Stop()
// must stop raft after scheduler-- etcdserver can leak rafthttp pipelines // by adding a peer after raft stops the transport s.r.stop()
// kv, lessor and backend can be nil if running without v3 enabled // or running unit tests. if s.lessor != nil { s.lessor.Stop() } if s.kv != nil { s.kv.Close() } if s.authStore != nil { s.authStore.Close() } if s.be != nil { s.be.Close() } if s.compactor != nil { s.compactor.Stop() } close(s.done) }() var expiredLeaseC <-chan []*lease.Lease if s.lessor != nil { expiredLeaseC = s.lessor.ExpiredLeasesC() } for { select { case ap := <-s.r.apply(): // 数据更新 f := func(context.Context) { s.applyAll(&ep, &ap) } sched.Schedule(f) case leases := <-expiredLeaseC: // 租期过期处理 s.goAttach(func() { ... }) case err := <-s.errorc: // 出现错误,退出 server ... return case <-getSyncC(): // 定期同步数据 if s.v2store.HasTTLKeys() { s.sync(s.Cfg.ReqTimeout()) } case <-s.stop: // 停止信号,停止 server return } } }
// etcdserver/raft.go.start // raft node 启动,保持心跳 // start prepares and starts raftNode in a new goroutine. It is no longer safe // to modify the fields after it has been started. func(r *raftNode)start(rh *raftReadyHandler) { }
多协议支持
如何做到只监听一个端口,却能同时开启多种协议的 server 来处理请求呢?这是借助 github.com/soheilhy/cmux 来完成的。