oohcode

$\bigodot\bigodot^H \rightarrow CODE$

Dependency inversion principle in Go

依赖反转原则在Go中使用

依赖反转原则

面向对象设计的设计原则有五个,分别是:

首字母 指代 概念
S 单一功能原则 认为对象应该仅具有一种单一功能的概念。
O 开闭原则 认为“软件体应该是对于扩展开放的,但是对于修改封闭的”的概念。
L 里氏替换原则 认为“程序中的对象应该是可以在不改变程序正确性的前提下被它的子类所替换的”的概念。 参考契约式设计。
I 接口隔离原则 认为“多个特定客户端接口要好于一个宽泛用途的接口” 的概念。
D 依赖反转原则 认为一个方法应该遵从“依赖于抽象而不是一个实例” 的概念。 依赖注入是该原则的一种实现方式。

这五个原则简称: SOLID
在面向对象编程领域中,依赖反转原则(Dependency inversion principle,DIP)是指一种特定的解耦(传统的依赖关系创建在高层次上,而具体的策略设置则应用在低层次的模块上)形式,使得高层次的模块不依赖于低层次的模块的实现细节,依赖关系被颠倒(反转),从而使得低层次模块依赖于高层次模块的需求抽象。
该原则规定:

  • 高层次的模块不应该依赖于低层次的模块,两者都应该依赖于抽象接口。
  • 抽象接口不应该依赖于具体实现。而具体实现则应该依赖于抽象接口。

该原则颠倒了一部分人对于面向对象设计的认识方式。如高层次和低层次对象都应该依赖于相同的抽象接口。

在传统的应用架构中,低层次的组件设计用于被高层次的组件使用,这一点提供了逐步的构建一个复杂系统的可能。在这种结构下,高层次的组件直接依赖于低层次的组件去实现一些任务。这种对于低层次组件的依赖限制了高层次组件被重用的可行性。

依赖反转原则的目的是把高层次组件从对低层次组件的依赖中解耦出来,这样使得重用不同层级的组件实现变得可能。把高层组件和低层组件划分到不同的包/库(在这些包/库中拥有定义了高层组件所必须的行为和服务的接口,并且存在高层组件的包)中的方式促进了这种解耦。由于低层组件是对高层组件接口的具体实现,因此低层组件包的编译是依赖于高层组件的,这颠倒了传统的依赖关系。众多的设计模式,比如插件,服务定位器或者依赖反转,则被用来在运行时把指定的低层组件实现提供给高层组件。

应用依赖反转原则同样被认为是应用了适配器模式,例如:高层的类定义了它自己的适配器接口(高层类所依赖的抽象接口)。被适配的对象同样依赖于适配器接口的抽象(这是当然的,因为它实现了这个接口),同时它的实现则可以使用它自身所在低层模块的代码。通过这种方式,高层组件则不依赖于低层组件,因为它(高层组件)仅间接的通过调用适配器接口多态方法使用了低层组件,而这些多态方法则是由被适配对象以及它的低层模块所实现的。

前面一大堆其实都是从wiki上copy过来的,自己的理解有以下几点:

  • 上层指调用者, 下层指被调用者
  • 原来的编程方式是上层调用下层的时候依赖下层具体的实现方式
  • 依赖反转(或叫:依赖倒置)是指下层的实现依赖上层调用的需求
  • 最终的解决方式是: 把上层的需求抽象成接口,上层依赖接口的抽象进行调用,下层依赖接口的抽象进行实现(下面要介绍的面相接口编程)

依赖注入

依赖注入是种实现控制反转用于解决依赖性设计模式。一个依赖关系指的是可被利用的一种对象(即服务提供端) 。依赖注入是将所依赖的传递给将使用的从属对象(即客户端)。该服务是将会变成客户端的状态的一部分。 传递服务给客户端,而非允许客户端来建立或寻找服务,是本设计模式的基本要求。

上面这段也是wiki上的, 自己理解:

  • 依赖注入就是: 把下层依赖注入(或叫传递)到上层调用
  • 要把提供服务的一方(也就是前面说的: 下层)作为实例传递给客户端(即:上层)
  • 不要客户端在内部自己实现服务端的实例化。
  • 这种方式的好处是: 可以通过传递不同的实例化对象来实现多态。

面相接口编程

面相接口编程是前面实现依赖反转原则的具体方式。
基于接口的编程将应用程序定义为组件的集合,其中组件间的应用程序接口(API)调用可能只通过抽象化接口完成,而没有具体的类。类的实例化一般通过使用如工厂模式等技术的其他接口完成。

这里也说一点自己的理解:
上面说到要通过依赖注入方式传递实例,这个实例如何生成呢?如果每次都生成一个,如果这个实例是有状态的,那么每个拿到的可能都是不一样的,这样就无法共享。所以一般都是通过工厂模式产生一个实例,其他调用方要共享的话都通过这个工厂拿到同一个实例

另一种定义描述: 在系统分析和架构中,分清层次和依赖关系,每个层次不是直接向其上层提供服务(即不是直接实例化在上层中),而是通过定义一组接口,仅向上层暴露其接口功能,上层对于下层仅仅是接口依赖,而不依赖具体类

面向接口编程和面向对象编程是什么关系:

首先,面向接口编程和面向对象编程并不是平级的,它并不是比面向对象编程更先进的一种独立的编程思想,而是附属于面向对象思想体系,属于其一部分。或者说,它是面向对象编程体系中的思想精髓之一。

接口的本质

接口是一组规则的集合,它规定了实现本接口的类或接口必须拥有的一组规则。体现了自然界“如果你是……则必须能……”的理念

例如,在自然界中,人都能吃饭,即“如果你是人,则必须能吃饭”。那么模拟到计算机程序中,就应该有一个Person接口,并有一个方法叫Eat(),然后我们规定,每一个表示“人”的类,必须实现Person接口,这就模拟了自然界“如果你是人,则必须能吃饭”这条规则。

从这里,我想各位也能看到些许面向对象思想的东西。面向对象思想的核心之一,就是模拟真实世界,把真实世界中的事物抽象成类,整个程序靠各个类的实例互相通信、互相协作完成系统功能,这非常符合真实世界的运行状况,也是面向对象思想的精髓。

接口是在一定粒度视图上同类事物的抽象表示。注意这里我强调了在一定粒度视图上,因为“同类事物”这个概念是相对的,它因为粒度视图不同而不同

例如,在我的眼里,我是一个人,和一头猪有本质区别,我可以接受我和我同学是同类这个说法,但绝不能接受我和一头猪是同类。但是,如果在一个动物学家眼里,我和猪应该是同类,因为我们都是动物,他可以认为“人”和“猪”都实现了Animal这个接口,而他在研究动物行为时,不会把我和猪分开对待,而会从“动物”这个较大的粒度上研究,但他会认为我和一棵树有本质区别。

面相接口编程的优点

  • 首先对系统灵活性大有好处。当下层需要改变时,只要接口及接口功能不变,则上层不用做任何修改。甚至可以在不改动上层代码时将下层整个替换掉。
  • 使用接口的另一个好处就是不同部件或层次的开发人员可以并行开工。

关于面相接口编程的归纳:

  • 面相接口是面向对象编程的重要部分
  • 接口本质上是一组规则的集合,是一定粒度上有相同特指的对象的的抽象
  • 面相接口编程可以提高编程的灵活性, 可以并行开发。

Go 中的应用

Go 中的接口

Go语言中,接口(interface)有其特殊的地方, 其他的语言一般要实现一个接口都需要显示的说明
例如PHP(这里没有贬低PHP的意思,大多数语言也是这种实现方式例如C++, Python, Rust等):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<?php

// Declare the interface 'iTemplate'
interface iTemplate
{
public function setVariable($name, $var);
public function getHtml($template);
}

// Implement the interface
// This will work
class Template implements iTemplate
{
private $vars = array();

public function setVariable($name, $var)
{
$this->vars[$name] = $var;
}

public function getHtml($template)
{
foreach($this->vars as $name => $value) {
$template = str_replace('{' . $name . '}', $value, $template);
}

return $template;
}
}

用到关键字 implements
todo: 对比优缺点

Go语言中,interfaceduck typing(鸭子类型: If it looks like a duck, and it quacks like a duck, then it is a duck), 也就是如果一个对象实现了某个接口的方法,那么这个对象就是这个接口类型了,不需要显示说明是否实现了某个接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Speaker types can say things.
type Speaker interface {
Say(string)
}

// Person is a strut with filed name
type Person struct {
name string
}

// Say funciton is defined by Speaker and implement by Person
func (p *Person) Say(message string) {
log.Println(p.name+":", message)
}

上面Person实现了函数Say, 所以Person就是Speaker类型了。

Go 中面相接口编程

这种编程方式不仅是在 Go 语言中是被推荐的,在几乎所有的编程语言中,我们都会推荐这种编程的方式,它为我们的程序提供了非常强的灵活性,想要构建一个稳定、健壮的 Go 语言项目,不使用接口是完全无法做到的。

如果一个略有规模的项目中没有出现任何 type … interface 的定义,那么作者可以推测出这在很大的概率上是一个工程质量堪忧并且没有多少单元测试覆盖的项目,我们确实需要认真考虑一下如何使用接口对项目进行重构。

事实上官方库也都是按照这个思想来实现的,比如net/http包(对这个包的分析参考 golang 的 webserver 是如何工作的)。当我们要启动一个http server时一般代码如下:

1
http.ListenAndServe(":8090", nil)

这个函数的定义

1
2
3
4
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}

第二个参数是Handler类型, 这个函数的类型定义如下:

1
2
3
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}

定义的正是一个接口。这个接口只有一个函数ServeHTTP, 而最终对请求处理调用的也正是这个函数:

1
2
3
4
5
6
7
8
9
10
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
handler.ServeHTTP(rw, req)
}

由于第二个函数我们一般都会传nil, 所以会执行上面的逻辑

1
handler = DefaultServeMux

DefaultServeMux就是官方的默认实现。而我们也可以通过传递这个参数来实现自己的处理, 很多Web框架就是怎么做的,比如gin:

1
2
3
4
5
6
7
8
9
10
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
c := engine.pool.Get().(*Context)
c.writermem.reset(w)
c.Request = req
c.reset()

engine.handleHTTPRequest(c)

engine.pool.Put(c)
}

gin自己实现了连接的处理方式,并且把这个实现作为参数传给net/http, 具体代码如下:

1
2
3
4
5
6
7
8
func (engine *Engine) Run(addr ...string) (err error) {
defer func() { debugPrintError(err) }()

address := resolveAddress(addr)
debugPrint("Listening and serving HTTP on %s\n", address)
err = http.ListenAndServe(address, engine)
return
}

这个实现正是前面说讲的: 依赖接口编程,然后通过依赖注入把实例传入

测试

我们有一个user包,里面是处理用户相关的信息, 还有一个bank包,bank会调用user的方法来获取一些用户信息, 刚开始他们的代码实现分别如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package user

import (
"database/sql"
)

var db *sql.DB

