[6.5] retry as stale reads and remove back off for server busy errors (#1511)

* retry as stale reads

Signed-off-by: rishabh_mittal <[email protected]>

* build failure

Signed-off-by: rishabh_mittal <[email protected]>

* fixed test cases

Signed-off-by: rishabh_mittal <[email protected]>

* allow stale read if leader is not even tried

Signed-off-by: rishabh_mittal <[email protected]>

* fixed test cases

Signed-off-by: rishabh_mittal <[email protected]>

* disable test

Signed-off-by: rishabh_mittal <[email protected]>

* fixed test cases

Signed-off-by: rishabh_mittal <[email protected]>

---------

Signed-off-by: rishabh_mittal <[email protected]>
mittalrishabh authored Dec 19, 2024
1 parent 53cb1b6 commit 3a82e24
Showing 3 changed files with 40 additions and 22 deletions.
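
In short, the patch lets a retry on another replica keep the stale-read flag whenever the leader has not been attempted yet, or was attempted but failed with a deadline-exceeded or server-is-busy error; only a leader that actually served an attempt forces the retry to downgrade to a normal replica read. Read requests that hit `ServerIsBusy` now mark the target replica through the new `onServerIsBusy` hook, and a stale read in that situation can fall back to a follower immediately rather than backing off. A minimal, self-contained sketch of the stale-read decision follows; `keepStaleRead` and its `main` driver are illustrative stand-ins distilled from the diff below, not code from this repository.

package main

import "fmt"

// replica mirrors the fields the patch consults when deciding how to retry.
type replica struct {
    attempts                    int
    deadlineErrUsingConfTimeout bool
    serverIsBusy                bool
}

// keepStaleRead reports whether a retry on another replica may stay a stale
// read: only when the original request was a stale read and the leader was
// never attempted, hit the configurable read timeout, or reported ServerIsBusy.
func keepStaleRead(wasStaleRead bool, leader replica) bool {
    if !wasStaleRead {
        return false
    }
    return leader.attempts == 0 || leader.deadlineErrUsingConfTimeout || leader.serverIsBusy
}

func main() {
    busyLeader := replica{attempts: 1, serverIsBusy: true}
    fmt.Println(keepStaleRead(true, busyLeader)) // true: retry the follower as a stale read
    servedLeader := replica{attempts: 1}
    fmt.Println(keepStaleRead(true, servedLeader)) // false: downgrade to a normal follower read
}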
internal/locate/region_request.go (32 additions, 11 deletions)

@@ -464,6 +464,7 @@ type replica struct {
     attempts int
     // deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error.
     deadlineErrUsingConfTimeout bool
+    serverIsBusy bool
 }

@@ -625,7 +626,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
     // a request. So, before the new leader is elected, we should not send requests
     // to the unreachable old leader to avoid unnecessary timeout.
     if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
-        selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+        selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
         return nil, stateChanged{}
     }
     selector.targetIdx = state.leaderIdx

@@ -640,15 +641,15 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
         return
     }
     if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
-        selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+        selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
     }
     if liveness != reachable {
         selector.invalidateReplicaStore(selector.targetReplica(), cause)
     }
 }
 
 func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
-    selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+    selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
 }
 
 // tryFollower is the state where we cannot access the known leader

@@ -665,6 +666,7 @@ type tryFollower struct {
     labels []*metapb.StoreLabel
     // fromAccessKnownLeader indicates whether the state is changed from `accessKnownLeader`.
     fromAccessKnownLeader bool
+    isStaleRead bool
 }

@@ -724,8 +726,13 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
         replicaRead := true
         rpcCtx.contextPatcher.replicaRead = &replicaRead
     }
-    staleRead := false
-    rpcCtx.contextPatcher.staleRead = &staleRead
+    leader := selector.replicas[state.leaderIdx]
+    if leader.attempts == 0 || leader.deadlineErrUsingConfTimeout || leader.serverIsBusy {
+        rpcCtx.contextPatcher.staleRead = &state.isStaleRead
+    } else {
+        staleRead := false
+        rpcCtx.contextPatcher.staleRead = &staleRead
+    }
     return rpcCtx, nil
 }

@@ -934,9 +941,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
         if (state.isStaleRead && !selector.StaleRead.PreventRetryFollower) ||
             (!state.isStaleRead && leader.deadlineErrUsingConfTimeout) {
             selector.state = &tryFollower{
-                leaderIdx: state.leaderIdx,
-                lastIdx:   state.leaderIdx,
-                labels:    state.option.labels,
+                leaderIdx:   state.leaderIdx,
+                lastIdx:     state.leaderIdx,
+                labels:      state.option.labels,
+                isStaleRead: state.isStaleRead,
             }
             if leaderEpochStale {
                 selector.regionCache.scheduleReloadRegion(selector.region)

@@ -1215,7 +1223,7 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo
         }
         if accessLeader, ok := s.state.(*accessKnownLeader); ok {
             // If leader return deadline exceeded error, we should try to access follower next time.
-            s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
+            s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx, isStaleRead: false}
         }
         return true
     default:

@@ -1224,6 +1232,16 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo
     }
 }
 
+func (s *replicaSelector) onServerIsBusy(req *tikvrpc.Request) {
+    switch req.Type {
+    case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan,
+        tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream:
+        if target := s.targetReplica(); target != nil {
+            target.serverIsBusy = true
+        }
+    }
+}
+
 func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
     store := accessReplica.store
     liveness := store.requestLiveness(bo, s.regionCache)

@@ -1306,8 +1324,9 @@ func (s *replicaSelector) canFallback2Follower() bool {
     if !ok {
         return false
     }
-    if !state.isStaleRead {
-        return false
+    if state.isStaleRead {
+        // fallback to follower if it is stale reads
+        return true
     }
     // can fallback to follower only when the leader is exhausted.
     return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])

@@ -2127,6 +2146,8 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
             if s.replicaSelector.onReadReqConfigurableTimeout(req) {
                 return true, nil
             }
+        } else if s.replicaSelector != nil {
+            s.replicaSelector.onServerIsBusy(req)
         }
         logutil.Logger(bo.GetCtx()).Debug(
             "tikv reports `ServerIsBusy` retry later",
internal/locate/region_request3_test.go (0 additions, 3 deletions)

@@ -1291,9 +1291,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
     follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{})
 
     s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
-        if req.StaleRead && addr == follower.addr {
-            return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}, nil
-        }
         return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
             Value: []byte(addr),
         }}, nil
internal/locate/region_request_state_test.go (8 additions, 8 deletions)

@@ -333,7 +333,7 @@ func TestRegionCacheStaleRead(t *testing.T) {
             leaderRegionValid: true,
             leaderAsyncReload: Some(true),
             leaderSuccessReplica: []string{"z2", "z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid: true,
             followerAsyncReload: None[bool](),
             followerSuccessReplica: []string{"z2"},

@@ -379,7 +379,7 @@ func TestRegionCacheStaleRead(t *testing.T) {
             leaderRegionValid: true,
             leaderAsyncReload: Some(false),
             leaderSuccessReplica: []string{"z2", "z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid: true,
             followerAsyncReload: Some(false),
             followerSuccessReplica: []string{"z2"},

@@ -443,11 +443,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
             leaderRegionValid: true,
             leaderAsyncReload: Some(false),
             leaderSuccessReplica: []string{"z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid: true,
             followerAsyncReload: Some(false),
             followerSuccessReplica: []string{"z3"},
-            followerSuccessReadType: SuccessFollowerRead,
+            followerSuccessReadType: SuccessStaleRead,
         },
         {
             do: leaderServerIsBusy,

@@ -456,11 +456,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
             leaderRegionValid: true,
             leaderAsyncReload: Some(false),
             leaderSuccessReplica: []string{"z2", "z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid: true,
             followerAsyncReload: Some(false),
             followerSuccessReplica: []string{"z2", "z3"},
-            followerSuccessReadType: SuccessFollowerRead,
+            followerSuccessReadType: SuccessStaleRead,
         },
         {
             do: leaderServerIsBusy,

@@ -469,11 +469,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
             leaderRegionValid: true,
             leaderAsyncReload: Some(false),
             leaderSuccessReplica: []string{"z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid: true,
             followerAsyncReload: Some(false),
             followerSuccessReplica: []string{"z3"},
-            followerSuccessReadType: SuccessFollowerRead,
+            followerSuccessReadType: SuccessStaleRead,
         },
         {
             do: leaderDown,
