From 0dcdcff09155eabb411a2bb7931ab885f2d3374c Mon Sep 17 00:00:00 2001
From: Siyuan Zhang
Date: Mon, 9 Dec 2024 19:46:12 +0000
Subject: [PATCH 1/2] Add MemberDowngrade failpoint

Signed-off-by: Siyuan Zhang
---
 tests/framework/e2e/curl.go             |   7 ++
 tests/robustness/failpoint/cluster.go   | 116 ++++++++++++++++++++++++
 tests/robustness/failpoint/failpoint.go |   1 +
 tests/robustness/report/wal.go          |   2 +
 4 files changed, 126 insertions(+)

diff --git a/tests/framework/e2e/curl.go b/tests/framework/e2e/curl.go
index a6a9ee35fc4..d0546622afd 100644
--- a/tests/framework/e2e/curl.go
+++ b/tests/framework/e2e/curl.go
@@ -128,3 +128,10 @@ func CURLGet(clus *EtcdProcessCluster, req CURLReq) error {
 
 	return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), nil, req.Expected)
 }
+
+func CURLGetFromMember(clus *EtcdProcessCluster, member EtcdProcess, req CURLReq) error {
+	ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration())
+	defer cancel()
+
+	return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, member, "GET", req), nil, req.Expected)
+}
diff --git a/tests/robustness/failpoint/cluster.go b/tests/robustness/failpoint/cluster.go
index cde85ce03a0..201bf48a451 100644
--- a/tests/robustness/failpoint/cluster.go
+++ b/tests/robustness/failpoint/cluster.go
@@ -23,10 +23,13 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coreos/go-semver/semver"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 
+	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
 	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/pkg/v3/expect"
 	"go.etcd.io/etcd/server/v3/etcdserver"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
@@ -35,6 +38,7 @@ import (
 )
 
 var MemberReplace Failpoint = memberReplace{}
+var MemberDowngrade Failpoint = memberDowngrade{}
 
 type memberReplace struct{}
 
@@ -138,6 +142,92 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e
 	return config.ClusterSize > 1 && (config.Version == e2e.QuorumLastVersion || member.Config().ExecPath == e2e.BinPath.Etcd)
 }
 
+type memberDowngrade struct{}
+
+func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
+	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		return nil, err
+	}
+	targetVersion := semver.Version{Major: v.Major, Minor: v.Minor - 1}
+	numberOfMembersToDowngrade := rand.Int()%len(clus.Procs) + 1
+	membersToDowngrade := rand.Perm(len(clus.Procs))[:numberOfMembersToDowngrade]
+	lg.Info("Test downgrading members", zap.Any("members", membersToDowngrade))
+
+	member := clus.Procs[0]
+	endpoints := []string{member.EndpointsGRPC()[0]}
+	cc, err := clientv3.New(clientv3.Config{
+		Endpoints:            endpoints,
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    10 * time.Second,
+		DialKeepAliveTimeout: 100 * time.Millisecond,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer cc.Close()
+
+	// Need to wait health interval for cluster to accept changes
+	time.Sleep(etcdserver.HealthInterval)
+	lg.Info("Enable downgrade")
+	err = enableDowngrade(ctx, cc, &targetVersion)
+	if err != nil {
+		return nil, err
+	}
+	// Need to wait health interval for cluster to prepare for downgrade
+	time.Sleep(etcdserver.HealthInterval)
+
+	for _, memberID := range membersToDowngrade {
+		member = clus.Procs[memberID]
+		lg.Info("Downgrading member", zap.String("member", member.Config().Name))
+		for member.IsRunning() {
+			err = member.Kill()
+			if err != nil {
+				lg.Info("Sending kill signal failed", zap.Error(err))
+			}
+			err = member.Wait(ctx)
+			if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
+				lg.Info("Failed to kill the process", zap.Error(err))
+				return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
+			}
+		}
+		if lazyfs := member.LazyFS(); lazyfs != nil {
+			lg.Info("Removing data that was not fsynced")
+			err := lazyfs.ClearCache(ctx)
+			if err != nil {
+				return nil, err
+			}
+		}
+		member.Config().ExecPath = e2e.BinPath.EtcdLastRelease
+		err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
+		if err != nil {
+			return nil, err
+		}
+		lg.Info("Restarting member", zap.String("member", member.Config().Name))
+		err = member.Start(ctx)
+		if err != nil {
+			return nil, err
+		}
+		err = verifyVersion(t, clus, member, targetVersion)
+	}
+	time.Sleep(etcdserver.HealthInterval)
+	return nil, err
+}
+
+func (f memberDowngrade) Name() string {
+	return "MemberDowngrade"
+}
+
+func (f memberDowngrade) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
+	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		panic("Failed checking etcd version binary")
+	}
+	v3_6 := semver.Version{Major: 3, Minor: 6}
+	// only current version cluster can be downgraded.
+	return config.ClusterSize > 1 && v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
+}
+
 func getID(ctx context.Context, cc *clientv3.Client, name string) (id uint64, found bool, err error) {
 	// Ensure linearized MemberList by first making a linearized Get request from the same member.
 	// This is required for v3.4 support as it doesn't support linearized MemberList https://github.com/etcd-io/etcd/issues/18929
@@ -170,3 +260,29 @@ func patchArgs(args []string, flag, newValue string) error {
 	}
 	return fmt.Errorf("--%s flag not found", flag)
 }