// UserName return the name of user with uid
func UserName(uid int) (string, error) {
sql := "select name from user where uid = ?"
rows, err := db.Query(sql, uid)
if err != nil {
return "", err
}
defer rows.Close()

var name string
for rows.Next() {
if err := rows.Scan(&name); err != nil {
return "", err
}
}

return name, nil
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package bank

import (
"user"
)

// UserInfo if uid exist return the name of user
func UserInfo(uid int) string {
name, err := user.UserName(uid)
if err != nil {
return "something was wrong"
}
if name == "" {
return "not found this user"
}
return "user name is " + name
}

现在如果我们要给bankUserInfo添加单元测试应该怎么做呢? 这里有以下几点问题:

  • 我们要测试的是bankUserInfo函数,而不是为了测试这个函数都调用的函数,所以我们其实不太关心user.UserName里面的逻辑
  • 我们要测试UserInfo就必须要从UserName获取一些信息,但是UserName的信息需要调用db才能获取,这里涉及到一些网络访问,会给我们的测试带来很多麻烦
  • 我们需要把UserName给Mock掉
    关于如何把UserName Mock掉, 其实我们可以借助一些mock的框架(比如bou.ke/monkey)来进行处理, 但是这种方法回避了设计上的一些问题,过度依赖会导致我们的代码质量堪忧,还有一些场景要求我们必须替换这个方法的实现,比如我们不想使用user.UserName的查询方式了,我们换了一种实现,这样我们就无法复用原来的代码了。
    下面我们介绍如何利用上面介绍的知识来解决这个问题:

面向接口编程

我们看一下第二版的代码
user包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import (
"database/sql"
)

var db *sql.DB

type User interface {
UserName(int) (string, error)
}

type DefaultUser struct{}

func New() User {
return DefaultUser{}
}

// UserName return the name of user with uid
func (DefaultUser) UserName(uid int) (string, error) {
sql := "select name from user where uid = ?"
rows, err := db.Query(sql, uid)
if err != nil {
return "", err
}
defer rows.Close()

var name string
for rows.Next() {
if err := rows.Scan(&name); err != nil {
return "", err
}
}

return name, nil
}

bank代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package bank

import (
"user"
)

var u = user.New()

// UserInfo if uid exist return the name of user
func UserInfo(uid int) string {
name, err := u.UserName(uid)
if err != nil {
return "something was wrong"
}
if name == "" {
return "not found this user"
}
return "user name is " + name
}

上面我们都做了那些修改呢?

  • 面向接口编程
    我们定义了一个接口类型User:
    1
    2
    3
    type User interface {
    UserName(int) (string, error)
    }

然后user包用DefaultUser来实现了这个方法,所以DefaultUser就是这个类型的包了
bank中定义了一个变量var u = user.New(), 由于user.New()的类型也是User,所以u的类型就是User, 然后在UserInfo函数中调用User类型的UserName方法
也就是说userbank都是面向User来进行编程的

  • 依赖注入
    我们第一个版本是直接调用user.UserName函数, 但是我们无法自己去修改这个函数的实现,所以我们通过var u = user.New()来获取user给我传递的一个对象,这样我们就可以通过u来调用UserName函数了,这时user.New就实现了依赖注入,这样做我们就可以通过覆盖u这个实例,来完成自己的实现了,下面

单元测试

面对版本二, 我们怎么实现bank.UserInfo的单元测试呢?
bank_test.go来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package bank

import (
"errors"
"testing"
)

type mockUser struct{}

func (u mockUser) UserName(uid int) (string, error) {
if uid == 1 {
return "", errors.New("something was wrong")
}
if uid == 2 {
return "", nil
}
if uid == 3 {
return "John", nil
}
return "", errors.New("something was wrong")
}

func TestUserInfo(t *testing.T) {
u = mockUser{}
cases := []struct {
name string
uid int
res string
}{
{"test1", 1, "something was wrong"},
{"test2", 2, "not found this user"},
{"test3", 3, "user name is John"},
}
for _, v := range cases {
t.Run(v.name, func(t *testing.T) {
info := UserInfo(v.uid)
if info != v.res {
t.Errorf("got %s; want %s", info, v.res)
}
})
}
}

1
2
3
4
5
6
7
8
9
10
11
go test -v
=== RUN TestUserInfo
=== RUN TestUserInfo/test1
=== RUN TestUserInfo/test2
=== RUN TestUserInfo/test3
--- PASS: TestUserInfo (0.00s)
--- PASS: TestUserInfo/test1 (0.00s)
--- PASS: TestUserInfo/test2 (0.00s)
--- PASS: TestUserInfo/test3 (0.00s)
PASS
ok bank 0.013s

首先我们定义了一个mockUser, 然后实现了UserName函数,所以这时mockUser已经是User类型了,然后我们在测试函数里通过u = mockUser替换掉了运来的var u = user.New(), 这时候在执行UserInfo调用的其实就是mockUser.UserName函数了,完美的完成了我们的单元测试。

工厂模式

前面我们虽然用依赖注入的方式完成了调用,但是还有一个问题, 当我们依赖注入的时候用的是var u = user.New()的方式来获取的,但是在错综复杂的调用过程中,我们难免会多次调用user.New()函数,而且我们还要共享同一个User, 这时候就要求我们使用工厂模式保证不管多少次调用,返回的都是同一个User, 在上面的代码中其实很好改:
我们把user中的

1
2
3
func New() User {
return DefaultUser{}
}

改为下面的实现:

1
2
3
4
5
var defaultUser = DefaultUser{}

func New() User {
return defaultUser
}

这样我们每次返回的都是user内部的defaultUser这个实例,而这个实例只初始化了一次, 所有通过这个方法获取的实例都是同一个实例

简化调用

有时候我们会觉得每次调用都通过依赖注入的传递一个对象,会使得调用变的复杂起来,比如本来我们调用的时候只需要

1
user.UserName()

而现在可能我们的调用变成了

1
2
var u = user.New()
u.UserName()

那么我们如何使用更符合go的方式,直接使用包调用而不是每次都传递一个对象呢?我们可以改为下面的方式:
user的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package user

import (
"database/sql"
)

var db *sql.DB

type User interface {
UserName(int) (string, error)
}

type DefaultUser struct{}

var defaultUser = DefaultUser{}
var definedUser User

func New() User {
return defaultUser
}

func SetUser(u User) {
definedUser = u
}

// UserName return the name of user with uid
func (DefaultUser) UserName(uid int) (string, error) {
sql := "select name from user where uid = ?"
rows, err := db.Query(sql, uid)
if err != nil {
return "", err
}
defer rows.Close()

var name string
for rows.Next() {
if err := rows.Scan(&name); err != nil {
return "", err
}
}

return name, nil
}

func UserName(uid int) (string, error) {
if definedUser == nil {
return defaultUser.UserName(uid)
}
return definedUser.UserName(uid)
}

这里我们新增加了一个变量definedUser来表示用户自定义的实例,然后通过SetUser来对其进行复制,我们同时增加了一个包级别的UserName函数,里面的实现会判断如果有definedUser那么我们就是用这个自定义的实现,如果没有我们就调用默认的实现

bank的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package bank

import (
"user"
)

// UserInfo if uid exist return the name of user
func UserInfo(uid int) string {
name, err := user.UserName(uid)
if err != nil {
return "something was wrong"
}
if name == "" {
return "not found this user"
}
return "user name is " + name
}

bank的实现跟第一个版本一样,如果我们不需要修改默认实现,对于调用来说非常方便,我们不用关系其内部的具体实现。

bank_test的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package bank

import (
"errors"
"testing"
"user"
)

type mockUser struct{}

func (u mockUser) UserName(uid int) (string, error) {
if uid == 1 {
return "", errors.New("something was wrong")
}
if uid == 2 {
return "", nil
}
if uid == 3 {
return "John", nil
}
return "", errors.New("something was wrong")
}

func TestUserInfo(t *testing.T) {
user.SetUser(mockUser{})
cases := []struct {
name string
uid int
res string
}{
{"test1", 1, "something was wrong"},
{"test2", 2, "not found this user"},
{"test3", 3, "user name is John"},
}
for _, v := range cases {
t.Run(v.name, func(t *testing.T) {
info := UserInfo(v.uid)
if info != v.res {
t.Errorf("got %s; want %s", info, v.res)
}
})
}
}

bank_test由于要对UserName进行Mock, 用自己的实现来替换原来的实现,我们只需要在测试的时候调用SetUser函数,就完成了替换。

参考

面向对象设计的设计原则
依赖注入
面相接口编程
面向接口编程详解(一)——思想基础
如何写出优雅的 Golang 代码
使用Golang的interface接口设计原则
Duck typing in Go

Go 调度器抢占方式

OS 调度

Go 调度

被抢占后把 g 状态从 _Grunning 改为 _Grunnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)

schedule()
}

Go 调度的问题

deadloop

Go的抢占需要依赖函数的调用,只有在函数调用(准确的说是函数调用产生morestack调用的时候)的时候才会进行真正的强占,那么对于下面的这个方式:

1
2
3
4
go func() {
for {
}
}

这是一个死循环,而且里面没有任何函数调用,也不会进行栈的扩张,所以这个goroutine永远不会被抢占。
参考Goroutine调度实例简要分析 这篇文档的说明,我们看一下具体的问题及解决方案。
// todo: 继续完善上篇文章中的例子

deadloop & GC

还有这样一个case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import "runtime"

func main() {
var ch = make(chan int, 100)
go func() {
for i := 0; i < 100; i++ {
ch <- 1
if i == 88 {
runtime.GC()
}
}
}()

for {
// the wrong part
if len(ch) == 100 {
sum := 0
itemNum := len(ch)
for i := 0; i < itemNum; i++ {
sum += <-ch
}
if sum == itemNum {
return
}
}
}

}

上面这个程序也会hang死。

下面这段代码在主goroutine中运行

1
2
3
4
5
6
7
8
9
10
11
12
13
for {
// the wrong part
if len(ch) == 100 {
sum := 0
itemNum := len(ch)
for i := 0; i < itemNum; i++ {
sum += <-ch
}
if sum == itemNum {
return
}
}
}

上面这个程序由于没有函数的调用和Goshced()的主动调用所以会通过阻塞监控的方式被动弃权。

runtime.GC

当执行 runtime.GC()的时候都发生了什么?我们来看一下
通过dlv这个工具我们可以对这个程序进行断点调试:

1
dlv debug go run gc.go

函数会执行到 stopTheWorldWithSema 这个函数,这个函数主要作用是停止所有的P,然后进行垃圾回收,我们通过一步一步调试发现, 这个函数会下面这个循环中无法出来:

1
2
3
4
5
6
7
8
9
10
11
// wait for remaining P's to stop voluntarily
if wait {
for {
// wait for 100us, then try to re-preempt in case of any races
if notetsleep(&sched.stopnote, 100*1000) {
noteclear(&sched.stopnote)
break
}
preemptall()
}
}

为什么会在这个地方无法出来?下面分析一下具体原因。

GC种一个步骤是要把所有的 p 都设置为_Pgcstop 状态后才能继续进行。 下面看看这个步骤是否能够完成。

stopTheWorldWithSema函数更加详细的执行过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func stopTheWorldWithSema() {
_g_ := getg()

// If we hold a lock, then we won't be able to stop another M
// that is blocked trying to acquire the lock.
if _g_.m.locks > 0 {
throw("stopTheWorld: holding locks")
}

lock(&sched.lock)
sched.stopwait = gomaxprocs // 设置stopwait的初始值为最大的 p 的个数
atomic.Store(&sched.gcwaiting, 1) // 设置 gcwaiting = 1, 表示正在进入GC状态
preemptall() // 给所有的 p 发送抢占信号,如果成功,则对应的 p 进入 idle 状态
// stop current P
_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
sched.stopwait-- // 给他当前的设置状态后,stopwait个数减一
// try to retake all P's in Psyscall status
// 遍历所有的 p 如果满足条件(p的状态为 _Psyscall)则释放这个 p , 并且把 p 的状态都设置成 _Pgcstop ; 然后stopwait--
for _, p := range allp {
s := p.status
if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
if trace.enabled {
traceGoSysBlock(p)
traceProcStop(p)
}
p.syscalltick++
sched.stopwait--
}
}
// stop idle P's
for {
p := pidleget() //获取idle 状态的 p, 从 _Pidle list 获取
if p == nil {
break
}
p.status = _Pgcstop // 把 p 状态设置为 _Pgcstop
sched.stopwait-- // 计数 stopwait --
}
wait := sched.stopwait > 0
unlock(&sched.lock)

// wait for remaining P's to stop voluntarily
if wait {
for {
// wait for 100us, then try to re-preempt in case of any races
if notetsleep(&sched.stopnote, 100*1000) {
noteclear(&sched.stopnote)
break
}
// 再次给所有的 p 发送 抢占信号
preemptall()
}
}
...
}

上面函数把所有非_Prunning状态的 p 都设置为了 _Pgcstop 状态,对于 _Prunning 状态的 p 如何设置其为 _Pgcstop 状态呢? 主要是通过 preemptall()函数给每个 p 发送抢占信号
preemptall() 其实时调用了 preemptone() 前面我们已经讲了具体的原理。被抢占后 p 重新进入调度阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func schedule() {
_g_ := getg()

if _g_.m.locks != 0 {
throw("schedule: holding locks")
}

if _g_.m.lockedg != 0 {
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}

// 我们不应该调度一个正在执行 cgo 调用的 g
// 因为 cgo 在使用当前 m 的 g0 栈
// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
if _g_.m.incgo {
throw("schedule: in cgo")
}

top:
if sched.gcwaiting != 0 {
// 如果还在等待 gc,则
gcstopm()
goto top // 循环执行
}
...

上面说调度器会会把 gcwaiting设置为1, 所以这里会进入 gcstopm(), 直到所有的 m 都被stop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func gcstopm() {
_g_ := getg()

if sched.gcwaiting == 0 {
throw("gcstopm: not waiting for gc")
}
if _g_.m.spinning {
_g_.m.spinning = false
// OK to just drop nmspinning here,
// startTheWorld will unpark threads as necessary.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("gcstopm: negative nmspinning")
}
}
_p_ := releasep()
lock(&sched.lock)
_p_.status = _Pgcstop //设置 p 状态为 _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
stopm()
}

gcstopm() 会把 p 的状态置为 _Pgcstop

但是死循环的 g 不会被抢占,所以其 p 状态会一直是 Prunning 无法被设置为 Pgcstop

再回到前面进入死循环的地方:

1
2
3
4
5
6
7
8
9
10
11
// wait for remaining P's to stop voluntarily
if wait {
for {
// wait for 100us, then try to re-preempt in case of any races
if notetsleep(&sched.stopnote, 100*1000) {
noteclear(&sched.stopnote)
break
}
preemptall()
}
}

这里进入死循环的原因是条件

1
notetsleep(&sched.stopnote, 100*1000) == true

不满足
notetsleep函数内部每隔一段时间就会返回:

1
return atomic.Load(key32(&n.key)) != 0 // n.key 为参数 &shced.stopnote.key的值

