Skip to content

Commit

Permalink
change name to write_throttle_ratio
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Dec 10, 2024
1 parent d3adfc2 commit 6b75b34
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
11 changes: 11 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ var (
TiKVPipelinedFlushDuration prometheus.Histogram
TiKVValidateReadTSFromPDCount prometheus.Counter
TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge
TiKVPipelinedFlushThrottleSecondsHistogram prometheus.Histogram
)

// Label constants.
Expand Down Expand Up @@ -852,6 +853,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
Help: "The actual working update interval for the low resolution TSO. As there are adaptive mechanism internally, this value may differ from the config.",
})

TiKVPipelinedFlushThrottleSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "pipelined_flush_throttle_seconds",
Help: "Throttle durations of pipelined flushes.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h
})

initShortcuts()
}

Expand Down Expand Up @@ -948,6 +958,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVPipelinedFlushDuration)
prometheus.MustRegister(TiKVValidateReadTSFromPDCount)
prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge)
prometheus.MustRegister(TiKVPipelinedFlushThrottleSecondsHistogram)
}

// readCounter reads the value of a prometheus.Counter.
Expand Down
13 changes: 8 additions & 5 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const (

defaultPipelinedFlushConcurrency = 128
defaultPipelinedResolveLockConcurrency = 8
defaultPipelinedWriteSpeedRatio = 1.0
defaultPipelinedWriteThrottleRatio = 0.0
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
Expand Down Expand Up @@ -955,20 +955,23 @@ func WithDefaultPipelinedTxn() TxnOption {
Enable: true,
FlushConcurrency: defaultPipelinedFlushConcurrency,
ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency,
WriteSpeedRatio: defaultPipelinedWriteSpeedRatio,
WriteThrottleRatio: defaultPipelinedWriteThrottleRatio,
}
}
}

// WithPipelinedTxn creates pipelined txn with specified parameters
func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int,
writeSpeedRatio float64) TxnOption {
func WithPipelinedTxn(
flushConcurrency,
resolveLockConcurrency int,
writeThrottleRatio float64,
) TxnOption {
return func(st *transaction.TxnOptions) {
st.PipelinedTxn = transaction.PipelinedTxnOptions{
Enable: true,
FlushConcurrency: flushConcurrency,
ResolveLockConcurrency: resolveLockConcurrency,
WriteSpeedRatio: writeSpeedRatio,
WriteThrottleRatio: writeThrottleRatio,
}
}
}
Expand Down
19 changes: 10 additions & 9 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ type PipelinedTxnOptions struct {
Enable bool
FlushConcurrency int
ResolveLockConcurrency int
// (0,1], 1 = no sleep
WriteSpeedRatio float64
// [0,1), 0 = no sleep, 1 = no write
WriteThrottleRatio float64
}

// TxnOptions indicates the option when beginning a transaction.
Expand Down Expand Up @@ -184,7 +184,7 @@ type KVTxn struct {
pipelinedCancel context.CancelFunc
pipelinedFlushConcurrency int
pipelinedResolveLockConcurrency int
writeSpeedRatio float64
writeThrottleRatio float64
// flushBatchDurationEWMA is read before each flush, and written after each flush => no race
flushBatchDurationEWMA ewma.MovingAverage
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64,
}
newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency
newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency
newTiKVTxn.writeSpeedRatio = options.PipelinedTxn.WriteSpeedRatio
newTiKVTxn.writeThrottleRatio = options.PipelinedTxn.WriteThrottleRatio
if err := newTiKVTxn.InitPipelinedMemDB(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -666,26 +666,27 @@ func (txn *KVTxn) InitPipelinedMemDB() error {
}

func (txn *KVTxn) throttle() {
if txn.writeSpeedRatio > 1 || txn.writeSpeedRatio <= 0 {
if txn.writeThrottleRatio >= 1 || txn.writeThrottleRatio < 0 {
logutil.BgLogger().Error(
"[pipelined dml] invalid write speed ratio",
zap.Float64("writeSpeedRatio", txn.writeSpeedRatio),
zap.Float64("writeThrottleRatio", txn.writeThrottleRatio),
zap.Uint64("session", txn.committer.sessionID),
zap.Uint64("startTS", txn.startTS),
)
return
}

expectedFlushMs := txn.flushBatchDurationEWMA.Value()
// T_sleep / (T_sleep + T_flush) = 1 - writeSpeedRatio
sleepMs := int((1.0 - txn.writeSpeedRatio) / txn.writeSpeedRatio * expectedFlushMs)
// T_sleep / (T_sleep + T_flush) = writeThrottleRatio
sleepMs := int(txn.writeThrottleRatio / (1.0 - txn.writeThrottleRatio) * expectedFlushMs)
metrics.TiKVPipelinedFlushThrottleSecondsHistogram.Observe(float64(sleepMs) / 1000)
if sleepMs == 0 {
return
}
logutil.BgLogger().Info(
"[pipelined dml] throttle",
zap.Int("sleepMs", sleepMs),
zap.Float64("writeSpeedRatio", txn.writeSpeedRatio),
zap.Float64("writeThrottleRatio", txn.writeThrottleRatio),
zap.Uint64("session", txn.committer.sessionID),
zap.Uint64("startTS", txn.startTS),
)
Expand Down

0 comments on commit 6b75b34

Please sign in to comment.