@@ -16,16 +16,13 @@ package e2e
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
-	"strings"
 	"testing"
 	"time"
 
 	"github.com/coreos/go-semver/semver"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/zap"
 
 	"go.etcd.io/etcd/api/v3/version"
 	"go.etcd.io/etcd/client/pkg/v3/fileutil"
@@ -38,7 +35,6 @@ import (
3835 "go.etcd.io/etcd/server/v3/storage/datadir"
3936 "go.etcd.io/etcd/tests/v3/framework/config"
4037 "go.etcd.io/etcd/tests/v3/framework/e2e"
41- "go.etcd.io/etcd/tests/v3/framework/testutils"
4238)
4339
4440func TestDowngradeUpgradeClusterOf1 (t * testing.T ) {
@@ -78,15 +74,14 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
 
 	lastClusterVersion := semver.New(lastVersionStr)
 	lastClusterVersion.Patch = 0
-	lastClusterVersionStr := lastClusterVersion.String()
 
 	e2e.BeforeTest(t)
 
 	t.Logf("Create cluster with version %s", currentVersionStr)
 	var snapshotCount uint64 = 10
 	epc := newCluster(t, clusterSize, snapshotCount)
 	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+		e2e.ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
 			Cluster: currentVersionStr,
 			Server:  version.Version,
 			Storage: currentVersionStr,
@@ -112,35 +107,10 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
 	require.NoError(t, err)
 	beforeMembers, beforeKV := getMembersAndKeys(t, cc)
 
-	t.Logf("etcdctl downgrade enable %s", lastVersionStr)
-	downgradeEnable(t, epc, lastVersion)
-
-	t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
-	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
-			Cluster: lastClusterVersionStr,
-			Server:  version.Version,
-			Storage: lastClusterVersionStr,
-		})
-		e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
-	}
-
-	t.Log("Cluster is ready for downgrade")
+	e2e.DowngradeEnable(t, epc, lastVersion)
 	t.Logf("Starting downgrade process to %q", lastVersionStr)
-	for i := 0; i < len(epc.Procs); i++ {
-		t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary)
-		stopEtcd(t, epc.Procs[i])
-		startEtcd(t, epc.Procs[i], lastReleaseBinary)
-	}
-
-	t.Log("All members downgraded, validating downgrade")
+	e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), currentVersion, lastClusterVersion)
 	e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
-	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
-			Cluster: lastClusterVersionStr,
-			Server:  lastVersionStr,
-		})
-	}
 
 	t.Log("Downgrade complete")
 	afterMembers, afterKV := getMembersAndKeys(t, cc)
@@ -165,23 +135,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
 	beforeMembers, beforeKV = getMembersAndKeys(t, cc)
 
 	t.Logf("Starting upgrade process to %q", currentVersionStr)
-	for i := 0; i < len(epc.Procs); i++ {
-		t.Logf("Upgrading member %d", i)
-		stopEtcd(t, epc.Procs[i])
-		startEtcd(t, epc.Procs[i], currentEtcdBinary)
-		// NOTE: The leader has monitor to the cluster version, which will
-		// update cluster version. We don't need to check the transient
-		// version just in case that it might be flaky.
-	}
-
-	t.Log("All members upgraded, validating upgrade")
-	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
-			Cluster: currentVersionStr,
-			Server:  version.Version,
-			Storage: currentVersionStr,
-		})
-	}
+	e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), lastClusterVersion, currentVersion)
 	t.Log("Upgrade complete")
 
 	afterMembers, afterKV = getMembersAndKeys(t, cc)
@@ -206,48 +160,6 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdPr
 	return epc
 }
 
-func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) {
-	ep.Config().ExecPath = execPath
-	err := ep.Restart(context.TODO())
-	if err != nil {
-		t.Fatalf("could not start etcd process cluster (%v)", err)
-	}
-}
-
-func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver *semver.Version) {
-	c := epc.Etcdctl()
-	testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
-		err := c.DowngradeEnable(context.TODO(), ver.String())
-		require.NoError(t, err)
-	})
-}
-
-func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
-	err := ep.Stop()
-	require.NoError(t, err)
-}
-
-func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
-	testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
-		for {
-			result, err := getMemberVersionByCurl(cfg, member)
-			if err != nil {
-				cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err), zap.String("member", member.Config().Name))
-				time.Sleep(time.Second)
-				continue
-			}
-			cfg.Logger.Info("Comparing versions", zap.String("member", member.Config().Name), zap.Any("got", result), zap.Any("want", expect))
-			if err := compareMemberVersion(expect, result); err != nil {
-				cfg.Logger.Warn("Versions didn't match retrying", zap.Error(err), zap.String("member", member.Config().Name))
-				time.Sleep(time.Second)
-				continue
-			}
-			cfg.Logger.Info("Versions match", zap.String("member", member.Config().Name))
-			break
-		}
-	})
-}
-
 func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
@@ -269,36 +181,6 @@ func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
 	return nil
 }
 
-func compareMemberVersion(expect version.Versions, target version.Versions) error {
-	if expect.Server != "" && expect.Server != target.Server {
-		return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, target.Server)
-	}
-
-	if expect.Cluster != "" && expect.Cluster != target.Cluster {
-		return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, target.Cluster)
-	}
-
-	if expect.Storage != "" && expect.Storage != target.Storage {
-		return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, target.Storage)
-	}
-	return nil
-}
-
-func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (version.Versions, error) {
-	args := e2e.CURLPrefixArgsCluster(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
-	lines, err := e2e.RunUtilCompletion(args, nil)
-	if err != nil {
-		return version.Versions{}, err
-	}
-
-	data := strings.Join(lines, "\n")
-	result := version.Versions{}
-	if err := json.Unmarshal([]byte(data), &result); err != nil {
-		return version.Versions{}, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
-	}
-	return result, nil
-}
-
 func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
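
For context, here is a minimal sketch (not part of this commit) of how the refactored test now drives the shared framework helpers end to end. The helper names and call shapes (e2e.DowngradeEnable, e2e.DowngradeUpgradeMembers, e2e.ValidateVersion) are taken from the diff above; the test name, version strings, and cluster size below are illustrative assumptions, and the validation after downgrade only asserts the cluster version for simplicity.

// Sketch only: assumes a 3.6 build under test and a 3.5 last release.
package e2e

import (
	"testing"

	"github.com/coreos/go-semver/semver"

	"go.etcd.io/etcd/api/v3/version"
	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

func TestDowngradeUpgradeSketch(t *testing.T) {
	e2e.BeforeTest(t)

	currentVersion := semver.New("3.6.0")     // assumed version of the build under test
	lastClusterVersion := semver.New("3.5.0") // assumed downgrade target (last release)

	// newCluster is the helper kept by this file: clusterSize=1, snapshotCount=10.
	epc := newCluster(t, 1, 10)

	// Enable downgrade, then restart each member on the last-release binary.
	e2e.DowngradeEnable(t, epc, lastClusterVersion)
	e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), currentVersion, lastClusterVersion)

	// Every member should now report the downgraded cluster version.
	for i := 0; i < len(epc.Procs); i++ {
		e2e.ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
			Cluster: lastClusterVersion.String(),
		})
	}

	// Upgrading back simply swaps the version arguments.
	e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), lastClusterVersion, currentVersion)
}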