Allow setting concurrency for pipelined flush and resolveLocks #1494

Open · wants to merge 7 commits into base: master
2 changes: 1 addition & 1 deletion examples/gcworker/go.mod
@@ -1,6 +1,6 @@
 module gcworker
 
-go 1.23
+go 1.23.2
 
 require github.com/tikv/client-go/v2 v2.0.0
2 changes: 1 addition & 1 deletion examples/rawkv/go.mod
@@ -1,6 +1,6 @@
 module rawkv
 
-go 1.23
+go 1.23.2
 
 require github.com/tikv/client-go/v2 v2.0.0
2 changes: 1 addition & 1 deletion examples/txnkv/1pc_txn/go.mod
@@ -1,6 +1,6 @@
 module 1pc_txn
 
-go 1.23
+go 1.23.2
 
 require github.com/tikv/client-go/v2 v2.0.0
2 changes: 1 addition & 1 deletion examples/txnkv/async_commit/go.mod
@@ -1,6 +1,6 @@
 module async_commit
 
-go 1.23
+go 1.23.2
 
 require github.com/tikv/client-go/v2 v2.0.0
2 changes: 1 addition & 1 deletion examples/txnkv/delete_range/go.mod
@@ -1,6 +1,6 @@
 module delete_range
 
-go 1.23
+go 1.23.2
 
 require github.com/tikv/client-go/v2 v2.0.0
2 changes: 1 addition & 1 deletion examples/txnkv/go.mod
@@ -1,6 +1,6 @@
 module txnkv
 
-go 1.23
+go 1.23.2
 
 require github.com/tikv/client-go/v2 v2.0.0
2 changes: 1 addition & 1 deletion examples/txnkv/pessimistic_txn/go.mod
@@ -1,6 +1,6 @@
 module pessimistic_txn
 
-go 1.23
+go 1.23.2
 
 require github.com/tikv/client-go/v2 v2.0.0
2 changes: 1 addition & 1 deletion examples/txnkv/unsafedestoryrange/go.mod
@@ -1,6 +1,6 @@
 module unsafedestoryrange
 
-go 1.23
+go 1.23.2
 
 require github.com/tikv/client-go/v2 v2.0.0
1 change: 1 addition & 0 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/tikv/client-go/v2
 go 1.23
 
 require (
+	github.com/VividCortex/ewma v1.2.0
 	github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
 	github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
 	github.com/docker/go-units v0.5.0
2 changes: 2 additions & 0 deletions go.sum
@@ -1,6 +1,8 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
+github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
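The only new dependency is VividCortex/ewma, an exponentially weighted moving-average library. The diff doesn't show where it is wired in, but a natural fit is smoothing observed flush timings for the new write-throttle ratio; below is a minimal sketch of the library's API under that assumption (the throttle computation itself is hypothetical, not taken from this PR):

```go
package main

import (
	"fmt"
	"time"

	"github.com/VividCortex/ewma"
)

func main() {
	// Smooth a stream of flush durations (in seconds) with an EWMA.
	avg := ewma.NewMovingAverage()
	for _, d := range []time.Duration{80 * time.Millisecond, 120 * time.Millisecond, 95 * time.Millisecond} {
		avg.Add(d.Seconds())
	}
	// Hypothetical throttle: pause writes for ratio * smoothed flush time.
	const writeThrottleRatio = 0.5 // illustrative; the default below is 0.0 (disabled)
	throttle := time.Duration(avg.Value() * writeThrottleRatio * float64(time.Second))
	fmt.Printf("smoothed flush %.1fms, throttle %s\n", avg.Value()*1000, throttle)
}
```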
2 changes: 1 addition & 1 deletion integration_tests/go.mod
@@ -1,6 +1,6 @@
 module integration_tests
 
-go 1.23
+go 1.23.2
 
 require (
 	github.com/ninedraft/israce v0.0.3
30 changes: 15 additions & 15 deletions integration_tests/pipelined_memdb_test.go
@@ -97,7 +97,7 @@ func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock {
 
 func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() {
 	ctx := context.Background()
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	txn1, err := s.store.Begin()
 	s.Nil(err)
@@ -139,7 +139,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() {
 
 func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() {
 	ctx := context.Background()
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	for i := 0; i < 100; i++ {
 		key := []byte(strconv.Itoa(i))
@@ -161,7 +161,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() {
 }
 
 func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() {
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	txn.Set([]byte("key1"), []byte("value1"))
 
@@ -192,7 +192,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() {
 }
 
 func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() {
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	txn.Set([]byte("key1"), []byte("value1"))
 	flushed, err := txn.GetMemBuffer().Flush(true)
@@ -206,7 +206,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() {
 	s.Nil(txn.Commit(context.Background()))
 
 	// can see it after commit
-	txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	defer txn.Rollback()
 	val, err := txn.Get(context.Background(), []byte("key1"))
@@ -222,7 +222,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() {
 		failpoint.Disable("tikvclient/pipelinedCommitFail")
 	}()
 	for i := 0; i < 100; i++ {
-		txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+		txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 		startTS := txn.StartTS()
 		s.Nil(err)
 		for j := 0; j < 100; j++ {
@@ -246,7 +246,7 @@ }
 }
 
 func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	for i := 0; i < 100; i++ {
 		key := []byte(strconv.Itoa(i))
@@ -279,7 +279,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
 	}
 
 	// check the result
-	txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	for i := 0; i < 100; i++ {
 		key := []byte(strconv.Itoa(i))
@@ -290,7 +290,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
 }
 
 func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	startTS := txn.StartTS()
 	s.Nil(err)
 	keys := make([][]byte, 0, 100)
@@ -304,7 +304,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
 	s.Nil(txn.GetMemBuffer().FlushWait())
 	s.Nil(txn.Rollback())
 	s.Eventuallyf(func() bool {
-		txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB())
+		txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithDefaultPipelinedTxn())
 		s.Nil(err)
 		defer func() { s.Nil(txn.Rollback()) }()
 		storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier)
@@ -322,7 +322,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
 func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
 	failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
 	defer failpoint.Disable("tikvclient/beforeSendReqToRegion")
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 
 	mustFlush := func(txn *transaction.KVTxn) {
@@ -384,7 +384,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
 
 	s.Nil(txn.Commit(context.Background()))
 
-	txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	txn.Set([]byte("100"), []byte("100")) // snapshot: [0, 1, ..., 99], membuffer: [100]
 	m, err := txn.BatchGet(context.Background(), [][]byte{[]byte("99"), []byte("100"), []byte("101")})
@@ -417,7 +417,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
 	txn.Rollback()
 
 	// empty memdb should also cache the not exist result.
-	txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	// batch get cache: [99 -> not exist]
 	m, err = txn.BatchGet(context.Background(), [][]byte{[]byte("99")})
 	s.Nil(err)
@@ -433,7 +433,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() {
 	atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms
 	defer atomic.StoreUint64(&transaction.ManagedLockTTL, originManageTTLVal)
 
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	txn.Set([]byte("key1"), []byte("value1"))
 	txnProbe := transaction.TxnProbe{KVTxn: txn}
@@ -482,7 +482,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() {
 		restoreGlobalConfFunc()
 	}()
 
-	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
 	s.Nil(err)
 	txn.Set([]byte("key1"), []byte("value1"))
 	txnProbe := transaction.TxnProbe{KVTxn: txn}
11 changes: 11 additions & 0 deletions metrics/metrics.go
@@ -117,6 +117,7 @@ var (
 	TiKVPipelinedFlushDuration                     prometheus.Histogram
 	TiKVValidateReadTSFromPDCount                  prometheus.Counter
 	TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge
+	TiKVPipelinedFlushThrottleSecondsHistogram     prometheus.Histogram
 )
 
 // Label constants.
@@ -852,6 +853,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
 		Help: "The actual working update interval for the low resolution TSO. As there are adaptive mechanism internally, this value may differ from the config.",
 	})
 
+	TiKVPipelinedFlushThrottleSecondsHistogram = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "pipelined_flush_throttle_seconds",
+			Help:      "Throttle durations of pipelined flushes.",
+			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h
+		})
+
 	initShortcuts()
 }
 
@@ -948,6 +958,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVPipelinedFlushDuration)
 	prometheus.MustRegister(TiKVValidateReadTSFromPDCount)
 	prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge)
+	prometheus.MustRegister(TiKVPipelinedFlushThrottleSecondsHistogram)
 }
 
 // readCounter reads the value of a prometheus.Counter.
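The new histogram only becomes useful once something observes into it, and the diff does not include the call site. The following is a self-contained sketch of recording a throttle pause, with the metric redeclared locally (in client-go it would be `metrics.TiKVPipelinedFlushThrottleSecondsHistogram`; the name, help text, and buckets below are copied from the diff):

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Local stand-in for the histogram added above.
var flushThrottleSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name:    "pipelined_flush_throttle_seconds",
	Help:    "Throttle durations of pipelined flushes.",
	Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h
})

func main() {
	prometheus.MustRegister(flushThrottleSeconds)

	start := time.Now()
	time.Sleep(2 * time.Millisecond) // stand-in for an actual write-throttle pause
	flushThrottleSeconds.Observe(time.Since(start).Seconds())
}
```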
31 changes: 28 additions & 3 deletions tikv/kv.go
@@ -80,6 +80,10 @@ const (
 	// DCLabelKey indicates the key of label which represents the dc for Store.
 	DCLabelKey = "zone"
 	safeTSUpdateInterval = time.Second * 2
+
+	defaultPipelinedFlushConcurrency       = 128
+	defaultPipelinedResolveLockConcurrency = 8
+	defaultPipelinedWriteThrottleRatio     = 0.0
 )
 
 func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
@@ -944,10 +948,31 @@ func WithStartTS(startTS uint64) TxnOption {
 	}
 }
 
-// WithPipelinedMemDB creates transaction with pipelined memdb.
-func WithPipelinedMemDB() TxnOption {
+// WithDefaultPipelinedTxn creates a pipelined txn with default parameters.
+func WithDefaultPipelinedTxn() TxnOption {
 	return func(st *transaction.TxnOptions) {
-		st.PipelinedMemDB = true
+		st.PipelinedTxn = transaction.PipelinedTxnOptions{
+			Enable:                 true,
+			FlushConcurrency:       defaultPipelinedFlushConcurrency,
+			ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency,
+			WriteThrottleRatio:     defaultPipelinedWriteThrottleRatio,
+		}
 	}
 }
+
+// WithPipelinedTxn creates a pipelined txn with the specified parameters.
+func WithPipelinedTxn(
+	flushConcurrency,
+	resolveLockConcurrency int,
+	writeThrottleRatio float64,
+) TxnOption {
+	return func(st *transaction.TxnOptions) {
+		st.PipelinedTxn = transaction.PipelinedTxnOptions{
+			Enable:                 true,
+			FlushConcurrency:       flushConcurrency,
+			ResolveLockConcurrency: resolveLockConcurrency,
+			WriteThrottleRatio:     writeThrottleRatio,
+		}
+	}
+}
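Taken together, callers now have two entry points. A brief usage sketch (store construction elided; the tuned values are illustrative, not recommendations):

```go
package example

import (
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/transaction"
)

// beginDefault uses the defaults defined above: flush concurrency 128,
// resolve-lock concurrency 8, write throttling disabled (ratio 0.0).
func beginDefault(store *tikv.KVStore) (*transaction.KVTxn, error) {
	return store.Begin(tikv.WithDefaultPipelinedTxn())
}

// beginTuned sets the knobs explicitly: 64 concurrent flush batches,
// 4 concurrent resolve-lock tasks, and a 0.5 write-throttle ratio.
func beginTuned(store *tikv.KVStore) (*transaction.KVTxn, error) {
	return store.Begin(tikv.WithPipelinedTxn(64, 4, 0.5))
}
```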
22 changes: 17 additions & 5 deletions txnkv/transaction/2pc.go
@@ -1102,16 +1102,28 @@ func (c *twoPhaseCommitter) doActionOnBatches(
 		}
 		return nil
 	}
-	rateLim := len(batches)
+	rateLim := c.calcActionConcurrency(len(batches), action)
+	batchExecutor := newBatchExecutor(rateLim, c, action, bo)
+	return batchExecutor.process(batches)
+}
+
+func (c *twoPhaseCommitter) calcActionConcurrency(
+	numBatches int, action twoPhaseCommitAction,
+) int {
+	rateLim := numBatches
 	// Set rateLim here for the large transaction.
 	// If the rate limit is too high, tikv will report service is busy.
 	// If the rate limit is too low, we can't full utilize the tikv's throughput.
 	// TODO: Find a self-adaptive way to control the rate limit here.
-	if rateLim > config.GetGlobalConfig().CommitterConcurrency {
-		rateLim = config.GetGlobalConfig().CommitterConcurrency
+	switch action.(type) {
+	case actionPipelinedFlush:
+		rateLim = min(rateLim, max(1, c.txn.pipelinedFlushConcurrency))
+	default:
+		if rateLim > config.GetGlobalConfig().CommitterConcurrency {
+			rateLim = config.GetGlobalConfig().CommitterConcurrency
+		}
 	}
-	batchExecutor := newBatchExecutor(rateLim, c, action, bo)
-	return batchExecutor.process(batches)
+	return rateLim
 }
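The effect of the new branch: pipelined flushes are capped by the per-txn knob instead of the global CommitterConcurrency, with a floor of one so that a zero or negative knob still makes progress. A tiny standalone illustration of the same min/max arithmetic (Go 1.21+ builtins):

```go
package example

// capFlushConcurrency mirrors the actionPipelinedFlush branch above.
func capFlushConcurrency(numBatches, flushConcurrency int) int {
	return min(numBatches, max(1, flushConcurrency))
}

// capFlushConcurrency(200, 128) == 128 // large txn: capped by the knob
// capFlushConcurrency(10, 128)  == 10  // small txn: one worker per batch
// capFlushConcurrency(10, 0)    == 1   // unset knob: still makes progress
```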
3 changes: 1 addition & 2 deletions txnkv/transaction/pipelined_flush.go
@@ -454,7 +454,6 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
 // resolveFlushedLocks resolves all locks in the given range [start, end) with the given status.
 // The resolve process is running in another goroutine so this function won't block.
 func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
-	const RESOLVE_CONCURRENCY = 8
 	var resolved atomic.Uint64
 	handler, err := c.buildPipelinedResolveHandler(commit, &resolved)
 	commitTs := uint64(0)
@@ -481,7 +480,7 @@
 		fmt.Sprintf("pipelined-dml-%s", status),
 		fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS),
 		c.store,
-		RESOLVE_CONCURRENCY,
+		c.txn.pipelinedResolveLockConcurrency,
 		handler,
 	)
 	runner.SetStatLogInterval(30 * time.Second)
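For intuition, the resolve-lock concurrency bounds how many range tasks run in parallel when cleaning up flushed locks after commit or rollback. A toy fan-out with the same shape, using plain goroutines rather than the real rangetask runner:

```go
package example

import "sync"

// resolveRanges runs handler over tasks with at most `concurrency` workers,
// mimicking how resolveFlushedLocks fans out over key ranges.
func resolveRanges(tasks []string, concurrency int, handler func(task string)) {
	sem := make(chan struct{}, max(1, concurrency))
	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		sem <- struct{}{} // acquire a worker slot
		go func(task string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			handler(task)
		}(t)
	}
	wg.Wait()
}
```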