Skip to content

Commit e02c8dc

Browse files
committed
Avoid racing in closing Etcd.errc channel
Signed-off-by: Joshua Zhang <[email protected]>
1 parent 76d836f commit e02c8dc

File tree

1 file changed

+38
-5
lines changed

1 file changed

+38
-5
lines changed

server/embed/etcd.go

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,18 @@ type Etcd struct {
8484
errc chan error
8585

8686
closeOnce sync.Once
87-
wg sync.WaitGroup
87+
88+
// Etcd.errHandler uses a fan-in pattern to send errors to Etcd.errc.
89+
// To prevent panics, ensure all writes to Etcd.errc complete before closing it.
90+
// sync.WaitGroup (Etcd.errcWg) is used to track inflight writes to Etcd.errc.
91+
// However, the usage pattern of Etcd.errHandler can lead to race conditions when
92+
// increasing the counter happens after Etcd.errcWg.Wait() is unblocked.
93+
// To address this, WaitGroup updates are gated by by Etcd.closingErrc flag within
94+
// Etcd.errHandler using a critical section. Etcd.closingErrc is set to true
95+
// before Etcd.errcWg.Wait(), preventing further WaitGroup updates.
96+
errcWg sync.WaitGroup
97+
errcMu sync.RWMutex
98+
closingErrc bool
8899
}
89100

90101
type peerListener struct {
@@ -255,6 +266,15 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
255266

256267
// buffer channel so goroutines on closed connections won't wait forever
257268
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
269+
e.closingErrc = false
270+
// The WaitGroup must be initialized with a value of 1 to avoid a race condition.
271+
// This race condition can occur if the first e.errcWg.Add() call in a goroutine happens
272+
// after the main function calls e.errcWg.Wait(). By initializing the WaitGroup to 1,
273+
// we guarantee that the main function will wait for at least one operation to complete
274+
// before continuing.
275+
276+
// See https://pkg.go.dev/sync#WaitGroup.Add for more details on WaitGroup.
277+
e.errcWg.Add(1)
258278

259279
// newly started member ("memberInitialized==false")
260280
// does not need corruption check
@@ -457,7 +477,13 @@ func (e *Etcd) Close() {
457477
}
458478
}
459479
if e.errc != nil {
460-
e.wg.Wait()
480+
e.errcMu.Lock()
481+
e.closingErrc = true
482+
e.errcMu.Unlock()
483+
484+
// e.errWg's initial counter is 1 so we need a matching done called here
485+
e.errcWg.Done()
486+
e.errcWg.Wait()
461487
close(e.errc)
462488
}
463489
}
@@ -872,9 +898,6 @@ func (e *Etcd) serveMetrics() (err error) {
872898
}
873899

874900
func (e *Etcd) errHandler(err error) {
875-
e.wg.Add(1)
876-
defer e.wg.Done()
877-
878901
if err != nil {
879902
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
880903
}
@@ -883,6 +906,16 @@ func (e *Etcd) errHandler(err error) {
883906
return
884907
default:
885908
}
909+
910+
e.errcMu.RLock()
911+
if e.closingErrc {
912+
return
913+
}
914+
e.errcWg.Add(1)
915+
e.errcMu.RUnlock()
916+
917+
defer e.errcWg.Done()
918+
886919
select {
887920
case <-e.stopc:
888921
case e.errc <- err:

0 commit comments

Comments
 (0)