这个函数意思是&sched.stopnote.key != 0
如果要想让返回值为 true 就需要满足上面的条件。 stopnote.key的值有两个函数可以控制:

  • notewakeupstopnote 设置为 1
  • noteclear 把stopnote设置为 0 所以我们需要调用notewakeup才行。而这个函数我们可以看到是在gcstopm()`种有调用:
    1
    2
    3
    4
    sched.stopwait--
    if sched.stopwait == 0 {
    notewakeup(&sched.stopnote)
    }

由于存在 g 无法被抢占,所以其对应的 p 不会释放, stopwait也就不能为0, 所以也就无法执行notewakeup,最终导致上面的循环无法出来。

死锁状态的发生:

  • GC: 要想进行GC就需要所有的P都转为空闲状态,而主goroutine无法被抢占,对应的P也无法进入空闲。所以GC会一直阻塞。
  • 新启动的goroutine: 由于新启动的goroutine也进入了空闲状态
  • goroutine: 由于新启动的goroutine进入了空闲状态,无法再给chan发信号,所以主goroutine也无法退出。
    由于上面三个都进入了阻塞状态,导致了整个程序进入了死锁状态。

参考

scheduling-in-go-part1
scheduling-in-go-part2
scheduling-in-go-part3
go-under-the-hood
non-cooperative-preemption
如何定位 golang 进程 hang 死的 bug
Goroutine调度实例简要分析

【译】 Go 语言机制之栈和指针 (1)

前言

这个系列的文章主要是帮助你理解指针,栈,堆,逃逸分析和值/指针语义的设计和机制。本系列一共有四篇,这是第一篇。本篇主要是介绍栈和指针。

本系列文章索引:
1) Go 语言机制之栈和指针
2) Go 语言机制之逃逸分析
3) Go 语言机制之内存性能分析
4) Go 语言机制之数据和语义的使用原则

简介

我并不打算美化指针,因为指针真的很难理解。如果我们使用不当,指针能够产生很难理解的 bug 甚至影响性能。 在写并发程序或者多线程程序时这种问题更为明显。这也难怪很多语言试图对开发者隐藏指针的使用。尽管如此,如果你是用 Go 来开发程序,你是没有办法避免使用指针的。相对于深入的理解指针,你更应该关注如何写出干净,简单并且有效的代码。

golang http client 连接池

golang标准库net/http做为client时有哪些细节需要注意呢,这里做一个详细的分析。

net/http client工作流程

首先分析一下client的工作流程。 下面是一般我们进行一个请求时的代码事例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func DoRequest(req *http.Request) (MyResponse, error) {
client := &http.Client{}
resp, err := client.Do(req)
if resp != nil {
defer resp.Body.Close()
}
if err != nil {
return nil, err
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

response := MyResponse{}
response.Header = resp.Header
...
response.Body = body

return response, nil
}

代码中我们首先创建一个http.Client, 所有的值都是默认值,然后调用client.Do发请求,req是我们请求的结构体。这里我们也可以用client.Get, client.Post等函数来调用,从他们的源码来看都是调用的client.Do
client.Do的实现在net/http包的go/src/net/http/client.go源文件中。可以看到函数内部主要是实现了一些参数检查,默认值设置,以及对于多跳请求的处理,最为核心的就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// c.send() always closes req.Body
reqBodyClosed = true
if !deadline.IsZero() && didTimeout() {
err = &httpError{
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}

var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}
...

这里真正发请求的函数就是c.send, 这个函数的实现也比较简单, 主要是调用了send函数,这个函数的实现主要如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// didTimeout is non-nil only if err != nil.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
if c.Jar != nil {
for _, cookie := range c.Jar.Cookies(req.URL) {
req.AddCookie(cookie)
}
}
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
if c.Jar != nil {
if rc := resp.Cookies(); len(rc) > 0 {
c.Jar.SetCookies(req.URL, rc)
}
}
return resp, nil, nil
}
1
2
3
4
5
6
7
8
9
10
// send issues an HTTP request.
// Caller should close resp.Body when done reading from it.
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
...
stopTimer, didTimeout := setRequestCancel(req, rt, deadline)
...
resp, err = rt.RoundTrip(req)
...
return resp, nil, nil
}

这里真正进行网络交互的定位到的函数是rt.RoundTrip,这个函数的定义是一个interface,从其注释也可以看出他的主要作用是:

1
2
// RoundTrip executes a single HTTP transaction, returning
// a Response for the provided Request.`

由于这个函数是一个interface我们需要知道是谁实现了这个函数,看一下send的参数就可以找到,实现这个函数的是c.transport()的返回值,这个函数的实现如下:

1
2
3
4
5
6
func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}

这里可以看到,返回的对象是c.Transport或者DefaultTransport, 由于我们创建client的时候没有设置c.Transport参数,所以这里返回的应该是DefaultTransport对象, 这个对象对RoundTripper函数的实现大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// RoundTrip implements the RoundTripper interface.
//
// For higher-level HTTP client support (such as handling of cookies
// and redirects), see Get, Post, and the Client type.
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
...
for {
...
pconn, err := t.getConn(treq, cm)
...
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(req, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
}
...
}

里面具体的细节我们先不关系,对于HTTP/2的处理我们也先不关心。这里需要重点关注的是t.getConn这个函数。t.getConn的作用是获取一个链接,这个链接该怎么获取,是一个值得深究的问题。下面看一下这个函数的关键实现细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS. If this doesn't return an error, the persistConn
// is ready to write requests to.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
if pc, idleSince := t.getIdleConn(cm); pc != nil {
if trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(idleSince))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(req, func(error) {})
return pc, nil
}
...
handlePendingDial := func() {
testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
t.putOrCloseIdleConn(v.pc)
}
testHookPostPendingDial()
}()
}

cancelc := make(chan error, 1)
t.setReqCanceler(req, func(err error) { cancelc <- err })

go func() {
pc, err := t.dialConn(ctx, cm)
dialc <- dialRes{pc, err}
}()
idleConnCh := t.getIdleConnCh(cm)
select {
case v := <-dialc:
// Our dial finished.
if v.pc != nil {
if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
}
return v.pc, nil
}
// Our dial failed. See why to return a nicer error
// value.
select {
case <-req.Cancel:
// It was an error due to cancelation, so prioritize that
// error value. (Issue 16049)
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// It wasn't an error due to cancelation, so
// return the original error message:
return nil, v.err
}
case pc := <-idleConnCh:
// Another request finished first and its net.Conn
// became available before our dial. Or somebody
// else's dial that they didn't use.
// But our dial is still going, so give it away
// when it finishes:
handlePendingDial()
if trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
}
return pc, nil
case <-req.Cancel:
handlePendingDial()
return nil, errRequestCanceledConn
case <-req.Context().Done():
handlePendingDial()
return nil, req.Context().Err()
case err := <-cancelc:
handlePendingDial()
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}

下面是这个过程的流程图:

从上面可以看到,获取链接会优先从连接池中获取,如果连接池中没有可用的连接,则会创建一个连接或者从刚刚释放的连接中获取一个,这两个过程时同时进行的,谁先获取到连接就用谁的。
当新创建一个连接, 创建连接的函数定义如下:

1
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error)

最后这个函数会通过goroutine调用两个函数:

1
2
go pconn.readLoop()
go pconn.writeLoop()

其中readLoop主要是读取从server返回的数据,writeLoop主要发送请求到server,在readLoop函数中有这么一段代码:

1
2
3
4
5
6
7
8
9
10
// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
// but after
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)

这里可以看出,在处理完请求后,会立即把当前连接放到连接池中。

上面说到连接池,每个client的连接池结构是这样的:idleConn map[connectMethodKey][]*persistConn。其中connectMethodKey的值就是client连接的server的host值, map的值是一个*persistConn类型的slice结构,这里就是存放连接的地方,slice的长度由MaxIdleConnsPerHost这个值指定的,当我们不设置这个值的时候就取默认的设置:const DefaultMaxIdleConnsPerHost = 2

另外这里我们插一个知识点,对于HTTP协议,有一个header值”Connections”, 这个值的作用就是clientserver端发请求的时候,告诉server是否要保持连接。具体的可以参考rfc2616。 这个协议头的值有两种可能(参考MDN文档):

1
2
Connection: keep-alive
Connection: close

当值为keep-alive时,server端会保持连接,一直到连接超时。当值为close时,server端会在传输完response后主动断掉TCP连接。在HTTP/1.1之前,这个值默认是close, 之后是默认keep-alive, 而net/http默认的协议是HTTP/1.1也就是默认keep-alive, 这个值可以通过DisableKeepAlives来设置。

从上面的介绍我们可以看出,net/http默认是连接复用的,对于每个server会默认的连接池大小是2。
接下来我们看一下连接是如何放进连接池的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
if err := t.tryPutIdleConn(pconn); err != nil {
pconn.close(err)
}
}


// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
// an error explaining why it wasn't registered.
// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
return errKeepAlivesDisabled
}
if pconn.isBroken() {
return errConnBroken
}
if pconn.alt != nil {
return errNotCachingH2Conn
}
pconn.markReused()
key := pconn.cacheKey

t.idleMu.Lock()
defer t.idleMu.Unlock()
waitingDialer := t.idleConnCh[key]
select {
case waitingDialer <- pconn:
// We're done with this pconn and somebody else is
// currently waiting for a conn of this type (they're
// actively dialing, but this conn is ready
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
return nil
default:
if waitingDialer != nil {
// They had populated this, but their dial won
// first, so we can clean up this map entry.
delete(t.idleConnCh, key)
}
}
if t.wantIdle {
return errWantIdle
}
if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn)
}
idles := t.idleConn[key]
if len(idles) >= t.maxIdleConnsPerHost() {
return errTooManyIdleHost
}
for _, exist := range idles {
if exist == pconn {
log.Fatalf("dup idle pconn %p in freelist", pconn)
}
}
t.idleConn[key] = append(idles, pconn)
t.idleLRU.add(pconn)
if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
oldest := t.idleLRU.removeOldest()
oldest.close(errTooManyIdle)
t.removeIdleConnLocked(oldest)
}
if t.IdleConnTimeout > 0 {
if pconn.idleTimer != nil {
pconn.idleTimer.Reset(t.IdleConnTimeout)
} else {
pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
}
}
pconn.idleAt = time.Now()
return nil
}

首先会尝试把连接放入到连接池中,如果不成功则关闭连接,大致流程如下:

如果DisableKeepAlivestrue表示不使用连接复用,所以请求完后会把连接关掉,但是这里需要注意的是,同时发请求的时候我们会设置Connections: close, 所以server端发送完数据后就会自动断开,所以这种情况的连接其实是server端发起的。

长连接与短连接

前面我们已经讲过net/http默认使用HTTP/1.1协议,也就是默认发送Connections: keep-alive的头,让服务端保持连接,就是所谓的长连接。
再看DefaultTransport的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// DefaultTransport is the default implementation of Transport and is
// used by DefaultClient. It establishes network connections as needed
// and caches them for reuse by subsequent calls. It uses HTTP proxies
// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and
// $no_proxy) environment variables.
var DefaultTransport RoundTripper = &Transport{
Proxy: ProxyFromEnvironment, //代理使用
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, //连接超时时间
KeepAlive: 30 * time.Second, //连接保持超时时间
DualStack: true, //
}).DialContext,
MaxIdleConns: 100, //client对与所有host最大空闲连接数总和
IdleConnTimeout: 90 * time.Second, //空闲连接在连接池中的超时时间
TLSHandshakeTimeout: 10 * time.Second, //TLS安全连接握手超时时间
ExpectContinueTimeout: 1 * time.Second, //发送完请求到接收到响应头的超时时间
}

当我们使用DefaultTransport时,就是默认使用的长连接。但是默认的连接池MaxIdleConns为100, MaxIdleConnsPerHost为2,当超出这个范围时,客户端会主动关闭到连接。
如果我们想设置为短连接,有几种方法:

  1. 设置DisableKeepAlives = true: 这时就会发送Connections:close给server端,在server端响应后就会主动关闭连接。
  2. 设置MaxIdleConnsPerHost < 0: 当MaxIdleConnsPerHost < 0时,连接池是无法放置空闲连接的,所以无法复用,连接直接会在client端被关闭。

Server端出现大量的TIME_WAIT

当我们在实际使用时,会发现Server端出现了大量的TIME_WAIT,要想深入了解其原因,我们首先先回顾一下TCP三次握手和四次分手的过程:


图中可以看出,TIME_WAIT只会出现在主动关闭连接的一方,也就是server端出现了大量的主动关闭行为。
默认我们是使用长连接的,只有在超时的情况下server端才会主动关闭连接。前面也讲到,如果超出连接池的部分就会在client端主动关闭连接,连接池的连接会复用,看着似乎没有什么问题。问题出在我们每次请求都会new一个新的client,这样每个client的连接池里的连接并没有得到复用,而且这时client也不会主动关闭这个连接,所以server端出现了大量的keep-alive但是没有请求的连接,就会主动发起关闭。

todo:补充tcpdump的分析结果

要解决这个问题以下几个方案:

  1. client复用,也就是我们尽量复用client,来保证client连接池里面的连接得到复用,而减少出现超时关闭的情况。
  2. 设置MaxIdleConnsPerHost < 0:这样每次请求后都会由client发起主动关闭连接的请求,server端就不会出现大量的TIME_WAIT
  3. 修改server内核参数: 当出现大量的TIME_WAIT时危害就是导致fd不够用,无法处理新的请求。我们可以通过设置/etc/sysctl.conf文件中的

    1
    2
    3
    4
    #表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭  
    net.ipv4.tcp_tw_reuse = 1
    #表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭
    net.ipv4.tcp_tw_recycle = 1

    达到快速回收和重用的效果,不影响其对新连接的处理。

