Cwww3's Blog

Record what you think

0%

gorm

GORM 源码浅析

连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// gorm 连接数据库
func Open(dialector Dialector, opts ...Option) (db *DB, err error) {
// 全局配置
config := &Config{}

...

// 加载配置
for _, opt := range opts {
if opt != nil {
if applyErr := opt.Apply(config); applyErr != nil {
return nil, applyErr
}
defer func(opt Option) {
if errr := opt.AfterInitialize(db); errr != nil {
err = errr
}
}(opt)
}
}

// 时间精度配置 默认毫秒
if d, ok := dialector.(interface{ Apply(*Config) error }); ok {
if err = d.Apply(config); err != nil {
return
}
}


// 注册query/create..等操作执行时所需要调用的函数
// gorm:query -> gorm:preload -> gorm:after_query
// 对不同版本的数据库做相应的约束配置
// 调用sql.Open()初始化连接池
db.callbacks = initializeCallbacks(db)
if config.Dialector != nil {
err = config.Dialector.Initialize(db)
}

...

// clone=1
db = &DB{Config: config, clone: 1}

// 在执行sql前后,相关的信息都会存在其中
db.Statement = &Statement{
DB: db,
ConnPool: db.ConnPool,
Context: context.Background(),
Clauses: map[string]clause.Clause{},
}

// 连接时,自动会进行ping操作
if err == nil && !config.DisableAutomaticPing {
if pinger, ok := db.ConnPool.(interface{ Ping() error }); ok {
err = pinger.Ping()
}
}

if err != nil {
config.Logger.Error(context.Background(), "failed to initialize database, got error %v", err)
}

return
}

Session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Session create new db session 可覆盖全局配置
func (db *DB) Session(config *Session) *DB {
var (
txConfig = *db.Config
tx = &DB{
Config: &txConfig,
Statement: db.Statement, // 直接赋值原db的statement
Error: db.Error,
clone: 1, // clone = 1
}
)

// 对全局配置进行覆盖
if config.CreateBatchSize > 0 {
tx.Config.CreateBatchSize = config.CreateBatchSize
}

if config.SkipDefaultTransaction {
tx.Config.SkipDefaultTransaction = true
}

if config.AllowGlobalUpdate {
txConfig.AllowGlobalUpdate = true
}

if config.FullSaveAssociations {
txConfig.FullSaveAssociations = true
}

// 如果条件true 会对原db的statement进行深拷贝(并发安全)
// 如果不进行clone,原db和新db使用的是同一个statement
// 在分别执行sql后,信息都会存于同一个statement中,相互影响
if config.Context != nil || config.PrepareStmt || config.SkipHooks {
tx.Statement = tx.Statement.clone()
tx.Statement.DB = tx
}

if config.Context != nil {
tx.Statement.Context = config.Context
}

...

if config.DisableNestedTransaction {
txConfig.DisableNestedTransaction = true
}

// newDB=true clone=1
// newDB=false clone=2
// clone值的影响在getInstance()中体现
if !config.NewDB {
tx.clone = 2
}

if config.DryRun {
tx.Config.DryRun = true
}

if config.QueryFields {
tx.Config.QueryFields = true
}

if config.Logger != nil {
tx.Config.Logger = config.Logger
}

if config.NowFunc != nil {
tx.Config.NowFunc = config.NowFunc
}

if config.Initialized {
tx = tx.getInstance()
}

return tx
}


getInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

// 在执行大部分语句之前都会调用该方法
// Model() Where() Order() Find() ...
func (db *DB) getInstance() *DB {
// 根据clone的值,做相应的处理

// 如果clone=1,会将statement清空,原先设置的条件(比如where order...)都会清除
// 如果clone=2, 会将statement进行深拷贝,原先的条件保留,但是新老db互不影响()
// 如果clone=0,直接返回,不做任何操作
// 做一些build(拼接)操作,比如执行Where(),Order()等操作时,直接使用原db即可,不需要对statement进行拷贝或清除

if db.clone > 0 {
tx := &DB{Config: db.Config, Error: db.Error}

if db.clone == 1 {
// clone with new statement
tx.Statement = &Statement{
DB: tx,
ConnPool: db.Statement.ConnPool,
Context: db.Statement.Context,
Clauses: map[string]clause.Clause{},
Vars: make([]interface{}, 0, 8),
}
} else {
// with clone statement
tx.Statement = db.Statement.clone()
tx.Statement.DB = tx
}

return tx
}

return db
}

Tx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Transaction start a transaction as a block, return error will rollback, otherwise to commit.
func (db *DB) Transaction(fc func(tx *DB) error, opts ...*sql.TxOptions) (err error) {
panicked := true

if committer, ok := db.Statement.ConnPool.(TxCommitter); ok && committer != nil {
// nested transaction
// 嵌套事务会进入该分支,原理是一个事务可以分为多个段,用户可以将事务回滚到指定的段,而不是整个事务
// https://dotnettutorials.net/lesson/savepoint-in-mysql/
if !db.DisableNestedTransaction {
err = db.SavePoint(fmt.Sprintf("sp%p", fc)).Error
if err != nil {
return
}

defer func() {
// Make sure to rollback when panic, Block error or Commit error
if panicked || err != nil {
db.RollbackTo(fmt.Sprintf("sp%p", fc))
}
}()
}
err = fc(db.Session(&Session{NewDB: db.clone == 1}))
} else {
// 非嵌套事务 开启事务
tx := db.Begin(opts...)
if tx.Error != nil {
return tx.Error
}

defer func() {
// Make sure to rollback when panic, Block error or Commit error
if panicked || err != nil {
// rollback 事务
tx.Rollback()
}
}()

if err = fc(tx); err == nil {
panicked = false
// commit 事务
return tx.Commit().Error
}
}

panicked = false
return
}

嵌套事务

1
2
3
4
5
6
db.Transaction(func(tx *gorm.DB) error {
tx.Transaction(func(tx2 *gorm.DB) error {
return nil
})
return nil
})
Donate comment here.