Cwww3's Blog

Record what you think

0%

go-sql

Sql源码浅析

Connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func Open(driverName, dataSourceName string) (*DB, error) {
// 查找已注册的驱动 由_ "github.com/go-sql-driver/mysql"提供
// 驱动是真正和数据库打交道的
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}



if driverCtx, ok := driveri.(driver.DriverContext); ok {
// 获取连接器
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}

return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}


// OpenDB may just validate its arguments without creating a connection
// to the database. To verify that the data source name is valid, call
// Ping.
func OpenDB(c driver.Connector) *DB {
// 初始化数据,为后续维护连接池做准备
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}

go db.connectionOpener(ctx)

return db
}

// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
// 收到创建新的连接的请求
db.openNewConnection(ctx)
}
}
}

Tx

Begin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
func (db *DB) Begin() (*Tx, error) {
return db.BeginTx(context.Background(), nil)
}


func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
var tx *Tx
var err error
var isBadConn bool
// 重试策略
for i := 0; i < maxBadConnRetries; i++ {
// cachedOrNewConn 获取连接池的连接,如果没有,
// 在连接没有到达设置的最大值时,就创建一个新的连接, 否则只能等待。
// inUse=true
tx, err = db.begin(ctx, opts, cachedOrNewConn)
isBadConn = errors.Is(err, driver.ErrBadConn)
if !isBadConn {
break
}
}
if isBadConn {
return db.begin(ctx, opts, alwaysNewConn)
}
return tx, err
}

func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
// releaseConn释放连接时调用,放入连接池 inUse=false
return db.beginDC(ctx, dc, dc.releaseConn, opts)
}

func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
// 根据策略获取到了dc
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
// 传入dc
return db.beginDC(ctx, dc, dc.releaseConn, opts)
}


// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
keepConnOnRollback := false
withLock(dc, func() {
_, hasSessionResetter := dc.ci.(driver.SessionResetter)
_, hasConnectionValidator := dc.ci.(driver.Validator)
keepConnOnRollback = hasSessionResetter && hasConnectionValidator
// dc.ci就是驱动与数据库建立连接的原始结构,通过它开启事务,返回事务的原始结构
txi, err = ctxDriverBegin(ctx, opts, dc.ci)
})
if err != nil {
release(err)
return nil, err
}

// Schedule the transaction to rollback when the context is canceled.
// The cancel function in Tx will be called after done is set to true.
// 生成一个cancel Contxt,这个context伴随着这个事务,它的生命周期和事务一样长
ctx, cancel := context.WithCancel(ctx)
tx = &Tx{
db: db,
dc: dc,
releaseConn: release,
txi: txi,
cancel: cancel,
keepConnOnRollback: keepConnOnRollback,
ctx: ctx,
}

// 等待事务结束
go tx.awaitDone()
return tx, nil
}


// awaitDone blocks until the context in Tx is canceled and rolls back
// the transaction if it's not already done.
func (tx *Tx) awaitDone() {
// Wait for either the transaction to be committed or rolled
// back, or for the associated context to be closed.
// 等待context的结束
// 在执行tx.Commit()或tx.RollBack()时,会调用cancel()
<-tx.ctx.Done()

discardConnection := !tx.keepConnOnRollback
// 如果不是由Commit或RollBack调用的cancel(),导致ctx结束,那么回滚会在这执行
tx.rollback(discardConnection)
}

Commit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Commit commits the transaction.
func (tx *Tx) Commit() error {
// Check context first to avoid transaction leak.
// If put it behind tx.done CompareAndSwap statement, we can't ensure
// the consistency between tx.done and the real COMMIT operation.
select {
default:
// 如果没有调用rollback以及ctx没有结束,那么会走到这
case <-tx.ctx.Done():
if atomic.LoadInt32(&tx.done) == 1 {
return ErrTxDone
}
return tx.ctx.Err()
}

// 原子操作,0->1,rollback时也会做这个操作,确保只执行一次
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}

// Cancel the Tx to release any active R-closemu locks.
// This is safe to do because tx.done has already transitioned
// from 0 to 1. Hold the W-closemu lock prior to rollback
// to ensure no other connection has an active query.
tx.cancel() // 执行cancel方法,结束开始事务时开启的goroutine
tx.closemu.Lock()
tx.closemu.Unlock()

var err error
// 通过驱动建立的连接执行commit方法
withLock(tx.dc, func() {
err = tx.txi.Commit()
})
if !errors.Is(err, driver.ErrBadConn) {
tx.closePrepared()
}
// 释放连接 inUse=false
tx.close(err)
return err
}

Rollback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Rollback aborts the transaction.
func (tx *Tx) Rollback() error {
return tx.rollback(false)
}


// rollback aborts the transaction and optionally forces the pool to discard
// the connection.
func (tx *Tx) rollback(discardConn bool) error {
// 原子操作 确保只执行一次
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}

if rollbackHook != nil {
rollbackHook()
}

// Cancel the Tx to release any active R-closemu locks.
// This is safe to do because tx.done has already transitioned
// from 0 to 1. Hold the W-closemu lock prior to rollback
// to ensure no other connection has an active query.
tx.cancel()
tx.closemu.Lock()
tx.closemu.Unlock()

var err error
// 通过驱动建立的连接执行rollback方法
withLock(tx.dc, func() {
err = tx.txi.Rollback()
})
if !errors.Is(err, driver.ErrBadConn) {
tx.closePrepared()
}
if discardConn {
err = driver.ErrBadConn
}
// 释放连接
tx.close(err)
return err
}

总结

事务是基于连接的,连接只能被一方使用,在释放之前,无法得到。

Donate comment here.