另外需要注意的是,虽然DisableKeepAlives = true也能满足连接池中不放空闲连接,但是这时候会发送Connections: close,这时server端还是会主动关闭连接,导致大量的TIME_WAIT出现,所以这种方法行不通。

以上就是我总结的关于net/http中连接池相关的知识。

亿级流量网站架构核心技术总结

最近读了《亿级流量网站架构核心技术——跟开涛学搭建高可用高并发系统》这本书,总体感觉这本书很实用,作者根据自己负责的项目经历以及业务的发展过程和业界的理论基础相结合讲解了如何搭建一个具有高并发和高可用特征的电商网站。作者是京东的架构师,进来随着京东业务的不断发展,6.18和双十一活动规模的不断扩大,作者都亲身经历了整个电商网站的发展过程,相对于单纯的理论,这本书更多的是能够在业务中快速应用的经验总结。下面就这两方面我把作者的思维导图搬过来,不断提醒自己要注意的整体思想,并能够深入浅出,根据思维导图的每一项都有一个自己更发散更深入的认识。

高可用

高可用

高并发

高并发

golang channels 的串联,扇入扇出及控制

如果说goroutine是Go语言程序的并发体的话,那么channels则是它们之间的通信机制。 一个channel是一个通信机制,它可以让一个goroutine通过它给另一个goroutine发送值信息。 channel之间可以进行串联,并联等组合,组成我们想要的运行方式。 不同goroutine之间需要同步,也需要控制,具体该如何处理这些情况,下面分别进行介绍。

channel基础

使用内置的make函数,我们可以创建一个channel:

1
ch := make(chan int) // ch has type 'chan int'

当我们复制一个channel或用于函数参数传递时,我们只是拷贝了一个channel引用,因此调用者和被调用者将引用同一个channel对象。和其它的引用类型一样,channel的零值也是nil。
两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相同的对象,那么比较的结果为真。一个channel也可以和nil进行比较。
一个channel有发送和接受两个主要操作,都是通信行为。

1
2
3
ch <- x  // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded

Channel还支持close操作,用于关闭channel,随后对基于该channel的任何发送操作都将导致panic异常。对一个已经被close过的channel进行接收操作依然可以接受到之前已经成功发送的数据;如果channel中已经没有数据的话将产生一个零值的数据。

1
close(ch)

以最简单方式调用make函数创建的是一个无缓存的channel,但是我们也可以指定第二个整型参数,对应channel的容量。如果channel的容量大于零,那么该channel就是带缓存的channel。

1
2
3
ch = make(chan int)    // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3

不带缓存的Channels

一个基于无缓存Channels的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的Channels上执行接收操作,当发送的值通过Channels成功传输之后,两个goroutine可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直到有另一个goroutine在相同的Channels上执行发送操作。
基于无缓存Channels的发送和接收操作将导致两个goroutine做一次同步操作。因为这个原因,无缓存Channels有时候也被称为同步Channels。当通过一个无缓存Channels发送数据时,接收者收到数据发生在唤醒发送者goroutine之前。

对于不带缓存的Channels,我们使用的是有必须放到goroutine,因为如果直接调用chanx <- 1时,会报错fatal error: all goroutines are asleep - deadlock!

1
2
3
4
5
6
package main

func main() {
chanx := make(chan int)
chanx <- 1 //fatal error: all goroutines are asleep - deadlock!
<-chanx

由于主goroutine调用了 chanx <-1, 但是由于是顺序往下执行,执行时还不存在监听chanx的方法存在,所以数据放入chanx后无法唤醒接收的方法,只能等待下去,所以就产生了deadlock。
可以修改为下面的形式,把chanx <- 1放入到一个goroutine里,然后主goroutine监听了这个chanx,当往chanx放数据的时候就会有接收的方法被调用。

1
2
3
4
5
6
package main

func main() {
chanx := make(chan int)
go func() {chanx <- 1}() //right
<-chanx

当使用range遍历chan时别忘了close, 下面当没有使用close时:

1
2
3
4
5
6
7
8
9
10
11
12
13
package main
import "fmt"
func main() {
chanx := make(chan int)
go func() {
for i := 0; i < 3; i++ {
chanx <- i
}
}()
for v := range chanx {
fmt.Println(v)
}
}

output:

1
2
3
4
5
0
1
2
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:

range会从channel中接收数据直到channelclose为止,正常情况下close并不是必须的,只有在接收者需要知道没有更多的数据进入的时候才需要,而range正是需要知道这个信息的。所以代码改成下面这样就没问题了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import "fmt"
func main() {
chanx := make(chan int)
go func() {
for i := 0; i < 3; i++ {
chanx <- i
}
close(chanx)
}()
for v := range chanx {
fmt.Println(v)
}
}

带缓存的Channels

带缓存的Channel内部持有一个元素队列。队列的最大容量是在调用make函数创建channel时通过第二个参数指定的。下面的语句创建了一个可以持有三个字符串元素的带缓存Channel。

1
ch = make(chan string, 3)

向缓存Channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个goroutine执行接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。

队列元素为1的带缓存Channels与不带缓存的Channels是不同的,下面的例子可以看出:

1
2
3
4
5
6
7
8
9
10
11
package main

func main() {
chan_nobuffer := make(chan int)
chan_nobuffer <- 1 //error 必须放到goroutine中
<-chan_nobuffer

chan_buffer := make(chan int, 1)
chan_buffer <- 1 //right
<-chan_buffer
}

单方向的Channel

channel还有两种语法:<-chan intchan<- int,其意思是单方向的channel, 当定义为out chan<- int表示out只能被往里面放数据,不允许从out拿数据,否则程序会报错receive from send-only type chan<- int,如果定义为in <-chan intin只能往外输出数据,不允许往in里面放数据,否则报错send to receive-only type <-chan int

channel串联

Channels也可以用于将多个goroutine连接在一起,一个Channel的输出作为下一个Channel的输入。 这种串联的Channels就是所谓的管道(pipeline)。下图就是一个串联的channel示意:
串联channel
第一个goroutine Counter负责生成一个0,1,2,3,…形式的整数序列,然后把整数序列输入到一个channel中,通过这个channel传递个下一个goroutine Squarer, 负责将从channel接收到的数求平方,然后再把得出的结果通过channel传递给goroutine Printer, Printer负责将从channel接收的数据打印出来。
其程序实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
)

func main() {
chan1 := make(chan int)
chan2 := make(chan int)
go Counter(chan1)
go Squarer(chan2, chan1)
Printer(chan2)

}

func Counter(out chan<- int) {
for i := 1; i < 10; i++ {
out <- i
}
close(out)
}

func Squarer(out chan<- int, in <-chan int){
for v := range in {
out <- v * v
}
close(out)
}

func Printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}

上面代码中我们创建了两个chan, 然后调用了CounterSquarer, 由于上面说:当我们复制一个channel或用于函数参数传递时,我们只是拷贝了一个channel引用,因此调用者和被调用者将引用同一个channel对象。所以我们对chan1和chan2的修改都是全局的。
Counter往chan1中陆续放入了0,1,2,3,...等数列,然后同步的Squarer接收到数据对其平方并放入chan2,最后Printerchan2中输出这些数据。
对于串联的Channel还有另外一种实现方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
)

func main() {
c := gen(2,3)
out := sq(c)

for v := range out {
fmt.Println(v)
}
}

func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n*n
}
close(out)
}()
return out
}

上面的gen函数用到了golang的可变参数这个特性,跟上面的Counter不一样的是,这个gen会把chan当做返回值返回,而不是作为参数传入。sq函数也跟Squarer函数不一样了:把上一个函数的chan最为参数,下一个输出的chan作为返回值。

channel扇入扇出

扇出:同一个 channel 可以被多个函数读取数据,直到channel关闭。 这种机制允许将工作负载分发到一组worker,以便更好地并行使用 CPU 和 I/O。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
c := gen(2,3)
c1 := sq(c)
c2 := sq(c)

for v := range c1 {
fmt.Println(v)
}
fmt.Println("-------------")

for v := range c2 {
fmt.Println(v)
}

}

下面是几种输出样式,可以知道当调用两次sq时,其实是对chan的扇出操作,既一个channel被多个函数读取了。每次读取的顺序和个数都不能保证。

1
2
3
4
5
6
7
8
9
10
11
12
13
#1 
4
------------------
9
#2
4
9
------------------
#3
9
4
------------------
...

扇入:多个 channel 的数据可以被同一个函数读取和处理,然后合并到一个 channel,直到所有 channel都关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n //对于每个chan其中的元素都放到out中
}
wg.Done() //减少一个goroutine
}
wg.Add(len(cs)) //要执行的goroutine个数
for _, c := range cs {
go output(c) //对传入的多个channel执行output
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait() //等待,直到所有goroutine都完成后
close(out) //所有的都放到out后关闭
}()
return out
}

merge函数的参数也是变长的,类型是chan, 这个函数还用到了sync这个包,这里主要的作用就是对一组goroutines进行同步。首先把传入的cs都通过output调用放入out中,每处理完一个c就调用wg.Done()更新剩余的次数, wg.Wait()等到所有的channels把数据放到out中,然后关闭out

1
2
3
4
5
6
7
8
func main() {
c := gen(2, 3, 4, 5, 6, 7, 8)
out2 := sq(c)
out1 := sq(c)
for v := range merge(out1, out2) {
fmt.Println(v)
}
}

下图就展示了扇入扇出的过程:
串联channel

goroutines控制

参考

golang的webserver是如何工作的

我们知道golang实现一个webserver非常简单,但是其内部是如何工作的呢,我们深入探究一下其原理。

实现一个webserver服务

下面我们就用golang内置的服务实现一个简单的webserver:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"log"
"net/http"
"strings"
)

func sayhelloName(w http.ResponseWriter, r *http.Request) {
r.ParseForm() //解析参数,默认是不会解析的
fmt.Println(r.Form) //这些信息是输出到服务器端的打印信息
fmt.Println("path", r.URL.Path)
fmt.Println("scheme", r.URL.Scheme)
fmt.Println(r.Form["url_long"])
for k, v := range r.Form {
fmt.Println("key:", k)
fmt.Println("val:", strings.Join(v, ""))
}
fmt.Fprintf(w, "Hello astaxie!") //这个写入到w的是输出到客户端的
}

func main() {
http.HandleFunc("/", sayhelloName) //设置访问的路由
http.HandleFunc("/hello", sayhelloName) //设置访问的路由
err := http.ListenAndServe(":8090", nil) //设置监听的端口
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}

我们可以通过go run main.go来开启Server服务, 当我们访问http://localhost:8090/http://localhost:8090/hello都会得到Hello astaxie!, 也就是都执行了sayhelloName函数。
下面让我们来分析一下服务的代码:
首先我们从main函数入口进入程序执行,首先执行了http.HandleFunc("/", sayhelloName)http.HandleFunc("/hello", sayhelloName)两个方法,这两个方法其实就是设置路由及其对应的处理函数。
然后执行http.ListenAndServe(":8090", nil)这个函数开始监听8090端口并把用户的请求根据之前设置的路由规则交给特定的函数进行处理。
下面我将针对这两个函数进行深入的分析。

http.HandleFunc

这个函数是net/http包中定义的, 第一个参数patternstring类型,表示匹配的URL, 第二个参数handler这是个函数类型,表示一个处理函数。其定义在net/http/server.go中,第一如下:

1
2
3
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
DefaultServeMux.HandleFunc(pattern, handler)
}

这个函数调用了下面这个函数:

1
2
3
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
mux.Handle(pattern, HandlerFunc(handler))
}

HandelrFunc定义如下, 声明为一个函数类型, HandlerFunc(handler)就是把handler强制类型转化为HandlerFunc类型

1
type HandlerFunc func(ResponseWriter, *Request)

mux.Handle的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (mux *ServeMux) Handle(pattern string, handler Handler) {
mux.mu.Lock()
defer mux.mu.Unlock()

if pattern == "" {
panic("http: invalid pattern " + pattern)
}
if handler == nil {
panic("http: nil handler")
}
if mux.m[pattern].explicit {
panic("http: multiple registrations for " + pattern)
}

if mux.m == nil {
mux.m = make(map[string]muxEntry)
}
mux.m[pattern] = muxEntry{explicit: true, h: handler, pattern: pattern}

if pattern[0] != '/' {
mux.hosts = true
}

// Helpful behavior:
// If pattern is /tree/, insert an implicit permanent redirect for /tree.
// It can be overridden by an explicit registration.
n := len(pattern)
if n > 0 && pattern[n-1] == '/' && !mux.m[pattern[0:n-1]].explicit {
// If pattern contains a host name, strip it and use remaining
// path for redirect.
path := pattern
if pattern[0] != '/' {
// In pattern, at least the last character is a '/', so
// strings.Index can't be -1.
path = pattern[strings.Index(pattern, "/"):]
}
url := &url.URL{Path: path}
mux.m[pattern[0:n-1]] = muxEntry{h: RedirectHandler(url.String(), StatusMovedPermanently), pattern: pattern}
}
}

