From 2d28b2096e5003b705c58692055bfe85f7711eb2 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:46:29 +0200 Subject: [PATCH 01/18] Fix raft leader handoff regression after SIGTERM --- block/internal/syncing/raft_retriever.go | 1 + block/internal/syncing/raft_retriever_test.go | 61 +++++++++++++++++++ pkg/raft/node.go | 2 +- 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 block/internal/syncing/raft_retriever_test.go diff --git a/block/internal/syncing/raft_retriever.go b/block/internal/syncing/raft_retriever.go index cfa55662bd..aaebb7a458 100644 --- a/block/internal/syncing/raft_retriever.go +++ b/block/internal/syncing/raft_retriever.go @@ -74,6 +74,7 @@ func (r *raftRetriever) Stop() { r.mtx.Unlock() r.wg.Wait() + r.raftNode.SetApplyCallback(nil) } // raftApplyLoop processes blocks received from raft diff --git a/block/internal/syncing/raft_retriever_test.go b/block/internal/syncing/raft_retriever_test.go new file mode 100644 index 0000000000..ec176aad2a --- /dev/null +++ b/block/internal/syncing/raft_retriever_test.go @@ -0,0 +1,61 @@ +package syncing + +import ( + "context" + "sync" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/pkg/genesis" + pkgraft "github.com/evstack/ev-node/pkg/raft" +) + +type stubRaftNode struct { + mu sync.Mutex + callbacks []chan<- pkgraft.RaftApplyMsg +} + +func (s *stubRaftNode) IsLeader() bool { return false } + +func (s *stubRaftNode) HasQuorum() bool { return false } + +func (s *stubRaftNode) GetState() *pkgraft.RaftBlockState { return nil } + +func (s *stubRaftNode) Broadcast(context.Context, *pkgraft.RaftBlockState) error { return nil } + +func (s *stubRaftNode) SetApplyCallback(ch chan<- pkgraft.RaftApplyMsg) { + s.mu.Lock() + defer s.mu.Unlock() + s.callbacks = append(s.callbacks, ch) +} + +func (s *stubRaftNode) recordedCallbacks() []chan<- pkgraft.RaftApplyMsg { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]chan<- pkgraft.RaftApplyMsg, len(s.callbacks)) + copy(out, s.callbacks) + return out +} + +func TestRaftRetrieverStopClearsApplyCallback(t *testing.T) { + t.Parallel() + + raftNode := &stubRaftNode{} + retriever := newRaftRetriever( + raftNode, + genesis.Genesis{}, + zerolog.Nop(), + nil, + func(context.Context, *pkgraft.RaftBlockState) error { return nil }, + ) + + require.NoError(t, retriever.Start(t.Context())) + retriever.Stop() + + callbacks := raftNode.recordedCallbacks() + require.Len(t, callbacks, 2) + require.NotNil(t, callbacks[0]) + require.Nil(t, callbacks[1]) +} diff --git a/pkg/raft/node.go b/pkg/raft/node.go index ada6838560..c6b2529e14 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -159,7 +159,7 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { for { select { case <-ticker.C: - if n.raft.AppliedIndex() >= n.raft.LastIndex() { + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } case <-timeoutTicker.C: From 2106f04cb2b5fda70fe18bcd765dfa547871b0b5 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:03:46 +0200 Subject: [PATCH 02/18] fix: follower crash on restart when EVM is ahead of stale raft snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug A: RecoverFromRaft crashed with "invalid block height" when the node restarted after SIGTERM and the EVM state (persisted before kill) was ahead of the raft FSM snapshot (which hadn't finished log replay yet). The function now verifies the hash of the local block at raftState.Height — if it matches the snapshot hash the EVM history is correct and recovery is safely skipped; a mismatch returns an error indicating a genuine fork. Bug B: waitForMsgsLanded used two repeating tickers with the same effective period (SendTimeout/2 poll, SendTimeout timeout), so both could fire simultaneously in select and the timeout would win even when AppliedIndex >= CommitIndex. Replaced the deadline ticker with a one-shot time.NewTimer, added a final check in the deadline branch, and reduced poll interval to min(50ms, timeout/4) for more responsive detection. Fixes the crash-restart Docker backoff loop observed in SIGTERM HA test cycle 7 (poc-ha-2 never rejoining within the 300s kill interval). Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer.go | 21 ++++- block/internal/syncing/syncer_test.go | 112 ++++++++++++++++++++++++++ pkg/raft/node.go | 16 +++- 3 files changed, 144 insertions(+), 5 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 802c1b243d..dbb0e9f8ab 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -1234,7 +1234,26 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS } if currentState.LastBlockHeight > raftState.Height { - return fmt.Errorf("invalid block height: %d (expected %d)", raftState.Height, currentState.LastBlockHeight+1) + // Local EVM is ahead of the raft snapshot. This is expected on restart when + // the raft FSM hasn't finished replaying log entries yet (stale snapshot height), + // or when log entries were compacted and the FSM is awaiting a new snapshot from + // the leader. Verify that our local block at raftState.Height has the same hash + // to confirm shared history before skipping recovery. + localHeader, err := s.store.GetHeader(ctx, raftState.Height) + if err != nil { + return fmt.Errorf("local state ahead of raft snapshot (local=%d raft=%d), cannot verify hash: %w", + currentState.LastBlockHeight, raftState.Height, err) + } + localHash := localHeader.Hash() + if !bytes.Equal(localHash, raftState.Hash) { + return fmt.Errorf("local state diverged from raft at height %d: local hash %x != raft hash %x", + raftState.Height, localHash, raftState.Hash) + } + s.logger.Info(). + Uint64("local_height", currentState.LastBlockHeight). + Uint64("raft_height", raftState.Height). + Msg("local state ahead of stale raft snapshot with matching hash; skipping recovery, raft will catch up") + return nil } return nil diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 1ff2ad35fc..66ac7e9e05 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -422,6 +422,118 @@ func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing require.ErrorContains(t, err, "invalid chain ID") } +// TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot tests Bug A: when the node +// restarts and the EVM is ahead of the raft FSM snapshot (stale snapshot due to +// timing or log compaction), RecoverFromRaft should skip recovery if the local +// block at raftState.Height has a matching hash, rather than crashing. +func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "1234", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + gen, + mockHeaderStore, + mockDataStore, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + nil, + ) + + // Build block at height 1 and persist it (simulates EVM block persisted before SIGTERM). + data1 := makeData(gen.ChainID, 1, 0) + headerBz1, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app1"), data1, nil) + dataBz1, err := data1.MarshalBinary() + require.NoError(t, err) + + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockDataFromBytes(hdr1, headerBz1, dataBz1, &hdr1.Signature)) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 1, + LastHeaderHash: hdr1.Hash(), + })) + require.NoError(t, batch.Commit()) + + // Simulate EVM at height 1, raft snapshot stale at height 0 — but there is no + // block 0 to check, so use height 1 EVM vs stale snapshot at height 0. + // More realistic: EVM at height 2, raft snapshot at height 1. + // Build a second block and advance the store state to height 2. + data2 := makeData(gen.ChainID, 2, 0) + headerBz2, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, []byte("app2"), data2, hdr1.Hash()) + dataBz2, err := data2.MarshalBinary() + require.NoError(t, err) + + batch2, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch2.SaveBlockDataFromBytes(hdr2, headerBz2, dataBz2, &hdr2.Signature)) + require.NoError(t, batch2.SetHeight(2)) + require.NoError(t, batch2.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + })) + require.NoError(t, batch2.Commit()) + + // Set lastState to height 2 (EVM is at 2). + s.SetLastState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + }) + + t.Run("matching hash skips recovery", func(t *testing.T) { + // raft snapshot is stale at height 1 (EVM is at 2); hash matches local block 1. + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: hdr1.Hash(), + Header: headerBz1, + Data: dataBz1, + }) + require.NoError(t, err, "local ahead of stale raft snapshot with matching hash should not error") + }) + + t.Run("diverged hash returns error", func(t *testing.T) { + wrongHash := make([]byte, len(hdr1.Hash())) + copy(wrongHash, hdr1.Hash()) + wrongHash[0] ^= 0xFF // flip a byte to produce a different hash + + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: wrongHash, + Header: headerBz1, + Data: dataBz1, + }) + require.Error(t, err) + require.ErrorContains(t, err, "diverged from raft") + }) +} + func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 3fdda58000..a9988f793a 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -146,9 +146,13 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n == nil { return nil } - timeoutTicker := time.NewTicker(timeout) - defer timeoutTicker.Stop() - ticker := time.NewTicker(min(n.config.SendTimeout, timeout) / 2) + // Use a one-shot timer for the deadline to avoid the race where a repeating + // ticker and the timeout ticker fire simultaneously in select, causing a + // spurious timeout even when AppliedIndex >= CommitIndex. + deadline := time.NewTimer(timeout) + defer deadline.Stop() + pollInterval := min(50*time.Millisecond, timeout/4) + ticker := time.NewTicker(pollInterval) defer ticker.Stop() for { @@ -157,7 +161,11 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } - case <-timeoutTicker.C: + case <-deadline.C: + // Final check after deadline before giving up. + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { + return nil + } return errors.New("max wait time reached") } } From 52d7cdaef5f027efaff64a4d590b1e08a9f3fb91 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:13:23 +0200 Subject: [PATCH 03/18] fix(raft): guard FSM apply callback with RWMutex to prevent data race SetApplyCallback(nil) called from raftRetriever.Stop() raced with FSM.Apply reading applyCh: wg.Wait() only ensures the consumer goroutine exited, but the raft library can still invoke Apply concurrently. Add applyMu sync.RWMutex to FSM; take write lock in SetApplyCallback and read lock in Apply before reading the channel pointer. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index a9988f793a..157b437367 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -43,6 +44,7 @@ type Config struct { type FSM struct { logger zerolog.Logger state *atomic.Pointer[RaftBlockState] + applyMu sync.RWMutex applyCh chan<- RaftApplyMsg } @@ -305,6 +307,8 @@ func (n *Node) Shutdown() error { // The channel must have sufficient buffer space since updates are published only once without blocking. // If the channel is full, state updates will be skipped to prevent blocking the raft cluster. func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) { + n.fsm.applyMu.Lock() + defer n.fsm.applyMu.Unlock() n.fsm.applyCh = ch } @@ -327,9 +331,12 @@ func (f *FSM) Apply(log *raft.Log) any { Int("data_bytes", len(state.Data)). Msg("applied raft block state") - if f.applyCh != nil { + f.applyMu.RLock() + ch := f.applyCh + f.applyMu.RUnlock() + if ch != nil { select { - case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}: + case ch <- RaftApplyMsg{Index: log.Index, State: &state}: default: // on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too. f.logger.Warn().Msg("apply channel full, dropping message") From b8471f0ff3bff875a0254442824cea5754c45071 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:14:21 +0200 Subject: [PATCH 04/18] feat(raft): add ResignLeader() public method on Node Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 11 +++++++++++ pkg/raft/node_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 157b437367..6c1e578483 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -231,6 +231,17 @@ func (n *Node) leadershipTransfer() error { return n.raft.LeadershipTransfer().Error() } +// ResignLeader synchronously transfers leadership to the most up-to-date follower. +// It is a no-op when the node is nil or not currently the leader. +// Call this before cancelling the node context on graceful shutdown to minimise +// the window where a dying leader could still serve blocks. +func (n *Node) ResignLeader() error { + if n == nil || !n.IsLeader() { + return nil + } + return n.leadershipTransfer() +} + func (n *Node) Config() Config { return *n.config } diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 67b5ea0392..a56394108f 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -4,8 +4,10 @@ import ( "context" "errors" "testing" + "time" "github.com/hashicorp/raft" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -114,3 +116,27 @@ func TestNodeStartNilNoop(t *testing.T) { var node *Node require.NoError(t, node.Start(context.Background())) } + +func TestNodeResignLeader_NilNoop(t *testing.T) { + var n *Node + assert.NoError(t, n.ResignLeader()) +} + +func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { + // A raft node that hasn't bootstrapped is never leader. + // Use a temp dir so boltdb can initialize. + dir := t.TempDir() + n, err := NewNode(&Config{ + NodeID: "test", + RaftAddr: "127.0.0.1:0", + RaftDir: dir, + SnapCount: 3, + SendTimeout: 200 * time.Millisecond, + HeartbeatTimeout: 350 * time.Millisecond, + LeaderLeaseTimeout: 175 * time.Millisecond, + }, zerolog.Nop()) + require.NoError(t, err) + defer n.raft.Shutdown() + + assert.NoError(t, n.ResignLeader()) // not leader, must be a noop +} From c6b1a5fe7f18ede71f282db1e710f81a5dca3a6b Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:17:24 +0200 Subject: [PATCH 05/18] feat(node): implement LeaderResigner interface on FullNode Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 10 ++++++++++ node/node.go | 7 +++++++ 2 files changed, 17 insertions(+) diff --git a/node/full.go b/node/full.go index 01d5e86284..c41e841d2e 100644 --- a/node/full.go +++ b/node/full.go @@ -35,6 +35,7 @@ const ( ) var _ Node = &FullNode{} +var _ LeaderResigner = &FullNode{} type leaderElection interface { Run(ctx context.Context) error @@ -384,3 +385,12 @@ func (n *FullNode) GetGenesisChunks() ([]string, error) { func (n *FullNode) IsRunning() bool { return n.leaderElection.IsRunning() } + +// ResignLeader transfers raft leadership before the node shuts down. +// It is a no-op when raft is not enabled or this node is not the leader. +func (n *FullNode) ResignLeader() error { + if n.raftNode == nil { + return nil + } + return n.raftNode.ResignLeader() +} diff --git a/node/node.go b/node/node.go index d8aeea333f..4d12463a01 100644 --- a/node/node.go +++ b/node/node.go @@ -21,6 +21,13 @@ type Node interface { IsRunning() bool } +// LeaderResigner is an optional interface implemented by nodes that participate +// in Raft leader election. Callers should type-assert to this interface and call +// ResignLeader before cancelling the node context on graceful shutdown. +type LeaderResigner interface { + ResignLeader() error +} + type NodeOptions struct { BlockOptions block.BlockOptions } From 4cdfc54f6b555e3bb3b55b4b5eee2fa964302433 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:19:27 +0200 Subject: [PATCH 06/18] fix(shutdown): resign raft leadership before cancelling context on SIGTERM Co-Authored-By: Claude Sonnet 4.6 --- pkg/cmd/run_node.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 113a9229ba..28fda1623a 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -224,6 +224,16 @@ func StartNode( select { case <-quit: logger.Info().Msg("shutting down node...") + // Proactively resign Raft leadership before cancelling the worker context. + // This gives the cluster a chance to elect a new leader before this node + // stops producing blocks, shrinking the unconfirmed-block window. + if resigner, ok := rollnode.(node.LeaderResigner); ok { + if err := resigner.ResignLeader(); err != nil { + logger.Warn().Err(err).Msg("leadership resign on shutdown failed") + } else { + logger.Info().Msg("leadership resigned before shutdown") + } + } cancel() case err := <-errCh: if err != nil && !errors.Is(err, context.Canceled) { From 266c61f79c1ee85ff98f356d36cabe3086ca064a Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:21:18 +0200 Subject: [PATCH 07/18] =?UTF-8?q?feat(config):=20add=20election=5Ftimeout,?= =?UTF-8?q?=20snapshot=5Fthreshold,=20trailing=5Flogs=20to=20RaftConfig;?= =?UTF-8?q?=20fix=20SnapCount=20default=200=E2=86=923?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add three new Raft config parameters: - ElectionTimeout: timeout for candidate to wait for votes (defaults to 1s) - SnapshotThreshold: outstanding log entries that trigger snapshot (defaults to 500) - TrailingLogs: log entries to retain after snapshot (defaults to 200) Fix critical default: SnapCount was 0 (broken, retains no snapshots) → 3 This enables control over Raft's snapshot frequency and recovery behavior to prevent resync debt from accumulating unbounded during normal operation. Co-Authored-By: Claude Sonnet 4.6 --- pkg/config/config.go | 12 ++++++++++++ pkg/config/config_test.go | 5 ++++- pkg/config/defaults.go | 4 ++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 09e85f3e20..cd34158193 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -195,6 +195,12 @@ const ( FlagRaftHeartbeatTimeout = FlagPrefixEvnode + "raft.heartbeat_timeout" // FlagRaftLeaderLeaseTimeout is a flag for specifying leader lease timeout FlagRaftLeaderLeaseTimeout = FlagPrefixEvnode + "raft.leader_lease_timeout" + // FlagRaftElectionTimeout is the flag for the raft election timeout. + FlagRaftElectionTimeout = FlagPrefixEvnode + "raft.election_timeout" + // FlagRaftSnapshotThreshold is the flag for the raft snapshot threshold. + FlagRaftSnapshotThreshold = FlagPrefixEvnode + "raft.snapshot_threshold" + // FlagRaftTrailingLogs is the flag for the number of trailing logs after a snapshot. + FlagRaftTrailingLogs = FlagPrefixEvnode + "raft.trailing_logs" // Pruning configuration flags FlagPruningMode = FlagPrefixEvnode + "pruning.pruning_mode" @@ -406,6 +412,9 @@ type RaftConfig struct { SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"` LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"` + ElectionTimeout time.Duration `mapstructure:"election_timeout" yaml:"election_timeout" comment:"Time a candidate waits for votes before restarting election; must be >= heartbeat_timeout"` + SnapshotThreshold uint64 `mapstructure:"snapshot_threshold" yaml:"snapshot_threshold" comment:"Number of outstanding log entries that trigger an automatic snapshot"` + TrailingLogs uint64 `mapstructure:"trailing_logs" yaml:"trailing_logs" comment:"Number of log entries to retain after a snapshot (controls rejoin catch-up cost)"` } func (c RaftConfig) Validate() error { @@ -652,6 +661,9 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers") cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease") + cmd.Flags().Duration(FlagRaftElectionTimeout, def.Raft.ElectionTimeout, "time a candidate waits for votes before restarting election") + cmd.Flags().Uint64(FlagRaftSnapshotThreshold, def.Raft.SnapshotThreshold, "number of outstanding log entries that trigger an automatic snapshot") + cmd.Flags().Uint64(FlagRaftTrailingLogs, def.Raft.TrailingLogs, "number of log entries to retain after a snapshot") cmd.MarkFlagsMutuallyExclusive(FlagCatchupTimeout, FlagRaftEnable) // Pruning configuration flags diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index cf556803c2..99bb3f1392 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -133,6 +133,9 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout) assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout) assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout) + assertFlagValue(t, flags, FlagRaftElectionTimeout, DefaultConfig().Raft.ElectionTimeout) + assertFlagValue(t, flags, FlagRaftSnapshotThreshold, DefaultConfig().Raft.SnapshotThreshold) + assertFlagValue(t, flags, FlagRaftTrailingLogs, DefaultConfig().Raft.TrailingLogs) // Pruning flags assertFlagValue(t, flags, FlagPruningMode, DefaultConfig().Pruning.Mode) @@ -140,7 +143,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 78 // Update this number if you add more flag checks above + expectedFlagCount := 81 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 91fe68e3fc..e9e9906183 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -118,6 +118,10 @@ func DefaultConfig() Config { SendTimeout: 200 * time.Millisecond, HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, + ElectionTimeout: 1000 * time.Millisecond, + SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt + TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin + SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) RaftDir: filepath.Join(DefaultRootDir, "raft"), }, Pruning: PruningConfig{ From cc39c9ae981e9d6d59b1baaed3b87369670adf58 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:24:42 +0200 Subject: [PATCH 08/18] fix(raft): wire snapshot_threshold, trailing_logs, election_timeout into hashicorp/raft config Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 3 +++ pkg/raft/node.go | 12 ++++++++++++ pkg/raft/node_test.go | 24 ++++++++++++++++++++++++ 3 files changed, 39 insertions(+) diff --git a/node/full.go b/node/full.go index c41e841d2e..f6d2dcffc8 100644 --- a/node/full.go +++ b/node/full.go @@ -157,6 +157,9 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod SendTimeout: nodeConfig.Raft.SendTimeout, HeartbeatTimeout: nodeConfig.Raft.HeartbeatTimeout, LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout, + ElectionTimeout: nodeConfig.Raft.ElectionTimeout, + SnapshotThreshold: nodeConfig.Raft.SnapshotThreshold, + TrailingLogs: nodeConfig.Raft.TrailingLogs, } if nodeConfig.Raft.Peers != "" { diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 6c1e578483..74128ea5db 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -38,6 +38,9 @@ type Config struct { SendTimeout time.Duration HeartbeatTimeout time.Duration LeaderLeaseTimeout time.Duration + ElectionTimeout time.Duration + SnapshotThreshold uint64 + TrailingLogs uint64 } // FSM implements raft.FSM for block state @@ -59,6 +62,15 @@ func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { raftConfig.LogLevel = "INFO" raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout raftConfig.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout + if cfg.ElectionTimeout > 0 { + raftConfig.ElectionTimeout = cfg.ElectionTimeout + } + if cfg.SnapshotThreshold > 0 { + raftConfig.SnapshotThreshold = cfg.SnapshotThreshold + } + if cfg.TrailingLogs > 0 { + raftConfig.TrailingLogs = cfg.TrailingLogs + } startPointer := new(atomic.Pointer[RaftBlockState]) startPointer.Store(&RaftBlockState{}) diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index a56394108f..c8a362ecc0 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -140,3 +140,27 @@ func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { assert.NoError(t, n.ResignLeader()) // not leader, must be a noop } + +func TestNewNode_SnapshotConfigApplied(t *testing.T) { + dir := t.TempDir() + cfg := &Config{ + NodeID: "test", + RaftAddr: "127.0.0.1:0", + RaftDir: dir, + SnapCount: 3, + SendTimeout: 200 * time.Millisecond, + HeartbeatTimeout: 350 * time.Millisecond, + LeaderLeaseTimeout: 175 * time.Millisecond, + ElectionTimeout: 500 * time.Millisecond, + SnapshotThreshold: 42, + TrailingLogs: 7, + } + n, err := NewNode(cfg, zerolog.Nop()) + require.NoError(t, err) + defer n.raft.Shutdown() + + // Verify the config was stored and raft started without error. + assert.Equal(t, cfg.SnapshotThreshold, n.config.SnapshotThreshold) + assert.Equal(t, cfg.TrailingLogs, n.config.TrailingLogs) + assert.Equal(t, cfg.ElectionTimeout, n.config.ElectionTimeout) +} From 135b5af186bcd8641150398f8e5fb77661257fb2 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:26:27 +0200 Subject: [PATCH 09/18] feat(raft): annotate FSM apply log and RaftApplyMsg with raft term for block provenance audit Add Term field to RaftApplyMsg struct to track the raft term in which each block was committed. Update FSM.Apply() debug logging to include both raft_term and raft_index fields alongside block height and hash. This enables better audit trails and debugging of replication issues. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 4 +++- pkg/raft/types.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 74128ea5db..0791fc3b3a 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -348,6 +348,8 @@ func (f *FSM) Apply(log *raft.Log) any { f.state.Store(&state) f.logger.Debug(). Uint64("height", state.Height). + Uint64("raft_term", log.Term). + Uint64("raft_index", log.Index). Hex("hash", state.Hash). Uint64("timestamp", state.Timestamp). Int("header_bytes", len(state.Header)). @@ -359,7 +361,7 @@ func (f *FSM) Apply(log *raft.Log) any { f.applyMu.RUnlock() if ch != nil { select { - case ch <- RaftApplyMsg{Index: log.Index, State: &state}: + case ch <- RaftApplyMsg{Index: log.Index, Term: log.Term, State: &state}: default: // on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too. f.logger.Warn().Msg("apply channel full, dropping message") diff --git a/pkg/raft/types.go b/pkg/raft/types.go index 968d9aa959..38d3a5130b 100644 --- a/pkg/raft/types.go +++ b/pkg/raft/types.go @@ -23,5 +23,6 @@ func assertValid(s *RaftBlockState, next *RaftBlockState) error { // RaftApplyMsg is sent when raft applies a log entry type RaftApplyMsg struct { Index uint64 + Term uint64 // raft term in which this entry was committed State *RaftBlockState } From 465203ec0d3c3f2bfc622838aaa5737ed29fef97 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 16:01:05 +0200 Subject: [PATCH 10/18] fix(ci): fix gci comment alignment in defaults.go; remove boltdb-triggering tests The gci formatter requires single space before inline comments (not aligned double-space). Also removed TestNodeResignLeader_NotLeaderNoop and TestNewNode_SnapshotConfigApplied which create real boltdb-backed raft nodes: boltdb@v1.3.1 has an unsafe pointer alignment issue that panics under Go 1.25's -checkptr. The nil-receiver test (TestNodeResignLeader_NilNoop) is retained as it exercises the same guard without touching boltdb. Co-Authored-By: Claude Sonnet 4.6 --- pkg/config/defaults.go | 6 +++--- pkg/raft/node_test.go | 45 ------------------------------------------ 2 files changed, 3 insertions(+), 48 deletions(-) diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index e9e9906183..2a8d2b4129 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -119,9 +119,9 @@ func DefaultConfig() Config { HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, ElectionTimeout: 1000 * time.Millisecond, - SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt - TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin - SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) + SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt + TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin + SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) RaftDir: filepath.Join(DefaultRootDir, "raft"), }, Pruning: PruningConfig{ diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index c8a362ecc0..de1fea97e1 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -4,10 +4,8 @@ import ( "context" "errors" "testing" - "time" "github.com/hashicorp/raft" - "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -121,46 +119,3 @@ func TestNodeResignLeader_NilNoop(t *testing.T) { var n *Node assert.NoError(t, n.ResignLeader()) } - -func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { - // A raft node that hasn't bootstrapped is never leader. - // Use a temp dir so boltdb can initialize. - dir := t.TempDir() - n, err := NewNode(&Config{ - NodeID: "test", - RaftAddr: "127.0.0.1:0", - RaftDir: dir, - SnapCount: 3, - SendTimeout: 200 * time.Millisecond, - HeartbeatTimeout: 350 * time.Millisecond, - LeaderLeaseTimeout: 175 * time.Millisecond, - }, zerolog.Nop()) - require.NoError(t, err) - defer n.raft.Shutdown() - - assert.NoError(t, n.ResignLeader()) // not leader, must be a noop -} - -func TestNewNode_SnapshotConfigApplied(t *testing.T) { - dir := t.TempDir() - cfg := &Config{ - NodeID: "test", - RaftAddr: "127.0.0.1:0", - RaftDir: dir, - SnapCount: 3, - SendTimeout: 200 * time.Millisecond, - HeartbeatTimeout: 350 * time.Millisecond, - LeaderLeaseTimeout: 175 * time.Millisecond, - ElectionTimeout: 500 * time.Millisecond, - SnapshotThreshold: 42, - TrailingLogs: 7, - } - n, err := NewNode(cfg, zerolog.Nop()) - require.NoError(t, err) - defer n.raft.Shutdown() - - // Verify the config was stored and raft started without error. - assert.Equal(t, cfg.SnapshotThreshold, n.config.SnapshotThreshold) - assert.Equal(t, cfg.TrailingLogs, n.config.TrailingLogs) - assert.Equal(t, cfg.ElectionTimeout, n.config.ElectionTimeout) -} From 84b70d4cc0e51d018088cb6f5eebb8eda0e04a0d Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 12:14:54 +0200 Subject: [PATCH 11/18] fix(raft): suppress boltdb 'Rollback failed: tx closed' log noise MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit hashicorp/raft-boltdb uses defer tx.Rollback() as a safety net on every transaction. When Commit() succeeds, the deferred Rollback() returns bolt.ErrTxClosed and raft-boltdb logs it as an error — even though it is the expected outcome of every successful read or write. The message has no actionable meaning and floods logs at high block rates. Add a one-time stdlib log filter (sync.Once in NewNode) that silently drops lines containing 'tx closed' and forwards everything else to stderr. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 0791fc3b3a..47ddee101e 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -1,10 +1,12 @@ package raft import ( + "bytes" "context" "errors" "fmt" "io" + "log" "net" "os" "path/filepath" @@ -19,6 +21,24 @@ import ( "google.golang.org/protobuf/proto" ) +// suppressBoltNoise redirects the stdlib log output once to drop the +// "Rollback failed: tx closed" messages emitted by hashicorp/raft-boltdb. +// boltdb returns ErrTxClosed when Rollback is called after a successful +// Commit; raft-boltdb unconditionally logs this as an error even though it +// is the expected outcome of every successful transaction. +var suppressBoltNoise sync.Once + +// boltTxClosedFilter is an io.Writer that silently drops log lines containing +// "tx closed" and forwards everything else to the underlying writer. +type boltTxClosedFilter struct{ w io.Writer } + +func (f *boltTxClosedFilter) Write(p []byte) (n int, err error) { + if bytes.Contains(p, []byte("tx closed")) { + return len(p), nil + } + return f.w.Write(p) +} + // Node represents a raft consensus node type Node struct { raft *raft.Raft @@ -53,6 +73,9 @@ type FSM struct { // NewNode creates a new raft node func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { + suppressBoltNoise.Do(func() { + log.SetOutput(&boltTxClosedFilter{w: os.Stderr}) + }) if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { return nil, fmt.Errorf("create raft dir: %w", err) } From 30ef514fd009975200953b7e27c4207ead0e87c6 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:00:54 +0200 Subject: [PATCH 12/18] =?UTF-8?q?fix(raft):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20shutdown=20wiring,=20error=20logging,=20snap=20docs?= =?UTF-8?q?,=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Call raftRetriever.Stop() in Syncer.Stop() so SetApplyCallback(nil) is actually reached and the goroutine is awaited before wg.Wait() - Log leadershipTransfer error at warn level in Node.Stop() instead of discarding it silently - Fix SnapCount comments in config.go: it retains snapshot files on disk (NewFileSnapshotStore retain param), not log-entry frequency - Extract buildRaftConfig helper from NewNode to enable config wiring tests - Add TestNodeResignLeader_NotLeaderNoop (non-nil node, nil raft → noop) - Add TestNewNode_SnapshotConfigApplied (table-driven, verifies SnapshotThreshold and TrailingLogs wiring with custom and zero values) Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer.go | 5 ++++ pkg/config/config.go | 6 ++--- pkg/raft/node.go | 26 +++++++++++++------- pkg/raft/node_test.go | 41 ++++++++++++++++++++++++++++++++ 4 files changed, 66 insertions(+), 12 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index dbb0e9f8ab..8e34c7f3fe 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -243,6 +243,11 @@ func (s *Syncer) Stop(ctx context.Context) error { if s.daFollower != nil { s.daFollower.Stop() } + + if s.raftRetriever != nil { + s.raftRetriever.Stop() + } + s.wg.Wait() // Skip draining if we're shutting down due to a critical error (e.g. execution diff --git a/pkg/config/config.go b/pkg/config/config.go index cd34158193..1a57ac7d7e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -187,7 +187,7 @@ const ( FlagRaftBootstrap = FlagPrefixEvnode + "raft.bootstrap" // FlagRaftPeers is a flag for specifying Raft peer addresses FlagRaftPeers = FlagPrefixEvnode + "raft.peers" - // FlagRaftSnapCount is a flag for specifying snapshot frequency + // FlagRaftSnapCount is a flag for specifying how many snapshot files to retain on disk FlagRaftSnapCount = FlagPrefixEvnode + "raft.snap_count" // FlagRaftSendTimeout max time to wait for a message to be sent to a peer FlagRaftSendTimeout = FlagPrefixEvnode + "raft.send_timeout" @@ -408,7 +408,7 @@ type RaftConfig struct { RaftDir string `mapstructure:"raft_dir" yaml:"raft_dir" comment:"Directory for Raft logs and snapshots"` Bootstrap bool `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new static Raft cluster during initial bring-up"` Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"` - SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of log entries between snapshots"` + SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of snapshot files to retain on disk"` SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"` LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"` @@ -657,7 +657,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagRaftDir, def.Raft.RaftDir, "directory for Raft logs and snapshots") cmd.Flags().Bool(FlagRaftBootstrap, def.Raft.Bootstrap, "bootstrap a new static Raft cluster during initial bring-up") cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)") - cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of log entries between snapshots") + cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of snapshot files to retain on disk") cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers") cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease") diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 47ddee101e..26ebafe099 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -72,14 +72,8 @@ type FSM struct { } // NewNode creates a new raft node -func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { - suppressBoltNoise.Do(func() { - log.SetOutput(&boltTxClosedFilter{w: os.Stderr}) - }) - if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { - return nil, fmt.Errorf("create raft dir: %w", err) - } - +// buildRaftConfig converts a Node Config into a hashicorp/raft Config. +func buildRaftConfig(cfg *Config) *raft.Config { raftConfig := raft.DefaultConfig() raftConfig.LocalID = raft.ServerID(cfg.NodeID) raftConfig.LogLevel = "INFO" @@ -94,6 +88,18 @@ func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { if cfg.TrailingLogs > 0 { raftConfig.TrailingLogs = cfg.TrailingLogs } + return raftConfig +} + +func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { + suppressBoltNoise.Do(func() { + log.SetOutput(&boltTxClosedFilter{w: os.Stderr}) + }) + if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { + return nil, fmt.Errorf("create raft dir: %w", err) + } + + raftConfig := buildRaftConfig(cfg) startPointer := new(atomic.Pointer[RaftBlockState]) startPointer.Store(&RaftBlockState{}) @@ -218,7 +224,9 @@ func (n *Node) Stop() error { n.logger.Warn().Err(err).Msg("timed out waiting for raft messages to land during shutdown") } if n.IsLeader() { - _ = n.leadershipTransfer() + if err := n.leadershipTransfer(); err != nil { + n.logger.Warn().Err(err).Msg("leadership transfer on shutdown failed") + } } return n.raft.Shutdown().Error() } diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index de1fea97e1..e68856e24e 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -119,3 +119,44 @@ func TestNodeResignLeader_NilNoop(t *testing.T) { var n *Node assert.NoError(t, n.ResignLeader()) } + +func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { + // Non-nil node with nil raft field — IsLeader() returns false, no transfer attempted. + n := &Node{} + assert.NoError(t, n.ResignLeader()) +} + +func TestNewNode_SnapshotConfigApplied(t *testing.T) { + specs := map[string]struct { + cfg *Config + expectedSnapshotThreshold uint64 + expectedTrailingLogs uint64 + }{ + "custom values applied": { + cfg: &Config{ + NodeID: "node1", + SnapshotThreshold: 1000, + TrailingLogs: 500, + }, + expectedSnapshotThreshold: 1000, + expectedTrailingLogs: 500, + }, + "zero values use defaults": { + cfg: &Config{ + NodeID: "node1", + SnapshotThreshold: 0, + TrailingLogs: 0, + }, + expectedSnapshotThreshold: raft.DefaultConfig().SnapshotThreshold, + expectedTrailingLogs: raft.DefaultConfig().TrailingLogs, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + rc := buildRaftConfig(spec.cfg) + assert.Equal(t, spec.expectedSnapshotThreshold, rc.SnapshotThreshold) + assert.Equal(t, spec.expectedTrailingLogs, rc.TrailingLogs) + }) + } +} From 3d94b75cfe7be281a4b733417530d1fb1754e580 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:14:47 +0200 Subject: [PATCH 13/18] =?UTF-8?q?fix(raft):=20address=20code=20review=20is?= =?UTF-8?q?sues=20=E2=80=94=20ShutdownTimeout,=20resign=20fence,=20electio?= =?UTF-8?q?n=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add ShutdownTimeout field (default 5s) to raft Config so Stop() drains committed logs with a meaningful timeout instead of the 200ms SendTimeout - Wrap ResignLeader() in a 3s goroutine+select fence in the SIGTERM handler so a hung leadership transfer cannot block graceful shutdown indefinitely - Validate ElectionTimeout >= HeartbeatTimeout in RaftConfig.Validate() to prevent hashicorp/raft panicking at startup with an invalid config - Fix stale "NewNode creates a new raft node" comment that had migrated onto buildRaftConfig after the function was extracted Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 1 + pkg/cmd/run_node.go | 17 +++++++++++++---- pkg/config/config.go | 12 ++++++++++++ pkg/config/config_test.go | 16 +++++++++++++++- pkg/config/defaults.go | 1 + pkg/raft/node.go | 4 ++-- 6 files changed, 44 insertions(+), 7 deletions(-) diff --git a/node/full.go b/node/full.go index f6d2dcffc8..fb59ccac2f 100644 --- a/node/full.go +++ b/node/full.go @@ -155,6 +155,7 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod Bootstrap: nodeConfig.Raft.Bootstrap, SnapCount: nodeConfig.Raft.SnapCount, SendTimeout: nodeConfig.Raft.SendTimeout, + ShutdownTimeout: nodeConfig.Raft.ShutdownTimeout, HeartbeatTimeout: nodeConfig.Raft.HeartbeatTimeout, LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout, ElectionTimeout: nodeConfig.Raft.ElectionTimeout, diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 28fda1623a..ff6875525f 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -228,10 +228,19 @@ func StartNode( // This gives the cluster a chance to elect a new leader before this node // stops producing blocks, shrinking the unconfirmed-block window. if resigner, ok := rollnode.(node.LeaderResigner); ok { - if err := resigner.ResignLeader(); err != nil { - logger.Warn().Err(err).Msg("leadership resign on shutdown failed") - } else { - logger.Info().Msg("leadership resigned before shutdown") + resignCtx, resignCancel := context.WithTimeout(context.Background(), 3*time.Second) + defer resignCancel() + resignDone := make(chan error, 1) + go func() { resignDone <- resigner.ResignLeader() }() + select { + case err := <-resignDone: + if err != nil { + logger.Warn().Err(err).Msg("leadership resign on shutdown failed") + } else { + logger.Info().Msg("leadership resigned before shutdown") + } + case <-resignCtx.Done(): + logger.Warn().Msg("leadership resign timed out") } } cancel() diff --git a/pkg/config/config.go b/pkg/config/config.go index 1a57ac7d7e..47e84e4c4b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -201,6 +201,8 @@ const ( FlagRaftSnapshotThreshold = FlagPrefixEvnode + "raft.snapshot_threshold" // FlagRaftTrailingLogs is the flag for the number of trailing logs after a snapshot. FlagRaftTrailingLogs = FlagPrefixEvnode + "raft.trailing_logs" + // FlagRaftShutdownTimeout is the flag for how long to wait for committed logs to be applied on graceful shutdown. + FlagRaftShutdownTimeout = FlagPrefixEvnode + "raft.shutdown_timeout" // Pruning configuration flags FlagPruningMode = FlagPrefixEvnode + "pruning.pruning_mode" @@ -410,6 +412,7 @@ type RaftConfig struct { Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"` SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of snapshot files to retain on disk"` SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` + ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout" yaml:"shutdown_timeout" comment:"Max duration to wait for committed raft logs to be applied on graceful shutdown"` HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"` LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"` ElectionTimeout time.Duration `mapstructure:"election_timeout" yaml:"election_timeout" comment:"Time a candidate waits for votes before restarting election; must be >= heartbeat_timeout"` @@ -436,6 +439,10 @@ func (c RaftConfig) Validate() error { multiErr = errors.Join(multiErr, fmt.Errorf("send timeout must be positive")) } + if c.ShutdownTimeout <= 0 { + multiErr = errors.Join(multiErr, fmt.Errorf("shutdown timeout must be positive")) + } + if c.HeartbeatTimeout <= 0 { multiErr = errors.Join(multiErr, fmt.Errorf("heartbeat timeout must be positive")) } @@ -444,6 +451,10 @@ func (c RaftConfig) Validate() error { multiErr = errors.Join(multiErr, fmt.Errorf("leader lease timeout must be positive")) } + if c.ElectionTimeout > 0 && c.ElectionTimeout < c.HeartbeatTimeout { + multiErr = errors.Join(multiErr, fmt.Errorf("election timeout (%v) must be >= heartbeat timeout (%v)", c.ElectionTimeout, c.HeartbeatTimeout)) + } + return multiErr } @@ -659,6 +670,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)") cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of snapshot files to retain on disk") cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") + cmd.Flags().Duration(FlagRaftShutdownTimeout, def.Raft.ShutdownTimeout, "max duration to wait for committed raft logs to be applied on graceful shutdown") cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers") cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease") cmd.Flags().Duration(FlagRaftElectionTimeout, def.Raft.ElectionTimeout, "time a candidate waits for votes before restarting election") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 99bb3f1392..f01d4ff992 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -131,6 +131,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRaftPeers, DefaultConfig().Raft.Peers) assertFlagValue(t, flags, FlagRaftSnapCount, DefaultConfig().Raft.SnapCount) assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout) + assertFlagValue(t, flags, FlagRaftShutdownTimeout, DefaultConfig().Raft.ShutdownTimeout) assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout) assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout) assertFlagValue(t, flags, FlagRaftElectionTimeout, DefaultConfig().Raft.ElectionTimeout) @@ -143,7 +144,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 81 // Update this number if you add more flag checks above + expectedFlagCount := 82 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 @@ -404,8 +405,10 @@ func TestRaftConfig_Validate(t *testing.T) { Peers: "", SnapCount: 1, SendTimeout: 1 * time.Second, + ShutdownTimeout: 5 * time.Second, HeartbeatTimeout: 1 * time.Second, LeaderLeaseTimeout: 1 * time.Second, + ElectionTimeout: 2 * time.Second, } } @@ -443,6 +446,17 @@ func TestRaftConfig_Validate(t *testing.T) { mutate: func(c *RaftConfig) { c.LeaderLeaseTimeout = 0 }, expErr: "leader lease timeout must be positive", }, + "non-positive shutdown timeout": { + mutate: func(c *RaftConfig) { c.ShutdownTimeout = 0 }, + expErr: "shutdown timeout must be positive", + }, + "election timeout less than heartbeat timeout": { + mutate: func(c *RaftConfig) { c.ElectionTimeout = 500 * time.Millisecond }, + expErr: "election timeout (500ms) must be >= heartbeat timeout (1s)", + }, + "zero election timeout skips check": { + mutate: func(c *RaftConfig) { c.ElectionTimeout = 0 }, + }, "multiple invalid returns last": { mutate: func(c *RaftConfig) { c.NodeID = "" diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 2a8d2b4129..dec3e6e0e5 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -116,6 +116,7 @@ func DefaultConfig() Config { }, Raft: RaftConfig{ SendTimeout: 200 * time.Millisecond, + ShutdownTimeout: 5 * time.Second, HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, ElectionTimeout: 1000 * time.Millisecond, diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 26ebafe099..3b2f30b0d4 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -56,6 +56,7 @@ type Config struct { Peers []string SnapCount uint64 SendTimeout time.Duration + ShutdownTimeout time.Duration HeartbeatTimeout time.Duration LeaderLeaseTimeout time.Duration ElectionTimeout time.Duration @@ -71,7 +72,6 @@ type FSM struct { applyCh chan<- RaftApplyMsg } -// NewNode creates a new raft node // buildRaftConfig converts a Node Config into a hashicorp/raft Config. func buildRaftConfig(cfg *Config) *raft.Config { raftConfig := raft.DefaultConfig() @@ -220,7 +220,7 @@ func (n *Node) Stop() error { } // Wait for FSM to apply all committed logs before shutdown to prevent state loss. // This ensures pending raft messages are processed before the node stops. - if err := n.waitForMsgsLanded(n.config.SendTimeout); err != nil { + if err := n.waitForMsgsLanded(n.config.ShutdownTimeout); err != nil { n.logger.Warn().Err(err).Msg("timed out waiting for raft messages to land during shutdown") } if n.IsLeader() { From 9ed4946f8812d867965b6013406bc651e429c2e5 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:24:19 +0200 Subject: [PATCH 14/18] style(raft): fix gci struct field alignment in node_test.go gofmt/gci requires minimal alignment; excessive spaces in the TestNewNode_SnapshotConfigApplied struct literal caused a lint failure. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index e68856e24e..9d67f3d19e 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -128,9 +128,9 @@ func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { func TestNewNode_SnapshotConfigApplied(t *testing.T) { specs := map[string]struct { - cfg *Config - expectedSnapshotThreshold uint64 - expectedTrailingLogs uint64 + cfg *Config + expectedSnapshotThreshold uint64 + expectedTrailingLogs uint64 }{ "custom values applied": { cfg: &Config{ From a2a2599a84377b388c3de3dd28b0f18732a5affd Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 15:53:57 +0200 Subject: [PATCH 15/18] test: improve patch coverage for raft shutdown and resign paths Add unit tests for lines flagged by Codecov: - boltTxClosedFilter.Write: filter drops "tx closed", forwards others - buildRaftConfig: ElectionTimeout > 0 applied, zero uses default - FullNode.ResignLeader: nil raftNode no-op; non-leader raftNode no-op - Syncer.Stop: raftRetriever.Stop is called when raftRetriever is set - Syncer.RecoverFromRaft: GetHeader failure when local state is ahead of stale raft snapshot returns "cannot verify hash" error Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer_test.go | 53 +++++++++++++++++++++ node/full_node_test.go | 13 ++++++ pkg/raft/node_test.go | 66 +++++++++++++++++++++++++++ 3 files changed, 132 insertions(+) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 66ac7e9e05..4a638c47f1 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -532,6 +532,59 @@ func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) { require.Error(t, err) require.ErrorContains(t, err, "diverged from raft") }) + + t.Run("get header fails returns error", func(t *testing.T) { + // lastState is at height 2; raft snapshot at height 0. + // No block is stored at height 0, so GetHeader fails. + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 0, + Hash: make([]byte, 32), + Header: headerBz1, + Data: dataBz1, + }) + require.Error(t, err) + require.ErrorContains(t, err, "cannot verify hash") + }) +} + +func TestSyncer_Stop_CallsRaftRetrieverStop(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + raftNode := &stubRaftNode{} + s := NewSyncer( + st, + nil, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + genesis.Genesis{}, + nil, + nil, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + raftNode, + ) + + require.NotNil(t, s.raftRetriever, "raftRetriever should be set when raftNode is provided") + + // Manually set cancel so Stop() doesn't bail out early (simulates having been started). + ctx, cancel := context.WithCancel(t.Context()) + s.ctx = ctx + s.cancel = cancel + + require.NoError(t, s.Stop(t.Context())) + + // raftRetriever.Stop clears the apply callback (sets it to nil). + // The stub records each SetApplyCallback call; the last one should be nil. + callbacks := raftNode.recordedCallbacks() + require.NotEmpty(t, callbacks, "expected at least one callback registration") + assert.Nil(t, callbacks[len(callbacks)-1], "last callback should be nil after Stop") } func TestSyncer_processPendingEvents(t *testing.T) { diff --git a/node/full_node_test.go b/node/full_node_test.go index 23c2626f40..2421ab7079 100644 --- a/node/full_node_test.go +++ b/node/full_node_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + raftpkg "github.com/evstack/ev-node/pkg/raft" "github.com/evstack/ev-node/pkg/service" ) @@ -82,3 +83,15 @@ func TestStartInstrumentationServer(t *testing.T) { assert.NoError(err, "Pprof server shutdown should not return error") } } + +func TestFullNode_ResignLeader_NilRaftNode(t *testing.T) { + n := &FullNode{} // raftNode is nil + assert.NoError(t, n.ResignLeader()) +} + +func TestFullNode_ResignLeader_NonLeaderRaftNode(t *testing.T) { + // Empty *raftpkg.Node has nil raft field so IsLeader() returns false; + // ResignLeader() is a no-op and returns nil. + n := &FullNode{raftNode: &raftpkg.Node{}} + assert.NoError(t, n.ResignLeader()) +} diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 9d67f3d19e..7e90aeeb0c 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -1,15 +1,81 @@ package raft import ( + "bytes" "context" "errors" "testing" + "time" "github.com/hashicorp/raft" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestBoltTxClosedFilter_Write(t *testing.T) { + specs := map[string]struct { + input string + expectFwd bool + }{ + "passes through normal log line": { + input: "some normal log message\n", + expectFwd: true, + }, + "drops line containing tx closed": { + input: "Rollback failed: tx closed\n", + expectFwd: false, + }, + "drops line with tx closed anywhere": { + input: "error: tx closed due to commit\n", + expectFwd: false, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + var buf bytes.Buffer + f := &boltTxClosedFilter{w: &buf} + n, err := f.Write([]byte(spec.input)) + require.NoError(t, err) + assert.Equal(t, len(spec.input), n) + if spec.expectFwd { + assert.Equal(t, spec.input, buf.String()) + } else { + assert.Empty(t, buf.String()) + } + }) + } +} + +func TestBuildRaftConfig_ElectionTimeout(t *testing.T) { + specs := map[string]struct { + cfg *Config + expectedElectionTimeout time.Duration + }{ + "custom election timeout is applied": { + cfg: &Config{ + NodeID: "node1", + ElectionTimeout: 500 * time.Millisecond, + }, + expectedElectionTimeout: 500 * time.Millisecond, + }, + "zero election timeout uses default": { + cfg: &Config{ + NodeID: "node1", + ElectionTimeout: 0, + }, + expectedElectionTimeout: raft.DefaultConfig().ElectionTimeout, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + rc := buildRaftConfig(spec.cfg) + assert.Equal(t, spec.expectedElectionTimeout, rc.ElectionTimeout) + }) + } +} + func TestSplitPeerAddr(t *testing.T) { specs := map[string]struct { in string From 4d105a27baece54fcdae427d50dcb2761f20e3db Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 19:22:19 +0200 Subject: [PATCH 16/18] fix(config): reject negative ElectionTimeout in RaftConfig.Validate A negative ElectionTimeout was silently ignored (buildRaftConfig only applies values > 0), allowing a misconfigured node to start with the library default instead of failing fast. Add an explicit < 0 check that returns an error; 0 remains valid as the "use library default" sentinel. Co-Authored-By: Claude Sonnet 4.6 --- pkg/config/config.go | 4 +++- pkg/config/config_test.go | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 47e84e4c4b..d0a334b7a6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -451,7 +451,9 @@ func (c RaftConfig) Validate() error { multiErr = errors.Join(multiErr, fmt.Errorf("leader lease timeout must be positive")) } - if c.ElectionTimeout > 0 && c.ElectionTimeout < c.HeartbeatTimeout { + if c.ElectionTimeout < 0 { + multiErr = errors.Join(multiErr, fmt.Errorf("election timeout (%v) must be >= 0", c.ElectionTimeout)) + } else if c.ElectionTimeout > 0 && c.ElectionTimeout < c.HeartbeatTimeout { multiErr = errors.Join(multiErr, fmt.Errorf("election timeout (%v) must be >= heartbeat timeout (%v)", c.ElectionTimeout, c.HeartbeatTimeout)) } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index f01d4ff992..483bfbee9c 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -450,6 +450,10 @@ func TestRaftConfig_Validate(t *testing.T) { mutate: func(c *RaftConfig) { c.ShutdownTimeout = 0 }, expErr: "shutdown timeout must be positive", }, + "negative election timeout rejected": { + mutate: func(c *RaftConfig) { c.ElectionTimeout = -1 * time.Second }, + expErr: "election timeout (-1s) must be >= 0", + }, "election timeout less than heartbeat timeout": { mutate: func(c *RaftConfig) { c.ElectionTimeout = 500 * time.Millisecond }, expErr: "election timeout (500ms) must be >= heartbeat timeout (1s)", From fc023b878e8d371310d560e4d76ba56db6da5082 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 19:22:29 +0200 Subject: [PATCH 17/18] fix(raft): preserve stdlib logger writer in bolt filter; propagate ctx through ResignLeader - suppressBoltNoise.Do now wraps log.Writer() instead of os.Stderr so any existing stdlib logger redirection is preserved rather than clobbered - ResignLeader now accepts a context.Context: leadershipTransfer runs in a goroutine and a select abandons the caller at ctx.Done(), returning ctx.Err(); the goroutine itself exits once the inner raft transfer completes (bounded by ElectionTimeout) Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 14 +++++++++++--- pkg/raft/node_test.go | 4 ++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 3b2f30b0d4..0d136a9539 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -93,7 +93,7 @@ func buildRaftConfig(cfg *Config) *raft.Config { func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { suppressBoltNoise.Do(func() { - log.SetOutput(&boltTxClosedFilter{w: os.Stderr}) + log.SetOutput(&boltTxClosedFilter{w: log.Writer()}) }) if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { return nil, fmt.Errorf("create raft dir: %w", err) @@ -278,11 +278,19 @@ func (n *Node) leadershipTransfer() error { // It is a no-op when the node is nil or not currently the leader. // Call this before cancelling the node context on graceful shutdown to minimise // the window where a dying leader could still serve blocks. -func (n *Node) ResignLeader() error { +// The transfer is abandoned and ctx.Err() is returned if ctx expires first. +func (n *Node) ResignLeader(ctx context.Context) error { if n == nil || !n.IsLeader() { return nil } - return n.leadershipTransfer() + done := make(chan error, 1) + go func() { done <- n.leadershipTransfer() }() + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } } func (n *Node) Config() Config { diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 7e90aeeb0c..8516f7cf9d 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -183,13 +183,13 @@ func TestNodeStartNilNoop(t *testing.T) { func TestNodeResignLeader_NilNoop(t *testing.T) { var n *Node - assert.NoError(t, n.ResignLeader()) + assert.NoError(t, n.ResignLeader(context.Background())) } func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { // Non-nil node with nil raft field — IsLeader() returns false, no transfer attempted. n := &Node{} - assert.NoError(t, n.ResignLeader()) + assert.NoError(t, n.ResignLeader(context.Background())) } func TestNewNode_SnapshotConfigApplied(t *testing.T) { From 2d3dc8eab1f3de52bc536aface2a07f16332c481 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 19:22:35 +0200 Subject: [PATCH 18/18] fix(node): propagate context through LeaderResigner.ResignLeader interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - LeaderResigner.ResignLeader() → ResignLeader(ctx context.Context) error - FullNode.ResignLeader passes ctx down to raft.Node.ResignLeader - run_node.go calls resigner.ResignLeader(resignCtx) directly — no wrapper goroutine/select needed; context.DeadlineExceeded vs other errors are logged distinctly - Merge TestFullNode_ResignLeader_NilRaftNode and TestFullNode_ResignLeader_NonLeaderRaftNode into single table-driven test Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 4 ++-- node/full_node_test.go | 24 ++++++++++++++---------- node/node.go | 4 +++- pkg/cmd/run_node.go | 15 ++++++--------- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/node/full.go b/node/full.go index fb59ccac2f..870d1fdb2a 100644 --- a/node/full.go +++ b/node/full.go @@ -392,9 +392,9 @@ func (n *FullNode) IsRunning() bool { // ResignLeader transfers raft leadership before the node shuts down. // It is a no-op when raft is not enabled or this node is not the leader. -func (n *FullNode) ResignLeader() error { +func (n *FullNode) ResignLeader(ctx context.Context) error { if n.raftNode == nil { return nil } - return n.raftNode.ResignLeader() + return n.raftNode.ResignLeader(ctx) } diff --git a/node/full_node_test.go b/node/full_node_test.go index 2421ab7079..a2b874ba18 100644 --- a/node/full_node_test.go +++ b/node/full_node_test.go @@ -84,14 +84,18 @@ func TestStartInstrumentationServer(t *testing.T) { } } -func TestFullNode_ResignLeader_NilRaftNode(t *testing.T) { - n := &FullNode{} // raftNode is nil - assert.NoError(t, n.ResignLeader()) -} - -func TestFullNode_ResignLeader_NonLeaderRaftNode(t *testing.T) { - // Empty *raftpkg.Node has nil raft field so IsLeader() returns false; - // ResignLeader() is a no-op and returns nil. - n := &FullNode{raftNode: &raftpkg.Node{}} - assert.NoError(t, n.ResignLeader()) +func TestFullNode_ResignLeader_Noop(t *testing.T) { + cases := []struct { + name string + node *FullNode + }{ + {name: "nil raftNode", node: &FullNode{}}, + // Empty *raftpkg.Node has nil raft field so IsLeader() returns false. + {name: "non-leader raftNode", node: &FullNode{raftNode: &raftpkg.Node{}}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert.NoError(t, tc.node.ResignLeader(context.Background())) + }) + } } diff --git a/node/node.go b/node/node.go index 4d12463a01..c42721ea75 100644 --- a/node/node.go +++ b/node/node.go @@ -1,6 +1,8 @@ package node import ( + "context" + ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" @@ -25,7 +27,7 @@ type Node interface { // in Raft leader election. Callers should type-assert to this interface and call // ResignLeader before cancelling the node context on graceful shutdown. type LeaderResigner interface { - ResignLeader() error + ResignLeader(ctx context.Context) error } type NodeOptions struct { diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index ff6875525f..8b241c98db 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -230,17 +230,14 @@ func StartNode( if resigner, ok := rollnode.(node.LeaderResigner); ok { resignCtx, resignCancel := context.WithTimeout(context.Background(), 3*time.Second) defer resignCancel() - resignDone := make(chan error, 1) - go func() { resignDone <- resigner.ResignLeader() }() - select { - case err := <-resignDone: - if err != nil { - logger.Warn().Err(err).Msg("leadership resign on shutdown failed") + if err := resigner.ResignLeader(resignCtx); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + logger.Warn().Msg("leadership resign timed out") } else { - logger.Info().Msg("leadership resigned before shutdown") + logger.Warn().Err(err).Msg("leadership resign on shutdown failed") } - case <-resignCtx.Done(): - logger.Warn().Msg("leadership resign timed out") + } else { + logger.Info().Msg("leadership resigned before shutdown") } } cancel()