Skip to content

Commit

Permalink
pipelined dml throttle
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Dec 6, 2024
1 parent ea83c82 commit d3adfc2
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 13 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/client-go/v2
go 1.23

require (
github.com/VividCortex/ewma v1.2.0
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
github.com/docker/go-units v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
6 changes: 5 additions & 1 deletion tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const (

defaultPipelinedFlushConcurrency = 128
defaultPipelinedResolveLockConcurrency = 8
defaultPipelinedWriteSpeedRatio = 1.0
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
Expand Down Expand Up @@ -954,17 +955,20 @@ func WithDefaultPipelinedTxn() TxnOption {
Enable: true,
FlushConcurrency: defaultPipelinedFlushConcurrency,
ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency,
WriteSpeedRatio: defaultPipelinedWriteSpeedRatio,
}
}
}

// WithPipelinedTxn creates pipelined txn with specified parameters
func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int) TxnOption {
func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int,
writeSpeedRatio float64) TxnOption {
return func(st *transaction.TxnOptions) {
st.PipelinedTxn = transaction.PipelinedTxnOptions{
Enable: true,
FlushConcurrency: flushConcurrency,
ResolveLockConcurrency: resolveLockConcurrency,
WriteSpeedRatio: writeSpeedRatio,
}
}
}
Expand Down
68 changes: 56 additions & 12 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"sync/atomic"
"time"

"github.com/VividCortex/ewma"
"github.com/dgryski/go-farm"
"github.com/docker/go-units"
"github.com/opentracing/opentracing-go"
Expand All @@ -74,6 +75,7 @@ import (
// MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit.
// We use it to abort the transaction to guarantee GC worker will not influence it.
const MaxTxnTimeUse = 24 * 60 * 60 * 1000
const defaultEWMAAge = 10

type tempLockBufferEntry struct {
HasReturnValue bool
Expand Down Expand Up @@ -113,6 +115,8 @@ type PipelinedTxnOptions struct {
Enable bool
FlushConcurrency int
ResolveLockConcurrency int
// (0,1], 1 = no sleep
WriteSpeedRatio float64
}

// TxnOptions indicates the option when beginning a transaction.
Expand Down Expand Up @@ -180,30 +184,35 @@ type KVTxn struct {
pipelinedCancel context.CancelFunc
pipelinedFlushConcurrency int
pipelinedResolveLockConcurrency int
writeSpeedRatio float64
// flushBatchDurationEWMA is read before each flush, and written after each flush => no race
flushBatchDurationEWMA ewma.MovingAverage
}

// NewTiKVTxn creates a new KVTxn.
func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, options *TxnOptions) (*KVTxn, error) {
cfg := config.GetGlobalConfig()
newTiKVTxn := &KVTxn{
snapshot: snapshot,
store: store,
startTS: startTS,
startTime: time.Now(),
valid: true,
vars: tikv.DefaultVars,
scope: options.TxnScope,
enableAsyncCommit: cfg.EnableAsyncCommit,
enable1PC: cfg.Enable1PC,
diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull,
RequestSource: snapshot.RequestSource,
snapshot: snapshot,
store: store,
startTS: startTS,
startTime: time.Now(),
valid: true,
vars: tikv.DefaultVars,
scope: options.TxnScope,
enableAsyncCommit: cfg.EnableAsyncCommit,
enable1PC: cfg.Enable1PC,
diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull,
RequestSource: snapshot.RequestSource,
flushBatchDurationEWMA: ewma.NewMovingAverage(defaultEWMAAge),
}
if !options.PipelinedTxn.Enable {
newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDB(), snapshot)
return newTiKVTxn, nil
}
newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency
newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency
newTiKVTxn.writeSpeedRatio = options.PipelinedTxn.WriteSpeedRatio
if err := newTiKVTxn.InitPipelinedMemDB(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -637,7 +646,15 @@ func (txn *KVTxn) InitPipelinedMemDB() error {
}
mutations.Push(op, false, mustExist, mustNotExist, flags.HasNeedConstraintCheckInPrewrite(), it.Handle())
}
return txn.committer.pipelinedFlushMutations(bo, mutations, generation)
txn.throttle()
flushStart := time.Now()
err = txn.committer.pipelinedFlushMutations(bo, mutations, generation)
if txn.flushBatchDurationEWMA.Value() == 0 {
txn.flushBatchDurationEWMA.Set(float64(time.Since(flushStart).Milliseconds()))
} else {
txn.flushBatchDurationEWMA.Add(float64(time.Since(flushStart).Milliseconds()))
}
return err
})
txn.committer.priority = txn.priority.ToPB()
txn.committer.syncLog = txn.syncLog
Expand All @@ -648,6 +665,33 @@ func (txn *KVTxn) InitPipelinedMemDB() error {
return nil
}

func (txn *KVTxn) throttle() {
if txn.writeSpeedRatio > 1 || txn.writeSpeedRatio <= 0 {
logutil.BgLogger().Error(
"[pipelined dml] invalid write speed ratio",
zap.Float64("writeSpeedRatio", txn.writeSpeedRatio),
zap.Uint64("session", txn.committer.sessionID),
zap.Uint64("startTS", txn.startTS),
)
return
}

expectedFlushMs := txn.flushBatchDurationEWMA.Value()
// T_sleep / (T_sleep + T_flush) = 1 - writeSpeedRatio
sleepMs := int((1.0 - txn.writeSpeedRatio) / txn.writeSpeedRatio * expectedFlushMs)
if sleepMs == 0 {
return
}
logutil.BgLogger().Info(
"[pipelined dml] throttle",
zap.Int("sleepMs", sleepMs),
zap.Float64("writeSpeedRatio", txn.writeSpeedRatio),
zap.Uint64("session", txn.committer.sessionID),
zap.Uint64("startTS", txn.startTS),
)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
}

// IsCasualConsistency returns if the transaction allows linearizability
// inconsistency.
func (txn *KVTxn) IsCasualConsistency() bool {
Expand Down

0 comments on commit d3adfc2

Please sign in to comment.