可以看出这个函数会把patternhandler的对应关系读存储到mux.m这个map里了,mux类型是ServeMux,其定义如下:

1
2
3
4
5
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
hosts bool // whether any patterns contain hostnames
}

经过上面的处理后通过http.HandleFunc设置的patternhandler的对应关系都被存储到了DefaultServeMux这个对象的m中。

http.ListenAndServe

这个函数也是在net/http/server.go中定义的,其定义如下:

1
2
3
4
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}

上面函数最终对调用到下面这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (srv *Server) Serve(l net.Listener) error {
defer l.Close()
if fn := testHookServerServe; fn != nil {
fn(srv, l)
}
var tempDelay time.Duration // how long to sleep on accept failure

if err := srv.setupHTTP2_Serve(); err != nil {
return err
}

srv.trackListener(l, true)
defer srv.trackListener(l, false)

baseCtx := context.Background() // base is always background, per Issue 16220
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
ctx = context.WithValue(ctx, LocalAddrContextKey, l.Addr())
for {
rw, e := l.Accept()
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(ctx)
}
}

这里通过一个for循环不停的接收请求l.Accept()来得到接收的请求,然后再通过go c.serve(ctx)进行请求的处理。这里用到了协程,也就是每个请求其实是由单独的协程进行处理的,这也是golang作为webserver高效的原因所在。c.serve函数中有一个for循环,会不断读取同一个请求的数据,直到出现问题或者正确读取完毕。读取完请求后会调用serverHandler{c.server}.ServeHTTP(w, w.req)这个函数来处理请求。这个函数定义如下:

1
2
3
4
5
6
7
8
9
10
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
handler.ServeHTTP(rw, req)
}

当 handler 为 nil:

可以看到当我们不在ListenAndServe中传递handler时,也就是sh.srv.Handler = nilhanlder=DefaultServeMux,这个 DefaultServeMux正式我们前面通过http.HandleFunc来设置的。 下面调用了hanlder.ServeHTTP,这里也就是调用了DefaultServeMux.ServeHTTP, 这个函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r)
h.ServeHTTP(w, r)
}

这个函数中的mux.Handler从请求r中找到请求的URL然后在去mux.m的map结构中找到对应的映射关系从而得出h这个处理函数名。
由于上面说过h是转换为类型HandlerFunc, 这个类型定义的ServeHTTP函数如下:

1
2
3
4
// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}

所以调用h.ServeHTTP(w,r)就等于调用h(w,r),也就是我们调用我们自己的写的处理函数。
这些都完成后会执行收尾工作,并把得到的结构返回给请求用户。

当 handler 不为 nil:

这时调用h.ServerHTTP(w,r)其实就是调用自己传入的handlerServerHTTP函数,例如web框架revel的源码github.com/revel/cmd/harness/harness.go中执行revel run app是就会执行下面的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Run the harness, which listens for requests and proxies them to the app
// server, which it runs and rebuilds as necessary.
func (h *Harness) Run() {
var paths []string
if revel.Config.BoolDefault("watch.gopath", false) {
gopaths := filepath.SplitList(build.Default.GOPATH)
paths = append(paths, gopaths...)
}
paths = append(paths, revel.CodePaths...)
watcher = revel.NewWatcher()
watcher.Listen(h, paths...)

go func() {
addr := fmt.Sprintf("%s:%d", revel.HTTPAddr, revel.HTTPPort)
revel.INFO.Printf("Listening on %s", addr)

var err error
if revel.HTTPSsl {
err = http.ListenAndServeTLS(
addr,
revel.HTTPSslCert,
revel.HTTPSslKey,
h)
} else {
err = http.ListenAndServe(addr, h)
}
if err != nil {
revel.ERROR.Fatalln("Failed to start reverse proxy:", err)
}
}()

// Kill the app on signal.
ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt, os.Kill)
<-ch
if h.app != nil {
h.app.Kill()
}
os.Exit(1)
}
```
这里也调用了`http.ListenAndServe`但是第二个参数`hanlder`传入了`h`,所以最终会调用`h.ServerHTTP`函数, 这个函数`revel`中是这么实现的:

```golang
// ServeHTTP handles all requests.
// It checks for changes to app, rebuilds if necessary, and forwards the request.
func (h *Harness) ServeHTTP(w http.ResponseWriter, r *http.Request) {
...
// Reverse proxy the request.
// (Need special code for websockets, courtesy of bradfitz)
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
proxyWebsocket(w, r, h.serverHost)
} else {
h.proxy.ServeHTTP(w, r)
}
}

Redis设计与实现总结——独立功能的实现

发布与订阅

通过执行SUBSCRIBE命令,客户端可以订阅一个或多个频道,从而成为这些频道的订阅者(subscriber):每当其他客户端向被订阅的频道发送消息时,频道的所有订阅者都会收到这条消息。
除了订阅频道之外,客户端还可以通过执行PSUBSCRIBE命令订阅一个或多个模式,从而成为这些模式的订阅者:每当有其他客户端祥某个频道发送消息时,消息不仅会被发送给这个频道所有订阅者,它还会被发送给所有与这个频道匹配的模式的订阅者。
Redis将所有频道的订阅管系都保存在服务器状态的pubsub_channels字典里面,这个字典的键是某个被订阅的频道,而键的值则是一个链表,链表里面记录了所有订阅这个频道的客户端。每当执行订阅命令时服务器都会将客户端与被订阅的频道着pubsub_channels字典中进行关联。如果执行退订命令,那么就会从pubsub_channels中删除这个客户端。
模式的订阅则是保存在服务器pubsub_patterns这个属性中,其操作过程与上面相同。
发送消息是就会遍历频道的pubsub_channelspubsub_patterns的客户端,将消息发送给订阅了这些频道和模式的客户端。

事务

Redis通过MULTI,EXEC,WATCH等命令来实现事务(transaction)功能。事务提供了一种将多个命令请求打包,然后一次性,按顺序地执行多个命令的机制,并且在事务执行期间(当接收到EXEC命令后才开始真正执行, 之前只是命令输入),服务器不会中断事务而改去执行其他客户端的命令请求,它会将事务中的所有命令都执行完毕,然后才去处理其他客户端的命令请求。
MULTI命令标识事务的开始,除了EXEC,DISCARD,WATCH,MULTI四个命令外的其他命令都会进入事务的队列中,当接收到EXEC命令时开始执行事务队列中的命令。
WATCH命令是一个乐观锁(optimistic locking), 它可以在EXEC命令执行之前,监视任意数量的数据库键,并在EXEC命令执行时,检查被监视的键是否至少有一个已经被修改过了,如果是的话,服务器将拒绝执行事务,并向客户端返回代表事务执行失败的回复。 (注意WATCH命令执行的顺序是在MULTI之前)。
WATCH命令执行的过程是:

  1. 将监控的键保存到watched_keys字典中,字典的值是所有监视相应数据库键的客户端。
  2. 所有对数据库进行修改的命令都会对watched_keys进行检查,如果键被修改了,就会把客户端的REDIS_DIRTY_CAS标识打开。
  3. 当接收到EXEC执行命令时,如果判断客户端的REDIS_DIRTY_CAS被打开了,标识客户端提交的事务已经不再安全,服务器拒绝客户端提交的事务。

事务的ACID性质: Redis中,事务总是具有原子性(Atomicity), 一致性(Consistency)和隔离性(Isolation),并且当Redis运行在某种特定持久化模式下时,事务也具有耐久性(Durability)

  • 事务的原子性指的是,数据库将事务中的多个操作当做一个整体来执行,服务器要么就执行事务中的所有操作,要么就一个操作也不执行。但是Redis的事务和传统的关系型数据库事务的最大区别在于,Redis不支持事务回滚机制(rollback),即事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,知道将事务队列中的所有命令都执行完毕为止。
  • 事务具有一致性指的是,如果数据库在执行事务之前一致的,那么事务在执行之后,无论事务是否执行成功,数据库也应该仍然是一致的。一致指的是数据符合数据库本身的定义和要求,没有包含非法或者无效的错误数据。
  • 事务的隔离性指的是,即时数据库中有多个事务并发地执行,各个事务之间也不会相互影响,并且在并发状态下执行的事务和串行执行的事务产生的结果完全相同。因为Redis是使用单线程的方式执行事务,并且服务器保证,在执行事务期间不会对事务进行中断,因此,Redis中的事务总是以串行的方式运行的,并且事务也总是具有隔离性的。
  • 事务的耐久性指的是,当一个事务执行完毕时,执行这个事务所得的结果已经被保存到永久性存储介质里面了,即使服务器在事务执行完毕后停机,,执行事务所得的结果也不会丢失。Redis有RDBAOF两种持久化方案,但是要持久化方案要和性能进行兼顾。

Lua脚本