+
+func enableDowngrade(ctx context.Context, cc *clientv3.Client, targetVersion *semver.Version) error {
+	_, err := cc.Maintenance.Downgrade(ctx, clientv3.DowngradeAction(pb.DowngradeRequest_VALIDATE), targetVersion.String())
+	if err != nil {
+		return err
+	}
+	_, err = cc.Maintenance.Downgrade(ctx, clientv3.DowngradeAction(pb.DowngradeRequest_ENABLE), targetVersion.String())
+	return err
+}
+
+func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedVersion semver.Version) error {
+	var err error
+	expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedVersion.Major, expectedVersion.Minor, expectedVersion.Major, expectedVersion.Minor)
+	for i := 0; i < 35; i++ {
+		if err = e2e.CURLGetFromMember(clus, member, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected, IsRegularExpr: true}}); err != nil {
+			t.Logf("#%d: v3 is not ready yet (%v)", i, err)
+			time.Sleep(200 * time.Millisecond)
+			continue
+		}
+		break
+	}
+	if err != nil {
+		return fmt.Errorf("failed to verify version, expected %v got (%v)", expected, err)
+	}
+	return nil
+}
diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go
index bca909a0c11..17c0d11b8e7 100644
--- a/tests/robustness/failpoint/failpoint.go
+++ b/tests/robustness/failpoint/failpoint.go
@@ -46,6 +46,7 @@ var allFailpoints = []Failpoint{
 	RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
 	BeforeApplyOneConfChangeSleep,
 	MemberReplace,
+	MemberDowngrade,
 	DropPeerNetwork,
 	RaftBeforeSaveSleep,
 	RaftAfterSaveSleep,
diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go
index e152be5450b..5efa8b9b23b 100644
--- a/tests/robustness/report/wal.go
+++ b/tests/robustness/report/wal.go
@@ -183,6 +183,8 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
 		return nil, nil
 	case raftReq.ClusterVersionSet != nil:
 		return nil, nil
+	case raftReq.DowngradeInfoSet != nil:
+		return nil, nil
 	case raftReq.Compaction != nil:
 		request := model.EtcdRequest{
 			Type: model.Compact,

From 68f5e3f5d8c3d008581ef50293e0fbd30635e094 Mon Sep 17 00:00:00 2001
From: Siyuan Zhang
Date: Wed, 18 Dec 2024 23:30:19 +0000
Subject: [PATCH 2/2] local test for MemberDowngrade failpoint

Signed-off-by: Siyuan Zhang
---
 tests/framework/e2e/cluster.go          |  1 +
 tests/robustness/failpoint/cluster.go   | 24 ++++++++-------
 tests/robustness/failpoint/failpoint.go | 30 +++++++++---------
 tests/robustness/scenarios/scenarios.go | 41 +++++++++++++------------
 4 files changed, 50 insertions(+), 46 deletions(-)

diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go
index 4aff11b9d6f..64880385b40 100644
--- a/tests/framework/e2e/cluster.go
+++ b/tests/framework/e2e/cluster.go
@@ -564,6 +564,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 		"--initial-cluster-token=" + cfg.ServerConfig.InitialClusterToken,
 		"--data-dir", dataDirPath,
 		"--snapshot-count=" + fmt.Sprintf("%d", cfg.ServerConfig.SnapshotCount),
+		"--max-wals=1000", "--max-snapshots=1000",
 	}
 	var clientHTTPURL string
 	if cfg.ClientHTTPSeparate {
diff --git a/tests/robustness/failpoint/cluster.go b/tests/robustness/failpoint/cluster.go
index 201bf48a451..bcb2f4fc835 100644
--- a/tests/robustness/failpoint/cluster.go
+++ b/tests/robustness/failpoint/cluster.go
@@ -181,23 +181,23 @@ func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logge
 		member = clus.Procs[memberID]
 		lg.Info("Downgrading member", zap.String("member", member.Config().Name))
 		for member.IsRunning() {
-			err = member.Kill()
+			err = member.Stop()
 			if err != nil {
-				lg.Info("Sending kill signal failed", zap.Error(err))
+				lg.Info("Stopping server failed", zap.Error(err))
 			}
 			err = member.Wait(ctx)
 			if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
-				lg.Info("Failed to kill the process", zap.Error(err))
-				return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
-			}
-		}
-		if lazyfs := member.LazyFS(); lazyfs != nil {
-			lg.Info("Removing data that was not fsynced")
-			err := lazyfs.ClearCache(ctx)
-			if err != nil {
-				return nil, err
+				lg.Info("Failed to stop the process", zap.Error(err))
+				return nil, fmt.Errorf("failed to stop the process within %s, err: %w", triggerTimeout, err)
 			}
 		}
+		// if lazyfs := member.LazyFS(); lazyfs != nil {
+		// 	lg.Info("Removing data that was not fsynced")
+		// 	err := lazyfs.ClearCache(ctx)
+		// 	if err != nil {
+		// 		return nil, err
+		// 	}
+		// }
 		member.Config().ExecPath = e2e.BinPath.EtcdLastRelease
 		err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
 		if err != nil {
@@ -208,9 +208,11 @@ func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logge
 		if err != nil {
 			return nil, err
 		}
+		time.Sleep(etcdserver.HealthInterval)
 		err = verifyVersion(t, clus, member, targetVersion)
 	}
 	time.Sleep(etcdserver.HealthInterval)
+	lg.Info("Finished downgrading members", zap.Any("members", membersToDowngrade))
 	return nil, err
 }
 
diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go
index 17c0d11b8e7..5cdf25c81aa 100644
--- a/tests/robustness/failpoint/failpoint.go
+++ b/tests/robustness/failpoint/failpoint.go
@@ -36,22 +36,22 @@ const (
 )
 
 var allFailpoints = []Failpoint{
-	KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
-	DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
-	BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
-	BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
-	CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
-	CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
-	RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
-	RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
-	BeforeApplyOneConfChangeSleep,
-	MemberReplace,
+	// KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
+	// DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
+	// BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
+	// BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
+	// CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
+	// CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
+	// RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
+	// RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
+	// BeforeApplyOneConfChangeSleep,
+	// MemberReplace,
 	MemberDowngrade,
-	DropPeerNetwork,
-	RaftBeforeSaveSleep,
-	RaftAfterSaveSleep,
-	ApplyBeforeOpenSnapshot,
-	SleepBeforeSendWatchResponse,
+	// DropPeerNetwork,
+	// RaftBeforeSaveSleep,
+	// RaftAfterSaveSleep,
+	// ApplyBeforeOpenSnapshot,
+	// SleepBeforeSendWatchResponse,
 }
 
 func PickRandom(clus *e2e.EtcdProcessCluster, profile traffic.Profile) (Failpoint, error) {
diff --git a/tests/robustness/scenarios/scenarios.go b/tests/robustness/scenarios/scenarios.go
index afad5879fee..99bb5fc08c5 100644
--- a/tests/robustness/scenarios/scenarios.go
+++ b/tests/robustness/scenarios/scenarios.go
@@ -81,22 +81,23 @@ func Exploratory(_ *testing.T) []TestScenario {
 		// 60% with all members of current version
 		{Choice: options.ClusterOptions{options.WithVersion(e2e.CurrentVersion)}, Weight: 60},
 		// 10% with 2 members of current version, 1 member last version, leader is current version
-		{Choice: options.ClusterOptions{options.WithVersion(e2e.MinorityLastVersion), options.WithInitialLeaderIndex(0)}, Weight: 10},
-		// 10% with 2 members of current version, 1 member last version, leader is last version
-		{Choice: options.ClusterOptions{options.WithVersion(e2e.MinorityLastVersion), options.WithInitialLeaderIndex(2)}, Weight: 10},
-		// 10% with 2 members of last version, 1 member current version, leader is last version
-		{Choice: options.ClusterOptions{options.WithVersion(e2e.QuorumLastVersion), options.WithInitialLeaderIndex(0)}, Weight: 10},
-		// 10% with 2 members of last version, 1 member current version, leader is current version
-		{Choice: options.ClusterOptions{options.WithVersion(e2e.QuorumLastVersion), options.WithInitialLeaderIndex(2)}, Weight: 10},
+		// {Choice: options.ClusterOptions{options.WithVersion(e2e.MinorityLastVersion), options.WithInitialLeaderIndex(0)}, Weight: 10},
+		// // 10% with 2 members of current version, 1 member last version, leader is last version
+		// {Choice: options.ClusterOptions{options.WithVersion(e2e.MinorityLastVersion), options.WithInitialLeaderIndex(2)}, Weight: 10},
+		// // 10% with 2 members of last version, 1 member current version, leader is last version
+		// {Choice: options.ClusterOptions{options.WithVersion(e2e.QuorumLastVersion), options.WithInitialLeaderIndex(0)}, Weight: 10},
+		// // 10% with 2 members of last version, 1 member current version, leader is current version
+		// {Choice: options.ClusterOptions{options.WithVersion(e2e.QuorumLastVersion), options.WithInitialLeaderIndex(2)}, Weight: 10},
 	}
 	mixedVersionOption := options.WithClusterOptionGroups(random.PickRandom[options.ClusterOptions](mixedVersionOptionChoices))
 
 	baseOptions := []e2e.EPClusterOption{
-		options.WithSnapshotCount(50, 100, 1000),
+		options.WithSnapshotCount(100000),
 		options.WithSubsetOptions(randomizableOptions...),
 		e2e.WithGoFailEnabled(true),
+		e2e.WithKeepDataDir(true),
 		// Set low minimal compaction batch limit to allow for triggering multi batch compaction failpoints.
-		options.WithCompactionBatchLimit(10, 100, 1000),
+		options.WithCompactionBatchLimit(100000),
 		e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
 	}
 
@@ -104,17 +105,17 @@ func Exploratory(_ *testing.T) []TestScenario {
 		baseOptions = append(baseOptions, e2e.WithSnapshotCatchUpEntries(100))
 	}
 	scenarios := []TestScenario{}
-	for _, tp := range trafficProfiles {
-		name := filepath.Join(tp.Name, "ClusterOfSize1")
-		clusterOfSize1Options := baseOptions
-		clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
-		scenarios = append(scenarios, TestScenario{
-			Name:    name,
-			Traffic: tp.Traffic,
-			Profile: tp.Profile,
-			Cluster: *e2e.NewConfig(clusterOfSize1Options...),
-		})
-	}
+	// for _, tp := range trafficProfiles {
+	// 	name := filepath.Join(tp.Name, "ClusterOfSize1")
+	// 	clusterOfSize1Options := baseOptions
+	// 	clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
+	// 	scenarios = append(scenarios, TestScenario{
+	// 		Name:    name,
+	// 		Traffic: tp.Traffic,
+	// 		Profile: tp.Profile,
+	// 		Cluster: *e2e.NewConfig(clusterOfSize1Options...),
+	// 	})
+	// }
 	for _, tp := range trafficProfiles {
 		name := filepath.Join(tp.Name, "ClusterOfSize3")