Redis从2.6版本开始引入对Lua脚本的支持,通过在服务器中嵌入Lua环境,Redis客户端可以使用Lua脚本,直接在服务器端原子地执行多个Redis命令。使用EVAL命令可以直接对输入的脚本进行求值,而EVALSHA命令则可以根据脚本的SHA1校验和来对脚本进行求值。
为了在Redis服务器中执行Lua脚本,Redis在服务器内嵌了一个Lua环境,并对这个Lua环境进行了一系列修改,从而确保这个Lua环境可以满足Redis服务器的需要。
Redis服务器创建并修改Lua环境的整个过程由以下步骤组成:

  1. 创建一个基础的 Lua环境(通过调用lua_open函数)
  2. 载入函数库(基础库,表格库,字符串库等), 让Lua脚本可以使用这些函数库来进行数据操作。
  3. 创建全局表格Redis,这个表格包含了对Redis进行操作的函数,比如用于在 Lua脚本中执行Redis命令的redis.call函数
  4. 使用Redis自制的随机函数来替换Lua原有的代有副作用的随机函数,从而避免在脚本中引入副作用。(关于副作用,纯函数的概念参考:wiki
  5. 创建排序辅助函数,Lua环境使用这个辅助函数来对一部分Redis命令的结果(比如集合)进行排序,从而消除这些命令的不确定性。
  6. 创建redis.pcall函数的错误报告辅助函数,这个函数可以提供更详细的出错信息。
  7. 对Lua环境中的全局环境进行保护,防止用户在执行Lua脚本过程中,将额外的全局变量添加到Lua环境中。
  8. 将完成修改的Lua环境保存到服务器状态的Lua属性中,等待执行服务器传来的Lua脚本。

除了创建并修改Lua环境之外,Redis服务器还创建了两个用于与 Lua环境进行协作的组件,它们分别是负责执行Lua脚本中的Redis命令的伪客户端,以及用于保存Lua脚本的lua_scripts字典。

  • 伪客户端: 执行Redis命令必须有响应的客户端状态,所以为了执行Lua脚本中包含的Redis命令,Redis服务器专门为Lua环境创建了一个伪客户端,并由这个伪客户端负责处理Lua脚本中包含的所有Redis命令。下图是Lua脚本执行Redis命令时的通信步骤:
    redis_lua命令执行步骤
  • lua_scirpts字典: 这个字典的键为某个Lua脚本的SHA1校验和,而字典的值则是SHA1校验和对应的Lua脚本。
    EVAL命令的执行过程可以分为以下三个步骤:
  1. 根据客户端给定的Lua脚本,在Lua环境中定义一个Lua函数。
  2. 将客户端给定的脚本保存到lua_scripts字典中,等待将来进一步使用。
  3. 执行刚刚在Lua环境中定义的函数,以此来执行客户端给定的Lua脚本。

Redis还有四个有关Lua脚本的命令:SCRIPT FLUSH, SCRIPT EXISTS, SCRIPT LOADSCRIPT KILL命令。

排序

Redis的SORT命令可以对列表建,集合键或者有序集合键的值进行排序。
SORT命令的实现原理是(以SORT numbers为例):

  1. 创建一个和要排序的对象numbers长度相同的数组,该数组的每个项都是一个redis.h/redisSortObject结构。
  2. 遍历数组,将各个数组项的obj指针分别指向numbers列表的各个项,构成obj指针和列表项之间一对一关系
  3. 遍历数组,将各个obj指针所指向的列表项转换成一个double类型的浮点数,并将这个浮点数保存在相应数组项的u.score属性里面
  4. 根据数组项u.score属性的值,对数组进行数字值排序(快速排序算法),排序后的数组项按u.score属性的值从小到大排列
  5. 遍历数组,将各个数组项的obj指针所指向的列表项作为排序结果返回给客户端。

其他的排序方式,比如按照字母顺序排列,降序排列,通过外部键进行排序等原理都差不多,变化的是排列的顺序,排列的依据u.score不一样。
更多SORT命令的具体使用和参数可以参考文档:Redis SORT命令

二进制位数组

Redis提供了SETBIT,GETBIT, BITCOUNT, BITOP四个命令用于处理二进制位数组(bit array, 又称为”位数组”)
Redis使用字符串对象来表示位数组,因为字符串对象使用的SDS数据结构是二进制安全的,所以程序可以直接使用SDS结构来保存位数组,并使用SDS结构的操作函数来处理位数组。
具体使用方法参考官方文档。

慢查询日志

Redis的慢查询日志功能用于记录执行时间超过给定时长的命令请求,用户可以通过这个功能产生的日志来监视和优化查询速度。
服务器有两个和慢查询有关的选项:

  • slowlog-log-slower-than选项执行执行时间超过多少微秒的命令请求会被记录到日志上。(可以通过CONFIG SET slowlog-log-slower-than N设置)
  • slowlog-max-len选项执行服务器最多保存多少条慢查询日志。(可以通过CONFIG SET slowlog-max-len N设置)

使用SLOWLOG GET命令可以查看服务器所保存的慢查询日志, 使用SLOWLOG LEN可以查看当前日志的数量。

监视器

通过执行MONITOR命令,客户端可以将自己变为一个监视器,实时地接收并打印服务器当前处理的命令请求的相关信息。当一个客户端使用MONITOR向服务器发送命令时,这个客户端的REDIS_MONITOR标识会被打开,并且客户端本身会被服务器添加到monitors链表的表尾。当服务器每次接收到请求时(处理命令之前), 都会调用replicationFeedMonitors函数,由这个函数将被处理的命令请求的相关信息发送给各个监视器。

Redis设计与实现总结——多机数据库的实现

复制

在Redis中用户可以通过执行SLAVEOF命令或者设置slaveof选项,让一个服务器去复制(repliacte)另一个服务器,被复制的服务器称为主服务器(master),而对服务器进行复制的服务器被称为从服务器(salve)。
复制功能分为同步(sync)和命令传播(command propagate)两个操作:

  • 同步操作用于将从服务器的数据库状态更新至主服务器当前所处的数据库状态。(从服务器主动向主服务器请求数据)
  • 命令传播操作用于在主服务器的数据库状态被修改,导致主从服务器数据库状态出现不一致时,让主服务器的数据库重新回到一致状态。

redis旧版复制

  • 同步过程:
    • 主服务器接收到从服务器发来的SYNC命令,执行BGSAVE命令,创建RDB文件,并使用缓冲区记录接下来执行的所有写命令。
    • 从服务器接收并载入主服务器发来的RDB文件。
    • 主服务器接着发送缓冲区的写命令到从服务器。
    • 从服务器接收命令。
  • 命令传播:
    每当主服务器执行写命令时,主服务器的数据库状态就可能被修改,并导致主从服务器不一致。为了再次回到一致状态,主服务器需要对从服务器执行命令传播操作: 主服务器会将自己执行的写命令发送给从服务器执行,当从服务器执行了相同的写命令后,主从服务器再次回到一致状态。

从服务器初次复制主服务器或者从服务器当前要复制的主服务器和上一次不一样时,RDB文件会完整的传输。在处于命令传播阶段的主从服务器因为网络原因而中断了复制,再次连接上时会重头开始复制。但是第二种情况的效率非常低,很多已经复制过的数据需要再次进行复制。这就是旧版复制功能的缺陷。
新版复制功能为了解决重复复制的问题,提出了一个PSYNC命令代替之前的SYNC命令。完整的复制与上面的第一种情况初次复制是一样的,部分重同步则用于处理断线后的情况: 断线再连接后,主服务器只发送断线期间的写命令到从服务器。
部分重同步的实现是通过复制偏移量:

  • 主服务器每次向从服务器转播N个字节的数据时,就将自己的复制偏移量的值+N
  • 从服务器每次收到主服务器传播来的N个字节数据时,就将自己的复制偏移量的值+N

通过对比主从服务器的复制偏移量,程序可以很容易地知道主从服务器是否处于一致状态:

  • 如果主从服务器处于一致状态,那么主从服务器两者的偏移量总是相同的
  • 相反,如果主从服务器两者的偏移量并不相同,那么说明主从服务器并未处于一致状态

复制积压缓冲区是一个由主服务器维护的固定长度,先进先出队列,默认大小为1MB。当主从断开连接,再次连接时,从服务器会通过PSYNC将自己的复制偏移量offset发送给主服务器:

  • 如果offset偏移量之后的数据存在于复制积压缓冲区,那么主服务器将对从服务器执行部分重同步操作。
  • 如果offset偏移量之后的数据已经不存在于复制积压缓冲区,那么主服务器会对从服务器执行完整重同步操作。

在命令传播阶段,从服务器默认会以每秒一次的频率,祥主服务器发送命令REPLICONF ACK <replication_offset>, 其中replication_offset是当前从服务器的复制偏移量, 这个心跳检测的作用如下:

  • 检测主从服务器的网络状态:如果主服务器超过一秒钟没收到从服务器发送的REPLICONF ACK命令,那么主服务器就知道主从服务器之间的连接出现问题了。
  • 辅助实现min-slaves选项:Redis的min-slaves-to-writemin-slaves-max-log两个选项可以防止主服务器在不安全的情况下执行写命令。
  • 检测命令丢失:如果因为网络故障,主服务器传播给从服务器的写命令半路丢失,那么从服务器发送的偏移量就会小于主服务器的偏移量,这时候主服务器会从复制积压缓冲区中重新把命令发送给从服务器。(2.8版本之前没有这个功能,所以会出现丢失的情况)

Sentinel

Sentinel(哨岗,哨兵)是Redsi的高可用性(high availability)解决方案:由一个或多个Sentinel实例(instance)组成的Sentinel系统可以监视任意多个主服务器,以及这些主服务器属下的所有从服务器,并在被监视的主服务器进入下线状态时,自动将下线的主服务器属下的某个从服务器升级为新的主服务器,然后由新的主服务器代替已下线的主服务器继续处理命令请求。另外Sentinel还会继续监视已下线的服务器,并在它重新上时,将它设置为新的主服务器的从服务器(降级)。
启动Sentinel可以使用命令: redis-sentinel /path/to/your/sentinel.confredis-server /path/to/your/sentinel.conf --sentinel, 启动时需要执行一下步骤:

  • 初始化服务器: Sentinel本质上是一个运行在特殊模式下的Redis服务器,启动初始换与原来有所不同。
  • 将普通Redis服务器使用的代码替换成Sentinel专用代码:初始换Sentinel可以执行的命令,替换之前的默认命令。
  • 初始化Sentinel状态:初始化sentinel.c/sentinelState结构,这个结构保存了服务器中所有Sentinel相关的状态。
  • 根据跟定的配置文件,初始化Sentinel的监视主服务器列表:Sentinel状态中的masters字典记录了所有被Sentinel监视的主服务器的相关信息,其中字典的键是被监视主服务器的名字;而字典的值则是被监视主服务器对应的sentinel.c/sentinelRedisInstance结构。
  • 创建连向主服务器的网络连接: 最后一步是创建连向被监视主服务器的网络连接,Sentinel将成为主服务器的客户端,可以向主服务器发送命令,并从命令回复中获取相关的信息。对于每个被Sentinel监视的主服务器来说,Sentinel会创建两个连向主服务器的异步网络连接:
    • 一个是命令连接,这个连接专门用于向主服务器发送命令,并接收命令回复。
    • 另一个是订阅连接,这个链接专门用于订阅主服务器的__sentinel__:hello频道。

为什么有两个连接?
在Redis目前的发布与订阅功能中,被发送的信息不回保存在Redis服务器里,如果发送信息时,接收信息的客户端不在线或者断线,那么这个客户端就会丢失这条信息。为了不丢失任何信息,必须专门用一个订阅连接来接收该频道的信息(原理?)。另外除了订阅频道,Sentinel还必须向主服务器发送命令,以此来与主服务器进行通信,所以Sentinel还必须向主服务器创建命令连接。

Sentinel网络拓扑

Sentinel与主服务器,从服务器及其他Sentinel之间都是彼此连接的:

  • 首先Sentinel默认每10秒一次向主服务器发送INFO命令,Sentinel可以得到主服务器信息以及主服务器的从服务器信息;
  • Sentinel会更新自己的主服务器和从服务器信息,还会创建连接到从服务器的命令连接和订阅连接。
  • Sentinel还会默认每2秒一次通过命令连接向所有被监视的主服务器和从服务器发送命令,这条命令会向服务器的__sentinel__:hello频道发送一条信息
  • 由于Sentinel订阅了主服务器和从服务器的消息,所以所有订阅的Sentinel都会收到上面的信息,接收消息的Sentinel就会感知到发消息的Sentinel存在,并记录到sentinels属性中(可以实现自动发现功能)

故障处理

检测主观下线状态

默认情况下Sentinel会以每秒一次的频率向所有与它创建了命令连接的实例(包括主服务器,从服务器,其他Sentinel等)发送PING命令, 并通过实例返回的PING命令回复判断是否在线。由于每个Sentinel设置的下线时间标准可能不一样,所以会出现不同的Sentinel认为服务器的状态不一致,所以这种情况称为主观下线状态。

检测客观下线状态

当Sentinel从其他Sentinel那里接收的足够数量的已下线判断之后,Sentinel就会认为将主服务器判定为客观下线状态,并对主服务器执行故障转移操作。

选举领头Sentinel

当主服务器被判断为客观下线时,监视这个下线主服务器的各个Sentinel会进行协商,选举出一个领头的Sentinel,并由领头Sentinel对下线服务器执行故障转移。
选举策略是每个检测到主服务器下线的Sentinel都向其他Sentinel发送想要成为领头的命令,收到命令的Sentinel会将发送命令的Sentinel设置为局部领头,如果一个Sentinel被半数以上的Sentinel设置为局部领头,它就胜出,否则会进行再次选举。

故障转移

选举出领头Sentinel后,领头Sentinel将对已下线的主服务器执行故障转移操作:

  • 在已下线服务器属下的所有从服务器里面,挑选出一个从服务器,并将其转换为主服务器: 选择优先级高,复制偏移量大的从服务器,使用命令SLAVE of one使其变为主服务器。
  • 让已下线主服务器属下的所有从服务器改为复制新的主服务器: 领头Sentinel向其他从服务器发送SLAVEOF命令。
  • 将已下线的主服务器设置为心的主服务器的从服务器,当这个旧的主服务器重新上线时,它就会成为新的主服务器的从服务器。

集群

Redis集群是Redis提供的分布式数据库方案,集群通过分片(sharding)来进行数据共享,并提供复制和故障转移功能。

节点与槽

Redis集群通常由多个节点(node)组成,开始每个节点都是图例的,它们都处于一个只包含自己的集群中,当要组建一个真正可工作的集群,我们必须将节点连接起来,构成一个包含多个节点的集群。使用CLUSTER MEET <ip> <port>命令来完成。另外Redis服务器启动时也可以根据cluster-enabled配置选项来判断是否开启集群模式。节点信息保存在cluster.h/clusterNode结构中,clusterNode结构保存了一个节点的当前状态,比如节点的创建时间,节点的名等;clusterNodelink属性是一个clusterLink结构,该结构保存了连接节点所需的有关信息,比如套接字描述符,输入缓冲区和输出缓冲区; 每个节点都保存着一个clusterState结构,这个结构记录了当前节点的视角下,集群目前所处的状态,例如机器是在线还是下线,集群包含多少节点等。
Redis集群通过分片的方式来保存数据库中的键值对:集群的整个数据库被分为16384(=2048*8)个槽(slot),数据库中的每个键都属于这16384个槽的其中一个,集群中的每个节点可以处理0个或最多16384个槽。当数据库中的16384个槽有节点在处理时,集群处于一个上线状态(ok);相反地,如果数据库中任何一个槽没有得到处理,那么集群处于下线状态(fail)。
槽指派信息记录在clusterNode.slots[16384/8]属性中, numslots记录了节点负责处理的槽的数量。Redis以0为起始索引,16383为终止索引,对slots数组中的16384个二进制位进行编号,并根据索引i上的二进制位来判断节点是否负责处理槽i:

  • 如果slots数组在索引i上的二进制位值为1,那么表示节点负责处理槽i。
  • 如果slots数组在索引i上的二进制位值为0, 那么表示节点不负责处理槽i。

节点会把自己处理的槽信息发送给其他集群中的其他节点,因此集群中的每个节点都会知道数据库中16384个槽分别被指派给了集群中哪些节点。
clusterState结构中的slots[16384]数组则更上面的正好反过来,它记录了每个槽是由哪个节点在管理的。之所以会有这两种结构是为了在查找节点管理了哪些槽和槽由哪个节点管理的复杂度都降低了。

集群中的执行命令

当客户端向节点发送与数据库键有关的命令时,接收命令的节点会计算出命令要处理的数据库键属于哪个槽(使用crc16(key)&16383算法得出槽位置),并检查这个槽是否指派给了自己(clusterState.slots[i]是否为自己):

  • 如果键所在的槽正好指派给了当前节点,那么节点直接执行这个命令。
  • 如果键所在的槽没有指派给了当前节点,那么节点回向客户端返回一个MOVED错误,指引客户端转向(redirect)至正确的节点,并再次发送之前想要执行的命令。

节点与单机服务器在数据库方面的区别是,节点只能使用0号数据库,而单机Redis服务器则没有这一限制。
节点还会使用clusterState结构中的slots_to_keys跳跃表来保存槽和键之间的关系,主要目的是方便节点对属于某个或某些槽的所有数据库键进行批量操作。

重新分片

Redis集群的重新分片操作可以将任意数量已经指派给某个节点(源节点)的槽改为指派给另一个节点(目的节点),并且相关槽所属的键值对也会从源节点移动到目的节点。这个过程可以在线进行,在重新分片过程中,集群不需要下线,并且源节点和目的节点都可以继续处理命令请求。
Redis的重新分片操作是由Redis的集群管理软件redis-trib负责执行的。迁移过程如下:
redis-trib
在执行第四步迁移的过程中,如果客户端向源节点发送一个与数据库键有关的命令,那么:

  • 源节点先在自己数据库里查找指定的键,如果找到就直接执行客户端发送的命令.
  • 如果没找到,那么这个键可能已经被迁移到了目标节点,源节点向客户端返回一个ASK错误,指引客户端转向正在导入槽的目的节点,并再次发送之前想要执行的命令。
    当客户端接收到ASK错误并转向正在执行导入槽节点时,客户端会先向节点发送一个ASKING命令,然后才重新发送想要执行的命令。ASKING命令会打开发送客户端的REDIS_ASKING标识。
    一般情况下如果客户端向节点发送一个关于槽i的命令,如果节点没有这个槽,那么就会返回MOVED,但是如果节点的clusterState.importing_slots_from[i]显示节点正在导入槽i,并且发送命令的客户端带有REDIS_ASKING(通过ASKING命令打开)标识,那么节点将执行这个关于槽i的命令一次

关于ASK错误与MOVED错误的区别:

  • MOVED错误代表槽的负责权已经从一个节点转移到了另一个节点,客户端收到关于槽i的MOVED错误后,每次遇到槽i请求是,都可以直接将命令发送至MOVED错误所指向的节点。
  • ASK错误只是两个节点在迁移槽的过程中使用的一种临时措施, 不会影响后面命令的发送。

复制与故障转移

Redis集群中的节点分为主节点(master)和从节点(slave),其中主节点用于处理槽,而从节点则用于复制某个主节点,并在被复制的主节点下线时,代替下线主节点继续处理命令请求。设置从节点的命令:CLUSTER REPLICATE <node_id>
集群中的每个节点都会定期地祥集群中其他节点发送PING消息,以此来检测对方是否在线,如果接收PING消息的节点没有在规定时间内,向发送PING消息的节点返回PONG消息,那么发送PING消息的节点就会将接收PING消息的节点标记位疑似下线(probable fail, PFAIL)。如果一个集群里,半数以上负责处理槽的主节点都将某个主节点X报告为疑似下线,那么这个主节点X将被标记为已下线(FAIL), 将主节点X标记为已下线的节点会向集群广播一条关于主节点X的FAIL消息,所有收到这条FAIL消息的节点都会立即将主节点X标记为下线。
当一个从节点发现自己正在复制的主节点进入了已下线状态,从节点将开始对下线主节点进行故障转移,下面是故障转移执行的步骤:

  1. 复制下线主节点的所有从节点里面,会有一个从节点被选中:选举过程和Sentinel差不多。
  2. 被选中的从节点会执行SLAVEOF no one命令,成为新的主节点
  3. 新的主节点会撤销所有对已下线主节点的槽指派,并将这些槽全部指派给自己
  4. 新的主节点向集群广播一条PONG消息,可以让集群中其他节点立即知道这个节点从从节点变为了主节点,并且这个主节点已经接管了原本由已下线主节点负责处理的槽。
  5. 新的主节点开始接收和自己负责处理的槽有关的命令请求,故障转移完成。

总结

前面主要讲了Redis在多机数据库下的功能特性,其中复制是实现数据备份,数据可靠性的保证。Sentinel实现高可用性的保证。在3.0版本之前的分布式方案都是自己实现的,然后利用Sentinel进行监控。后来Redis自己实现了集群方案,可以用其默认的集群方案来代替之前的自己实现方案。他们之间是相辅相成的,根据自己的需要进行选择。

参考

  1. Redis集群方案应该怎么做?
  2. 如何部署高可用的Redis集群架构

Redis设计与实现总结——单机数据库的实现

数据库

一个Redis Server可以有多个Redis数据库,这点类似于MySQL, 从Redis Server的源代码中可以看到,redisDb是Server数据库的指针,指向一个数据库组成的数组,而数据库的数量则由dbnum属性来表示。客户端可以通过SELECT命令选择当前要操作的数据库。

1
2
3
4
5
6
7
8
9
10
11
12
struct redisServer {

// ...

// 数据库数组指针
redisDb *db;

// 服务器的数据库数量
int dbnum;

// ...
}

数据库的定义在redis.h/redisDb中,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typedef struct redisDb {

// 数据库键空间,保存着数据库中的所有键值对
dict *dict; /* The keyspace for this DB */

// 键的过期时间,字典的键为键,字典的值为过期事件 UNIX 时间戳
dict *expires; /* Timeout of keys with a timeout set */

// 正处于阻塞状态的键
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */

// 可以解除阻塞的键
dict *ready_keys; /* Blocked keys that received a PUSH */

// 正在被 WATCH 命令监视的键
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */

struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */

// 数据库号码
int id; /* Database ID */

// 数据库的键的平均 TTL ,统计信息
long long avg_ttl; /* Average TTL, just for stats */

} redisDb;

  • dict: 是一个字典,保存了数据库中的所有键值对,我们将这个字典称为键空间(key space)。
  • expires: 也是一个字典,保存的是键值与这个键值过期时间的键值对。

一个简化的结构图如下:
db结构
设置生存时间和过期时间时,最终都是计算出最后生存时间,然后把这个值存入expires字典中。过期字典中找不到证明没有设置过期时间。过期删除策略Redis主要是使用惰性删除策略与定期删除两种策略。所谓惰性删除策略就是当用户获取键时,先判断其是否过期,如果过期则删除键,返回失败,如果没过期则正常返回。定期删除策略是Redis会周期行的从过期字典中随机出一部分键值,如果过期则删除键,否则保留。

持久化

RDB持久化

RDB(redis database)持久化既可以手动执行,也可以根据服务器配置选项定期执行,该功能可以将某个时间点上的数据库状态保存到一个RDB文件中(RDB文件默认的文件名为dump.rdb)。RDB持久化功能锁生成的RDB文件是一个经过压缩的二进制文件,通过该文件还可以还原生成RDB文件时的数据库状态。
有两个Redis命令可以用于生成RDB文件,一个是SAVE, 另一个是BGSAVESAVE会阻塞Redis服务进程,知道RDB文件创建完毕为止,在服务器进程阻塞期间,服务器不能处理任何请求。BGSAVE命令会派生出一个子进程,然后由子进程负责创建RDB文件,服务器进程(父进程)继续处理命令请求。
RDB文件是在服务器启动时自动执行的,只要Redis服务器启动时检测到RDB文件存在,它就会自动载入RDB文件。但是如果服务器开启了AOF持久化功能,就会优先使用AOF文件。因为AOF文件的更新频率通常比RDB文件高,所以数据是最新的可能性高。
用户可以通过save选项设置多个保存条件,但只要其中任意一条被满足,服务器就会执行BGSAVE命令。例如配置为下面三个:

1
2
3
save 900 1
save 300 10
save 60 10000

只要满足900s内至少一次修改,或300s内至少10次修改,或60s内10000次修改就会自动执行BGSAVE命令。
服务器维护一个dirty计数器,用于记录距离上次成功执行SAVEBGSAVE命令之后,服务器对数据库状态进行了多少次修改(包括写入,删除,更新等操作)。
服务器还维护一个lastsave属性,记录服务器上一次成功执行SAVEBGSAVE命令的时间。
RDB文件结构:

1
2
3
4
5
+-----+----------+---------+---+---------+
| | | | | |
|REDIS|db_version|databases|EOF|check_sum|
| | | | | |
+-----+----------+---------+---+---------+

  • REDIS: RDB文件开头是REDIS部分,这个部分长度为5字节,保存着”REDIS”五个字符。通过五个字符,快速检测是否为RDB文件。
  • db_version: 长度为4字节,它的值是一个字符串表示的整数,记录了RDB文件的版本号。
  • databases: 包含着零个或任意多个数据库,以及各个数据库中的键值对数据。
  • EOF: 长度为1字节,这个常量标志着RDB文件正文内容的结束,当读入程序遇到这个值的时候,它知道所有数据库的所有键值对都已经载入完毕。
  • check_sum: 8字节长的无符号整数,保存着一个校验和,这个校验和是程序通过对前面四部分的内容计算得出的。服务器载入RDB文件时,会将载入数据所计算出的校验和与check_sum所记录的检验和进行对比,以此来检查RDB文件是否出错或者有损坏的情况。
    可以使用od -c dump.rdbod -cx dump.rdb命令来对RDB文件内容进行分析。

AOF持久化

AOF(Append Only File)持久化功能是通过保存Redis服务器所执行的写命令来记录数据库状态的。AOF持久化功能的实现可以分为命令追加(append), 文件写入,文件同步(sync)三个步骤:

  • 命令追加: 服务器执行完一个写命令后,会以协议格式将被执行的写命令追加到服务器状态的aof_buf缓冲区的末尾。
  • AOF文件的写入与同步: 服务器的每次时间循环结束之前,都会调用flushAppendOnlyFile函数,考虑是否需要将aof_buf缓冲区中的内容写入和保存到AOF文件里。
    flushAppendOnlyFile函数的行为由服务器配置的appendfsync选项的值来决定:
appendfsync选项的值 flushAppendOnlyFile函数的行为 影响
always 将aof_buf缓冲区中的所有内容写入并同步到AOF文件 性能最低,但是安全性最高,发生故障停机最多丢失一个循环事件所产生的在缓冲区中的命令
everysec(默认值) 将aof_buf缓冲区中的所有内容写入到AOF文件,如果上次同步AOF文件的时间距离现在超过1s,那么再次对AOF文件进行同步,并且这个同步操作是由一个线程专门负责执行的 性能足够快,并且出现故障停机,最多丢失一秒钟的命令数据
no 将aof_buf缓冲区中的所有内容写入到AOF文件,但并不对AOF文件进行同步,何时同步由操作系统决定 性能最好,写入AOF速度最快,但是单次同步时间最长,出现故障丢失的命令最多

由于AOF文件记录了重建数据库所需的所有写命令,所以服务器只要读入并执行一遍AOF文件里么保持的写命令,就可以还原服务器关闭之前的状态。
由于AOF持久化是通过保存被执行的写命令来记录数据库状态的,随着时间的推移,写命令越来越多,这时候就需要AOF重写来减轻文件体积的膨胀。
AOF重写首先从数据库中读取键现在的值,然后用一条命令去记录键值对,代替之前记录的这个键值对的多条命令。但是在重写列表,哈希表,集合,有序集合等多个元素的键时,如果元素的数量超过了redis/REDIS_AOF_REWRITE_ITEMS_PER_CMD常量的值,会通过多条命令来记录键的值。
一个问题是在AOF重写期间,服务器还需要处理命令请求,而新的命令可能会对现有的数据库状态进行修改,从而使得服务器当前的数据库状态和重写后的AOF文件所保存的数据库状态不一致。为了解决这个问题,Redis服务器设置了一个AOF重写缓冲区,这个缓冲区在服务器创建子进程进行重写是开始使用,当Redis服务器执行完一个写命令后,它会同事将这个命令发送给AOF缓冲区和AOF重写缓冲区。当AOF重写工作完成后,向父进程发送信号,父进程就会将AOF重写缓冲区中的所有内容写到新的AOF文件中,对新的AOF文件进行改名,原子地 (atomic)覆盖现有的AOF文件,完成新旧两个AOF文件的替换。

事件

文件事件

文件事件(file event): Redis服务器通过套接字与客户端(或其他Redis服务器)进行连接,而文件事件就是服务器对套接字操作的抽象。服务器与客户端(或其他服务器)的通信会产生相应的文件事件,而服务器则通过监听并处理这些事件来完成一系列网络通信操作。
下图是Redis自己实现的文件事件处理器的四个组成部分:
db结构

  • 文件事件处理器使用I/O多路复用(multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
  • 当被监听的套接字准备好执行连接应答(accept),读取(read),写入(write),关闭(close)等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。

虽然文件事件处理器以单线程方式运行,但通过使用I/O多路复用程序来监听多个套接字,文件事件处理器既实现了高性能的网络通信模型,又可以很好地与Redis服务器中其他同样以单线程方式运行的模块进行对接,着保持了Redis内部单线程设计的简单性。
尽管多个文件事件可能会并发地出现,但I/O多路复用程序总会将所有产生事件的套接字都放在一个队列里,然后通过这个队列,以有序(sequentially),同步(synchronously),每次一个套接字的方式向文件事件分派器传送套接字。
Redis的I/O多路复用程序的所有功能都是通过包装常见的select,epoll,evportkqueue这些I/O多路复用函数库来实现的,编译时会自动选择性能高最高的I/O多路复用函数库来作为Redis的I/O多路复用程序的底层实现。

时间事件

时间事件(time event): Redis服务器中的一些操作(如serverCron函数)需要在给定的时间点执行,而时间事件就是服务器对这类定时操作的抽象。
Redis的时间事件分为两类:

  • 定时事件: 让程序在指定的时间之后执行一次。
  • 周期性事件: 让一端程序每隔指定的时间就执行一次。

一个时间事件主要由以下三个属性:

  • id: 服务器为时间事件创建的全局唯一ID(标识号)。ID号按从小到大的顺序递增,新事件的ID号比旧事件的ID号要大。
  • when: 毫秒精度的UNIX时间戳,记录了时间事件的到达(arrive)时间。
  • timeProc: 时间事件处理器,一个函数。当时间事件到达时,服务器就会调用响应的处理器来处理事件。

一个时间事件是定时事件还是周期性事件取决于时间事件处理器的返回值:

  • 如果事件处理器返回ae.h/AE_NOMORE,那么这个事件为定时事件:该事件在达到一次之后就会被删除,之后不再到达。
  • 如果事件处理器返回一个非AE_NOMORE的整数值,那么这个事件为周期性时间:当一个时间事件到达之后,服务器会根据事件处理器返回的值,对时间事件的when属性进行更新,让这个事件在一段时间后再次到达,并以这种方式一直更新运行下去。

服务器将所有时间事件都放在一个无序链表中,每当时间事件执行器运行时,它就遍历整个链表,查找所有已到达的时间事件,并调用相应的事件处理器。

事件的调度与执行

事件的调度和执行由ae.c/aeProcessEvents函数负责,下面是这个函数的伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def aeProcessEvents():

// 获取到达时间离当前最接近的时间事件
time_event = aeSearchNearestTimer()

// 计算最接近的时间事件距离到达还有多少毫秒
remaind_ms = time_event.when - unix_ts_now()

// 如果事件已到达,那么remaind_ms的值就可能为负数,将它设定为0
if remaind_ms < 0:
remaind_ms = 0

// 根据remaind_ms的值,创建timeval结构
timeval = create_timeval_with_ms(remaind_ms)

// 阻塞并等待文件事件产生,最大阻塞时间由传入的timeval结构决定
// 如果remaind_ms的值为0,那么aeApiPoll调用之后马上返回,不阻塞
aeApiPoll(timeval)

// 处理所有已产生的文件事件
processFileEvents()

// 处理所有已到达的时间事件
processTimeEvents()

事件的调度和执行规则:

  1. aeApiPoll函数的最大阻塞时间由到达时间最接近的当前时间的时间事件决定,这个方法既可以避免服务器对时间事件并行频繁的轮询,可以确保aeApiPoll函数不会阻塞时间过长。
  2. 因为文件事件是随机出现的,如果处理完文件事件后时间事件仍未到达,继续等待并处理下一个文件事件。
  3. 对文件事件和时间事件的处理都是同步,有序,原子地执行的,服务器不会中途中断事件处理,也不会对事件进行抢占。因此耗时的事件会影响整个服务的性能。
  4. 因为时间事件是在文件事件之后执行,并且事件之间不会抢占,所以时间事件的实际处理时间通常回避时间事件设定的到达时间稍微晚一些。

客户端

通过使用I/O多路复用技术实现的文件事件处理器,Redis服务器使用单线程单进程的方式来处理命令请求,并与多个客户端进行网络通信。
关于redisClient的定义可以从redis.h中看到,客户端有很多属性。这些属性可以分为两类:

  • 比较通用的属性,这些属性很少特定功能相关,无论客户端执行的是什么工作,它都需要这些属性。
  • 和特定功能相关的属性。下重点介绍这些。

属性

  • fd(fake client): 伪客户端的fd属性的值为-1,伪客户端处理的命令请求来自于AOF文件或者lua脚本; 普通客户端fd属性值是大于-1的整数,使用套接字与服务器通信,所以fd用来记录客户端套接字的描述符。
  • name: 默认情况下一个连接到服务器的客户端是没有名字的,但是可以使用CLIENT setnaem命令设置一个名字,可以通过CLIENT list查看。
  • flags: 一部分标志记录了客户端的角色(如REDIS_MASTER代表主服务器, REDIS_SLAVE代表从服务器), 另一部分标志记录了客户端目前所处的状态(REDIS_MONITOR正在执行monitor, REDIS_MULTI标志客户端正在执行事务)。
  • querybuf: 用于保存客户端发送的命令请求。输入缓冲区的大小会根据输入内容动态调整,但是最大不能超过1GB,否则服务器将关闭这个客户端。
  • argvargc: 服务器将客户端发送的名保存到querybuf后,对命令内容进行分析,得出命令参数及命令的参数个数分别保存到argvargc中。
  • authenticated: 记录客户端是否通过了身份验证,未通过用0表示,通过用1表示。
  • ctime: 记录创建客户端的时间。
  • lastinteraction: 记录客户端与服务器最后一次进行互动的时间。
  • obuf_soft_limit_reached_time: 记录输出缓冲区第一次到达软性显示的时间。

执行命令所得的命令回复会被保存到客户端状态的输出缓冲区里,每个客户端都有两个输出缓冲区可用

  • bufbufpos: 固定的换缓冲区,用于保存那些长度比较小的回复,如:OK, 简短的字符串值,整数值或错误回复等。buf是缓冲区,bufpos记录buf数组目前已经使用的字节数量。
  • reply: 可变大小的缓冲区是一个链表,用于保存比较大的回复,比如一个非常长的字符串值,列表等。

创建与关闭

  • 创建不同客户端: 如果客户端是通过网络连接与服务器进行连接的普通客户端,那么在客户端connect函数连接到服务器时,服务器就会调用连接事件处理器为客户端创建响应的客户端状态,并将这个新的客户端状态添加到服务器状态结构clients链表的末尾。
  • 关闭客户端: 一个普通客户端被关闭的原因有很多:
    • 客户端进程退出或被杀死
    • 客户端向服务器发送了带有不符合协议格式的命令请求
    • 客户端成了CLIENT KILL命令的目标
    • 用户为服务器设置了timeout配置选项,客户端空转时间超过timeout选项设置的值
    • 客户端发送的命令请求大小超过了输入缓冲区的限制大小(1GB)
    • 发送给客户端的命令回复超过输出缓冲区的限制大小。按理说输出缓冲区是没有大小限制的,但是为了防止过多占用服务器资源,采用硬性限制和软性限制两种方案限制大小。
  • Lua脚本的伪客户端: 服务器在初始化时负责创建Lua脚本中包含的Redis命令的伪客户端,在服务器运行的整个周期中都会存在。
  • AOF文件的伪客户端: 服务器载入AOF文件时,会创建用于执行AOF文件包含的Redis命令的伪客户端,并在载入完成后关闭。

服务器

命令请求的执行过程

前面讲了,客户端发送的请求会被放到输入缓冲区,然后服务器对命令进行解析,转换成协议格式,服务器将通过调用命令执行器来完成余下的步骤:

  • 查找命令
    根据上面说的argv[0]参数中对应的命令在命令表中查找参数所指定的命令,并将找到的命令保存到客户端状态的cmd属性里。
    命令表是一个字典,字典的键是一个个命令名字,比如”set”,”get”,”del”等;而字典的值则是一个个redisCommand结构,每个redisCommand结构记录了一个Redis命令的实现信息。

redisCommand结构的主要属性:

属性名 类型 作用
name char * 命令的名字,比如”set”
proc redisCommandProc * 函数指针,指向命令的实现函数
arity int 命令参数的个数,用于检查命令请求的格式是否正确
sflags char * 字符串形式的标识值,这个值记录了命令的属性
例如:
w:表示写入命令
r:只读命令
m:可能会占用大量内存的命令
a:这是一个管理命令
flags int 对sflags标识进行分析得出的二进制标识,由程序自动生成
calls long long 服务器总共执行了多少次这个命令
milliseconds long long 服务器执行这个民两个所耗费的总时长
  • 执行预备操作
    到目前为止,服务器已经将执行命令所需的命令实现函数,参数等都收集齐了,真正执行命令之前还需要一些预备操作:

    • 检查客户端状态的cmd指针是否执行NULL
    • 检查命令请求所给定的参数个数是否正确
    • 检查客户端是否已经通过了身份验证
    • 如果服务器打开了maxmemory功能,需要检查服务器的内存占用情况,在有需要的时候进行内存回收
    • 其他检查和限制执行的操作等
  • 调用命令的实现函数
    当服务器决定要执行命令是client->cmd->proc(client);, 执行函数后会把回复保存到客户端的输出缓冲区,之后实现函数还会为客户端的套接字关联命令回复处理器,这个处理器负责将回复返回给客户端。

  • 执行后续工作
    在执行完实现函数后,服务器还需要执行一些后续工作:
    • 如果服务器开启了慢查询日志功能,那么慢查询日志模块会坚持是否需要为刚刚执行完的命令请求添加一条新的慢查询日志。
    • 根据刚刚执行命令所耗费的时长,更被执行命令redisCommand结构的milliseconds属性,并将calls计数器加一
    • 如果服务器开启了AOF持久化功能,那么AOF持久化模块会将刚刚执行的命令请求写入到AOF缓冲区里。
    • 如果有其他从服务器正在复制当前这个服务器,那么服务器会将刚刚执行的命令传播给所有从服务器
    • 根据刚刚执行命令所耗费的时长,更被执行命令redisCommand结构的milliseconds属性,并将calls计数器加一
    • 如果服务器开启了AOF持久化功能,那么AOF持久化模块会将刚刚执行的命令请求写入到AOF缓冲区里。
    • 如果有其他从服务器正在复制当前这个服务器,那么服务器会将刚刚执行的命令传播给所有从服务器。

回复发送完毕后,回复处理器会清空客户端状态的输出缓冲区,未处理下一个命令请求做好准备。当客户端接收到协议格式的命令回复后,它会将这些回复转换成人类可读的格式,并打印给用户观看。

serverCron函数

Redis服务器中的serverCron函数默认每隔100毫秒执行一次,这个函数负责管理服务器的资源,并保持服务器自身的良好运转。serverCron的函数主要功能如下面所列:

  • 更新服务器时间缓存: 为了减少获取服务器时间而进行系统调用的次数,服务器状态中的unixtimemstime属性被用作当前时间的缓存,serverCron函数默认每100ms的频率更新这两个字段。对于设置键值过期时间,慢查询日志这种需要高精度时间的功能来说,服务器还是会再次执行系统调用。
  • 更新LRU时钟: 服务器状态中的lruclock属性保存了服务器的LRU时钟;每个Redis对象都会有一个lru属性,保存了对象最后一次被访问的时间。这个值也是用serverCron来更新。
  • 更新服务器每秒执行命令次数: serverCron函数中的trackOperationsPerSecond函数会以每100ms一次的频率执行,这个函数的功能是以抽样计算的方式,估算并记录服务器在最近一秒钟处理的命令请求数量。可以通过INFO stats查看。
  • 更新服务器内存峰值记录:serverCron每次都会查看服务器当前使用的内存数量,并与stat_peak_memory保持的值进行比较,如果当前的数据比较大就更新这个值。INFO memory命令可以查看具体的数据。
  • 处理SIGTERM信号:服务器启动时,Redis会为服务器进程的SIGTERM信号关联处理器sigtermHandler函数,这个信号处理器负责在服务器接到SIGTERM信号时,打开服务器状态的shutdown_asap标识。如果不拦截这个信号,可能会造成比如RDB持久化操作时关闭服务器。
  • 管理客户端资源:serverCron函数每次执行都会调用clientsCron函数,clientsCron函数会对一定数量的客户端进行以下两个检查:
    • 如果客户端与服务器之间的连接已经超时,那么程序释放这个客户端。
    • 如果客户端在上一次执行命令请求后,输入缓冲区的大小超过了一定的长度,那么程序会释放客户端当前的输入缓冲区,并重新创建一个默认大小的输入缓冲区,从而防止客户端的输入缓冲区耗费了过多的内存。
  • 管理数据库资源: 每次调用databasesCron函数,对服务器中一部分数据库进行检查,删除其中的过期键,并在需要时,对字典进行收缩操作。
  • 执行被延迟的BGREWRITEAOF
  • 检查持久化操作的运行状态
  • 将AOF缓冲区的内容写入到AOF文件
  • 关闭异步客户端
  • 增加cronloops计数器的值:cronloops记录了serverCron函数执行的次数。

初始化服务器

一个Redis服务器从启动到能够接受客户端的命令请求,需要经过一系列的初始化和设置过程。过程如下:

  • 初始化服务器状态结构:包括设置服务器的运行ID,设置服务器的默认运行频率,设置服务器的默认配置文件路径,设置服务器默认端口号,设置服务器默认持久化条件等。
  • 载入配置选项: 可以通过给定配置函数或指定配置文件来修改服务器的默认配置。
  • 初始化服务器数据结构:包括初始化server.clients链表,初始化执Lua脚本的执行环境server.lua等;还进行了创建共享对象,打开服务器的监听端口等操作。
  • 还原数据库状态: 完成初始化后,服务器需要载入RDB文件或者AOF文件,并根据文件记录的内容来还原服务器的数据库状态。
  • 执行事件循环: 初始完成后,开始执行服务器的事件循环(loop)。