diff --git a/block/internal/syncing/raft_retriever.go b/block/internal/syncing/raft_retriever.go
index cfa55662bd..aaebb7a458 100644
--- a/block/internal/syncing/raft_retriever.go
+++ b/block/internal/syncing/raft_retriever.go
@@ -74,6 +74,7 @@ func (r *raftRetriever) Stop() {
 	r.mtx.Unlock()
 	r.wg.Wait()
+	r.raftNode.SetApplyCallback(nil)
 }
 
 // raftApplyLoop processes blocks received from raft
diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index 4d2cbb4afe..c1791ea89e 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -247,6 +247,11 @@ func (s *Syncer) Stop(ctx context.Context) error {
 	if s.daFollower != nil {
 		s.daFollower.Stop()
 	}
+
+	if s.raftRetriever != nil {
+		s.raftRetriever.Stop()
+	}
+
 	s.wg.Wait()
 
 	// Skip draining if we're shutting down due to a critical error (e.g. execution
@@ -1237,7 +1242,26 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS
 	}
 
 	if currentState.LastBlockHeight > raftState.Height {
-		return fmt.Errorf("invalid block height: %d (expected %d)", raftState.Height, currentState.LastBlockHeight+1)
+		// Local EVM is ahead of the raft snapshot. This is expected on restart when
+		// the raft FSM hasn't finished replaying log entries yet (stale snapshot height),
+		// or when log entries were compacted and the FSM is awaiting a new snapshot from
+		// the leader. Verify that our local block at raftState.Height has the same hash
+		// to confirm shared history before skipping recovery.
+		localHeader, err := s.store.GetHeader(ctx, raftState.Height)
+		if err != nil {
+			return fmt.Errorf("local state ahead of raft snapshot (local=%d raft=%d), cannot verify hash: %w",
+				currentState.LastBlockHeight, raftState.Height, err)
+		}
+		localHash := localHeader.Hash()
+		if !bytes.Equal(localHash, raftState.Hash) {
+			return fmt.Errorf("local state diverged from raft at height %d: local hash %x != raft hash %x",
+				raftState.Height, localHash, raftState.Hash)
+		}
+		s.logger.Info().
+			Uint64("local_height", currentState.LastBlockHeight).
+			Uint64("raft_height", raftState.Height).
+			Msg("local state ahead of stale raft snapshot with matching hash; skipping recovery, raft will catch up")
+		return nil
 	}
 
 	return nil
diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go
index 1ff2ad35fc..67c87e06ed 100644
--- a/block/internal/syncing/syncer_test.go
+++ b/block/internal/syncing/syncer_test.go
@@ -6,6 +6,7 @@ import (
 	"crypto/sha512"
 	"errors"
 	"math"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"testing/synctest"
@@ -35,6 +36,30 @@ import (
 	"github.com/evstack/ev-node/types"
 )
 
+// stubRaftNode is a minimal RaftNode stub that records SetApplyCallback calls.
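+// It satisfies just enough of the raft-node dependency that NewSyncer consumes
+// for these tests: IsLeader, HasQuorum, GetState and Broadcast return inert
+// zero values, and only SetApplyCallback records its argument.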
+type stubRaftNode struct {
+	mu        sync.Mutex
+	callbacks []chan<- raft.RaftApplyMsg
+}
+
+func (s *stubRaftNode) IsLeader() bool { return false }
+func (s *stubRaftNode) HasQuorum() bool { return false }
+func (s *stubRaftNode) GetState() *raft.RaftBlockState { return nil }
+func (s *stubRaftNode) Broadcast(context.Context, *raft.RaftBlockState) error { return nil }
+func (s *stubRaftNode) SetApplyCallback(ch chan<- raft.RaftApplyMsg) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.callbacks = append(s.callbacks, ch)
+}
+
+func (s *stubRaftNode) recordedCallbacks() []chan<- raft.RaftApplyMsg {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	out := make([]chan<- raft.RaftApplyMsg, len(s.callbacks))
+	copy(out, s.callbacks)
+	return out
+}
+
 // helper to create a signer, pubkey and address for tests
 func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer signerpkg.Signer) {
 	tb.Helper()
@@ -422,6 +447,171 @@ func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing
 	require.ErrorContains(t, err, "invalid chain ID")
 }
 
+// TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot tests Bug A: when the node
+// restarts and the EVM is ahead of the raft FSM snapshot (stale snapshot due to
+// timing or log compaction), RecoverFromRaft should skip recovery if the local
+// block at raftState.Height has a matching hash, rather than crashing.
+func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) {
+	ds := dssync.MutexWrap(datastore.NewMapDatastore())
+	st := store.New(ds)
+
+	cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
+	require.NoError(t, err)
+
+	addr, pub, signer := buildSyncTestSigner(t)
+	gen := genesis.Genesis{
+		ChainID:         "1234",
+		InitialHeight:   1,
+		StartTime:       time.Now().Add(-time.Second),
+		ProposerAddress: addr,
+	}
+
+	mockExec := testmocks.NewMockExecutor(t)
+	mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
+	mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
+	s := NewSyncer(
+		st,
+		mockExec,
+		nil,
+		cm,
+		common.NopMetrics(),
+		config.DefaultConfig(),
+		gen,
+		mockHeaderStore,
+		mockDataStore,
+		zerolog.Nop(),
+		common.DefaultBlockOptions(),
+		make(chan error, 1),
+		nil,
+	)
+
+	// Build block at height 1 and persist it (simulates EVM block persisted before SIGTERM).
+	data1 := makeData(gen.ChainID, 1, 0)
+	headerBz1, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app1"), data1, nil)
+	dataBz1, err := data1.MarshalBinary()
+	require.NoError(t, err)
+
+	batch, err := st.NewBatch(t.Context())
+	require.NoError(t, err)
+	require.NoError(t, batch.SaveBlockDataFromBytes(hdr1, headerBz1, dataBz1, &hdr1.Signature))
+	require.NoError(t, batch.SetHeight(1))
+	require.NoError(t, batch.UpdateState(types.State{
+		ChainID:         gen.ChainID,
+		InitialHeight:   1,
+		LastBlockHeight: 1,
+		LastHeaderHash:  hdr1.Hash(),
+	}))
+	require.NoError(t, batch.Commit())
+
+	// Reproduce the realistic shape of the bug: EVM at height 2 with a raft
+	// snapshot stale at height 1. (A snapshot at height 0 would leave no local
+	// block to hash-check.) Build a second block and advance the store to height 2.
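+	// hdr1.Hash() is passed below as the previous-header hash, linking block 2
+	// to block 1 so the store holds a consistent two-block chain for the checks.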
+	data2 := makeData(gen.ChainID, 2, 0)
+	headerBz2, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, []byte("app2"), data2, hdr1.Hash())
+	dataBz2, err := data2.MarshalBinary()
+	require.NoError(t, err)
+
+	batch2, err := st.NewBatch(t.Context())
+	require.NoError(t, err)
+	require.NoError(t, batch2.SaveBlockDataFromBytes(hdr2, headerBz2, dataBz2, &hdr2.Signature))
+	require.NoError(t, batch2.SetHeight(2))
+	require.NoError(t, batch2.UpdateState(types.State{
+		ChainID:         gen.ChainID,
+		InitialHeight:   1,
+		LastBlockHeight: 2,
+		LastHeaderHash:  hdr2.Hash(),
+	}))
+	require.NoError(t, batch2.Commit())
+
+	// Set lastState to height 2 (EVM is at 2).
+	s.SetLastState(types.State{
+		ChainID:         gen.ChainID,
+		InitialHeight:   1,
+		LastBlockHeight: 2,
+		LastHeaderHash:  hdr2.Hash(),
+	})
+
+	t.Run("matching hash skips recovery", func(t *testing.T) {
+		// raft snapshot is stale at height 1 (EVM is at 2); hash matches local block 1.
+		err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
+			Height: 1,
+			Hash:   hdr1.Hash(),
+			Header: headerBz1,
+			Data:   dataBz1,
+		})
+		require.NoError(t, err, "local ahead of stale raft snapshot with matching hash should not error")
+	})
+
+	t.Run("diverged hash returns error", func(t *testing.T) {
+		wrongHash := make([]byte, len(hdr1.Hash()))
+		copy(wrongHash, hdr1.Hash())
+		wrongHash[0] ^= 0xFF // flip a byte to produce a different hash
+
+		err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
+			Height: 1,
+			Hash:   wrongHash,
+			Header: headerBz1,
+			Data:   dataBz1,
+		})
+		require.Error(t, err)
+		require.ErrorContains(t, err, "diverged from raft")
+	})
+
+	t.Run("get header fails returns error", func(t *testing.T) {
+		// lastState is at height 2; raft snapshot at height 0.
+		// No block is stored at height 0, so GetHeader fails.
+		err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
+			Height: 0,
+			Hash:   make([]byte, 32),
+			Header: headerBz1,
+			Data:   dataBz1,
+		})
+		require.Error(t, err)
+		require.ErrorContains(t, err, "cannot verify hash")
+	})
+}
+
+func TestSyncer_Stop_CallsRaftRetrieverStop(t *testing.T) {
+	ds := dssync.MutexWrap(datastore.NewMapDatastore())
+	st := store.New(ds)
+
+	cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
+	require.NoError(t, err)
+
+	raftNode := &stubRaftNode{}
+	s := NewSyncer(
+		st,
+		nil,
+		nil,
+		cm,
+		common.NopMetrics(),
+		config.DefaultConfig(),
+		genesis.Genesis{},
+		nil,
+		nil,
+		zerolog.Nop(),
+		common.DefaultBlockOptions(),
+		make(chan error, 1),
+		raftNode,
+	)
+
+	require.NotNil(t, s.raftRetriever, "raftRetriever should be set when raftNode is provided")
+
+	// Manually set cancel so Stop() doesn't bail out early (simulates having been started).
+	ctx, cancel := context.WithCancel(t.Context())
+	s.ctx = ctx
+	s.cancel = cancel
+
+	require.NoError(t, s.Stop(t.Context()))
+
+	// raftRetriever.Stop clears the apply callback (sets it to nil).
+	// The stub records each SetApplyCallback call; the last one should be nil.
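+	// NewSyncer is expected to register the retriever's channel first, so the
+	// recorded history should read [non-nil, ..., nil] after a clean Stop.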
+	callbacks := raftNode.recordedCallbacks()
+	require.NotEmpty(t, callbacks, "expected at least one callback registration")
+	assert.Nil(t, callbacks[len(callbacks)-1], "last callback should be nil after Stop")
+}
+
 func TestSyncer_processPendingEvents(t *testing.T) {
 	ds := dssync.MutexWrap(datastore.NewMapDatastore())
 	st := store.New(ds)
diff --git a/go.mod b/go.mod
index c320bfdb58..301e3ed86b 100644
--- a/go.mod
+++ b/go.mod
@@ -22,6 +22,7 @@ require (
 	github.com/go-kit/kit v0.13.0
 	github.com/go-viper/mapstructure/v2 v2.5.0
 	github.com/goccy/go-yaml v1.19.2
+	github.com/hashicorp/go-hclog v1.6.3
 	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/hashicorp/raft v1.7.3
 	github.com/hashicorp/raft-boltdb v0.0.0-20251103221153-05f9dd7a5148
@@ -102,7 +103,6 @@ require (
 	github.com/googleapis/gax-go/v2 v2.20.0 // indirect
 	github.com/gorilla/websocket v1.5.3 // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
-	github.com/hashicorp/go-hclog v1.6.3 // indirect
 	github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
 	github.com/hashicorp/go-metrics v0.5.4 // indirect
 	github.com/hashicorp/go-msgpack v0.5.5 // indirect
diff --git a/node/full.go b/node/full.go
index 01d5e86284..bd44f9ef42 100644
--- a/node/full.go
+++ b/node/full.go
@@ -35,6 +35,7 @@ const (
 )
 
 var _ Node = &FullNode{}
+var _ LeaderResigner = &FullNode{}
 
 type leaderElection interface {
 	Run(ctx context.Context) error
@@ -154,8 +155,12 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod
 		Bootstrap:          nodeConfig.Raft.Bootstrap,
 		SnapCount:          nodeConfig.Raft.SnapCount,
 		SendTimeout:        nodeConfig.Raft.SendTimeout,
+		ShutdownTimeout:    5 * nodeConfig.Raft.SendTimeout,
 		HeartbeatTimeout:   nodeConfig.Raft.HeartbeatTimeout,
 		LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout,
+		ElectionTimeout:    nodeConfig.Raft.ElectionTimeout,
+		SnapshotThreshold:  nodeConfig.Raft.SnapshotThreshold,
+		TrailingLogs:       nodeConfig.Raft.TrailingLogs,
 	}
 
 	if nodeConfig.Raft.Peers != "" {
@@ -384,3 +389,12 @@ func (n *FullNode) GetGenesisChunks() ([]string, error) {
 func (n *FullNode) IsRunning() bool {
 	return n.leaderElection.IsRunning()
 }
+
+// ResignLeader transfers raft leadership before the node shuts down.
+// It is a no-op when raft is not enabled or this node is not the leader.
+func (n *FullNode) ResignLeader(ctx context.Context) error {
+	if n.raftNode == nil {
+		return nil
+	}
+	return n.raftNode.ResignLeader(ctx)
+}
diff --git a/node/full_node_test.go b/node/full_node_test.go
index 23c2626f40..a2b874ba18 100644
--- a/node/full_node_test.go
+++ b/node/full_node_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	raftpkg "github.com/evstack/ev-node/pkg/raft"
 	"github.com/evstack/ev-node/pkg/service"
 )
 
@@ -82,3 +83,19 @@ func TestStartInstrumentationServer(t *testing.T) {
 		assert.NoError(err, "Pprof server shutdown should not return error")
 	}
 }
+
+func TestFullNode_ResignLeader_Noop(t *testing.T) {
+	cases := []struct {
+		name string
+		node *FullNode
+	}{
+		{name: "nil raftNode", node: &FullNode{}},
+		// Empty *raftpkg.Node has nil raft field so IsLeader() returns false.
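+		// ResignLeader therefore returns early without attempting a transfer.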
+ {name: "non-leader raftNode", node: &FullNode{raftNode: &raftpkg.Node{}}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert.NoError(t, tc.node.ResignLeader(context.Background())) + }) + } +} diff --git a/node/node.go b/node/node.go index d8aeea333f..c42721ea75 100644 --- a/node/node.go +++ b/node/node.go @@ -1,6 +1,8 @@ package node import ( + "context" + ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" @@ -21,6 +23,13 @@ type Node interface { IsRunning() bool } +// LeaderResigner is an optional interface implemented by nodes that participate +// in Raft leader election. Callers should type-assert to this interface and call +// ResignLeader before cancelling the node context on graceful shutdown. +type LeaderResigner interface { + ResignLeader(ctx context.Context) error +} + type NodeOptions struct { BlockOptions block.BlockOptions } diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 113a9229ba..8b241c98db 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -224,6 +224,22 @@ func StartNode( select { case <-quit: logger.Info().Msg("shutting down node...") + // Proactively resign Raft leadership before cancelling the worker context. + // This gives the cluster a chance to elect a new leader before this node + // stops producing blocks, shrinking the unconfirmed-block window. + if resigner, ok := rollnode.(node.LeaderResigner); ok { + resignCtx, resignCancel := context.WithTimeout(context.Background(), 3*time.Second) + defer resignCancel() + if err := resigner.ResignLeader(resignCtx); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + logger.Warn().Msg("leadership resign timed out") + } else { + logger.Warn().Err(err).Msg("leadership resign on shutdown failed") + } + } else { + logger.Info().Msg("leadership resigned before shutdown") + } + } cancel() case err := <-errCh: if err != nil && !errors.Is(err, context.Canceled) { diff --git a/pkg/config/config.go b/pkg/config/config.go index 09e85f3e20..850f098dc7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -187,7 +187,7 @@ const ( FlagRaftBootstrap = FlagPrefixEvnode + "raft.bootstrap" // FlagRaftPeers is a flag for specifying Raft peer addresses FlagRaftPeers = FlagPrefixEvnode + "raft.peers" - // FlagRaftSnapCount is a flag for specifying snapshot frequency + // FlagRaftSnapCount is a flag for specifying how many snapshot files to retain on disk FlagRaftSnapCount = FlagPrefixEvnode + "raft.snap_count" // FlagRaftSendTimeout max time to wait for a message to be sent to a peer FlagRaftSendTimeout = FlagPrefixEvnode + "raft.send_timeout" @@ -195,7 +195,12 @@ const ( FlagRaftHeartbeatTimeout = FlagPrefixEvnode + "raft.heartbeat_timeout" // FlagRaftLeaderLeaseTimeout is a flag for specifying leader lease timeout FlagRaftLeaderLeaseTimeout = FlagPrefixEvnode + "raft.leader_lease_timeout" - + // FlagRaftElectionTimeout is the flag for the raft election timeout. + FlagRaftElectionTimeout = FlagPrefixEvnode + "raft.election_timeout" + // FlagRaftSnapshotThreshold is the flag for the raft snapshot threshold. + FlagRaftSnapshotThreshold = FlagPrefixEvnode + "raft.snapshot_threshold" + // FlagRaftTrailingLogs is the flag for the number of trailing logs after a snapshot. 
+	FlagRaftTrailingLogs = FlagPrefixEvnode + "raft.trailing_logs"
 	// Pruning configuration flags
 	FlagPruningMode = FlagPrefixEvnode + "pruning.pruning_mode"
 	FlagPruningKeepRecent = FlagPrefixEvnode + "pruning.pruning_keep_recent"
@@ -402,10 +407,13 @@ type RaftConfig struct {
 	RaftDir            string        `mapstructure:"raft_dir" yaml:"raft_dir" comment:"Directory for Raft logs and snapshots"`
 	Bootstrap          bool          `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new static Raft cluster during initial bring-up"`
 	Peers              string        `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"`
-	SnapCount          uint64        `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of log entries between snapshots"`
+	SnapCount          uint64        `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of snapshot files to retain on disk"`
 	SendTimeout        time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"`
 	HeartbeatTimeout   time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"`
 	LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"`
+	ElectionTimeout    time.Duration `mapstructure:"election_timeout" yaml:"election_timeout" comment:"Time a candidate waits for votes before restarting election; must be >= heartbeat_timeout"`
+	SnapshotThreshold  uint64        `mapstructure:"snapshot_threshold" yaml:"snapshot_threshold" comment:"Number of outstanding log entries that trigger an automatic snapshot"`
+	TrailingLogs       uint64        `mapstructure:"trailing_logs" yaml:"trailing_logs" comment:"Number of log entries to retain after a snapshot (controls rejoin catch-up cost)"`
 }
 
 func (c RaftConfig) Validate() error {
@@ -435,6 +443,12 @@
 		multiErr = errors.Join(multiErr, fmt.Errorf("leader lease timeout must be positive"))
 	}
 
+	if c.ElectionTimeout < 0 {
+		multiErr = errors.Join(multiErr, fmt.Errorf("election timeout (%v) must be >= 0", c.ElectionTimeout))
+	} else if c.ElectionTimeout > 0 && c.ElectionTimeout < c.HeartbeatTimeout {
+		multiErr = errors.Join(multiErr, fmt.Errorf("election timeout (%v) must be >= heartbeat timeout (%v)", c.ElectionTimeout, c.HeartbeatTimeout))
+	}
+
 	return multiErr
 }
 
@@ -648,10 +662,13 @@ func AddFlags(cmd *cobra.Command) {
 	cmd.Flags().String(FlagRaftDir, def.Raft.RaftDir, "directory for Raft logs and snapshots")
 	cmd.Flags().Bool(FlagRaftBootstrap, def.Raft.Bootstrap, "bootstrap a new static Raft cluster during initial bring-up")
 	cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)")
-	cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of log entries between snapshots")
+	cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of snapshot files to retain on disk")
 	cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer")
 	cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers")
 	cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease")
+	cmd.Flags().Duration(FlagRaftElectionTimeout, def.Raft.ElectionTimeout, "time a candidate waits for votes before restarting election")
+	cmd.Flags().Uint64(FlagRaftSnapshotThreshold, def.Raft.SnapshotThreshold, "number of outstanding log entries that trigger an automatic snapshot")
+	cmd.Flags().Uint64(FlagRaftTrailingLogs, def.Raft.TrailingLogs, "number of log entries to retain after a snapshot")
 	cmd.MarkFlagsMutuallyExclusive(FlagCatchupTimeout, FlagRaftEnable)
 
 	// Pruning configuration flags
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index cf556803c2..2cb4792189 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -133,6 +133,9 @@ func TestAddFlags(t *testing.T) {
 	assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout)
 	assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout)
 	assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout)
+	assertFlagValue(t, flags, FlagRaftElectionTimeout, DefaultConfig().Raft.ElectionTimeout)
+	assertFlagValue(t, flags, FlagRaftSnapshotThreshold, DefaultConfig().Raft.SnapshotThreshold)
+	assertFlagValue(t, flags, FlagRaftTrailingLogs, DefaultConfig().Raft.TrailingLogs)
 
 	// Pruning flags
 	assertFlagValue(t, flags, FlagPruningMode, DefaultConfig().Pruning.Mode)
@@ -140,7 +143,7 @@
 	assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration)
 
 	// Count the number of flags we're explicitly checking
-	expectedFlagCount := 78 // Update this number if you add more flag checks above
+	expectedFlagCount := 81 // Update this number if you add more flag checks above
 
 	// Get the actual number of flags (both regular and persistent)
 	actualFlagCount := 0
@@ -403,6 +406,7 @@ func TestRaftConfig_Validate(t *testing.T) {
 			SendTimeout:        1 * time.Second,
 			HeartbeatTimeout:   1 * time.Second,
 			LeaderLeaseTimeout: 1 * time.Second,
+			ElectionTimeout:    2 * time.Second,
 		}
 	}
 
@@ -440,6 +444,17 @@
 			mutate: func(c *RaftConfig) { c.LeaderLeaseTimeout = 0 },
 			expErr: "leader lease timeout must be positive",
 		},
+		"negative election timeout rejected": {
+			mutate: func(c *RaftConfig) { c.ElectionTimeout = -1 * time.Second },
+			expErr: "election timeout (-1s) must be >= 0",
+		},
+		"election timeout less than heartbeat timeout": {
+			mutate: func(c *RaftConfig) { c.ElectionTimeout = 500 * time.Millisecond },
+			expErr: "election timeout (500ms) must be >= heartbeat timeout (1s)",
+		},
+		"zero election timeout skips check": {
+			mutate: func(c *RaftConfig) { c.ElectionTimeout = 0 },
+		},
 		"multiple invalid returns last": {
 			mutate: func(c *RaftConfig) {
 				c.NodeID = ""
diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go
index 91fe68e3fc..2a8d2b4129 100644
--- a/pkg/config/defaults.go
+++ b/pkg/config/defaults.go
@@ -118,6 +118,10 @@ func DefaultConfig() Config {
 			SendTimeout:        200 * time.Millisecond,
 			HeartbeatTimeout:   350 * time.Millisecond,
 			LeaderLeaseTimeout: 175 * time.Millisecond,
+			ElectionTimeout:    1000 * time.Millisecond,
+			SnapshotThreshold:  500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt
+			TrailingLogs:       200, // keep 200 logs post-snapshot; bounds catch-up on rejoin
+			SnapCount:          3,   // retain 3 snapshots on disk (was 0 — broken default)
 			RaftDir:            filepath.Join(DefaultRootDir, "raft"),
 		},
 		Pruning: PruningConfig{
diff --git a/pkg/raft/election.go b/pkg/raft/election.go
index 757f07ca27..d7941675b5 100644
--- a/pkg/raft/election.go
+++ b/pkg/raft/election.go
@@ -132,6 +132,24 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error {
 			if err != nil {
 				return err
 			}
+			if diff < -1 {
+				// Store is more than 1 block behind raft state.
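+				// (diff is the store height relative to the raft height, so a
+				// value below -1 means the store trails by at least two blocks.)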
+				// RecoverFromRaft can only apply the single latest block
+				// from the raft snapshot; it cannot replay a larger gap.
+				// Starting leader operations in this state would stall block
+				// production until catch-up completes (potentially minutes or
+				// hours). Abdicate immediately so a better-synced peer can
+				// take leadership.
+				d.logger.Warn().
+					Int("store_lag_blocks", -diff).
+					Uint64("raft_height", raftState.Height).
+					Msg("became leader but store is significantly behind raft state; abdicating to prevent stalled block production")
+				if tErr := d.node.leadershipTransfer(); tErr != nil {
+					d.logger.Error().Err(tErr).Msg("leadership transfer failed after store-lag abdication")
+					return fmt.Errorf("leadership transfer failed after store-lag abdication: %w", tErr)
+				}
+				continue
+			}
 			if diff != 0 {
 				d.logger.Info().Msg("became leader but not synced, attempting recovery")
 				if err := runnable.Recover(ctx, raftState); err != nil {
diff --git a/pkg/raft/election_test.go b/pkg/raft/election_test.go
index b29cbda8d7..f025405494 100644
--- a/pkg/raft/election_test.go
+++ b/pkg/raft/election_test.go
@@ -221,6 +221,64 @@ func TestDynamicLeaderElectionRun(t *testing.T) {
 				assert.ErrorIs(t, err, context.Canceled)
 			},
 		},
+		"abdicate when store significantly behind raft": {
+			setup: func(t *testing.T) (*DynamicLeaderElection, context.Context, context.CancelFunc) {
+				m := newMocksourceNode(t)
+				leaderCh := make(chan bool, 2)
+				m.EXPECT().leaderCh().Return((<-chan bool)(leaderCh))
+				// GetState called in verifyState (follower start) and in leader sync check
+				m.EXPECT().GetState().Return(&RaftBlockState{Height: 10})
+				m.EXPECT().GetState().Return(&RaftBlockState{Height: 10})
+				m.EXPECT().Config().Return(testCfg()).Times(2)
+				m.EXPECT().waitForMsgsLanded(2 * time.Millisecond).Return(nil)
+				m.EXPECT().NodeID().Return("self")
+				m.EXPECT().leaderID().Return("self")
+				// Abdication must transfer leadership
+				m.EXPECT().leadershipTransfer().Return(nil)
+
+				fStarted := make(chan struct{})
+				follower := &testRunnable{
+					startedCh:  fStarted,
+					isSyncedFn: func(*RaftBlockState) (int, error) { return -5, nil },
+				}
+				// Signal if leader ever starts — it must not.
+				leaderStarted := make(chan struct{}, 1)
+				leader := &testRunnable{runFn: func(ctx context.Context) error {
+					select {
+					case leaderStarted <- struct{}{}:
+					default:
+					}
+					<-ctx.Done()
+					return ctx.Err()
+				}}
+
+				logger := zerolog.Nop()
+				d := &DynamicLeaderElection{logger: logger, node: m,
+					leaderFactory:   func() (Runnable, error) { return leader, nil },
+					followerFactory: func() (Runnable, error) { return follower, nil },
+				}
+				ctx, cancel := context.WithCancel(t.Context())
+				go func() {
+					leaderCh <- false
+					<-fStarted
+					leaderCh <- true
+					// Wait for abdication to complete (transfer + continue) then verify
+					// the leader was never started before cancelling.
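+					// The 50ms window below assumes the mocked transfer path
+					// completes quickly; there is no hook to wait on instead.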
+					select {
+					case <-leaderStarted:
+						t.Error("leader should not start when store is significantly behind raft")
+					case <-time.After(50 * time.Millisecond):
+						// leadership transferred without starting leader — expected
+					}
+					cancel()
+				}()
+				return d, ctx, cancel
+			},
+			assertF: func(t *testing.T, err error) {
+				require.Error(t, err)
+				assert.ErrorIs(t, err, context.Canceled)
+			},
+		},
 		"lost leadership during sync wait": {
 			setup: func(t *testing.T) (*DynamicLeaderElection, context.Context, context.CancelFunc) {
 				m := newMocksourceNode(t)
diff --git a/pkg/raft/node.go b/pkg/raft/node.go
index f156ae4785..329a4b2c9f 100644
--- a/pkg/raft/node.go
+++ b/pkg/raft/node.go
@@ -9,9 +9,11 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
+	hclog "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/raft"
 	raftboltdb "github.com/hashicorp/raft-boltdb"
 	"github.com/rs/zerolog"
@@ -35,28 +37,65 @@ type Config struct {
 	Peers              []string
 	SnapCount          uint64
 	SendTimeout        time.Duration
+	ShutdownTimeout    time.Duration
 	HeartbeatTimeout   time.Duration
 	LeaderLeaseTimeout time.Duration
+	ElectionTimeout    time.Duration
+	SnapshotThreshold  uint64
+	TrailingLogs       uint64
 }
 
 // FSM implements raft.FSM for block state
 type FSM struct {
 	logger  zerolog.Logger
 	state   *atomic.Pointer[RaftBlockState]
+	applyMu sync.RWMutex
 	applyCh chan<- RaftApplyMsg
 }
 
-// NewNode creates a new raft node
+// buildRaftConfig converts a Node Config into a hashicorp/raft Config.
+// logger is used to bridge hashicorp/raft's internal hclog output to zerolog.
+func buildRaftConfig(cfg *Config, logger zerolog.Logger) *raft.Config {
+	raftConfig := raft.DefaultConfig()
+	raftConfig.LocalID = raft.ServerID(cfg.NodeID)
+	// Route raft's internal hclog messages through zerolog so all log output is
+	// consistent. hclog writes formatted text lines; zerolog receives them via
+	// its io.Writer implementation and emits them as structured JSON.
+	raftConfig.Logger = hclog.New(&hclog.LoggerOptions{
+		Name:        "raft",
+		Level:       hclog.Info,
+		Output:      logger.With().Str("component", "raft-hashicorp").Logger(),
+		DisableTime: true, // zerolog adds its own timestamp
+	})
+	raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout
+	raftConfig.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout
+	if cfg.ElectionTimeout > 0 {
+		raftConfig.ElectionTimeout = cfg.ElectionTimeout
+	}
+	if cfg.SnapshotThreshold > 0 {
+		raftConfig.SnapshotThreshold = cfg.SnapshotThreshold
+	}
+	if cfg.TrailingLogs > 0 {
+		raftConfig.TrailingLogs = cfg.TrailingLogs
+	}
+	return raftConfig
+}
+
 func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) {
+	// Clamp ShutdownTimeout so waitForMsgsLanded never receives a zero or
+	// negative interval (which would panic in time.NewTicker). Callers such as
+	// initRaftNode already set this, but direct callers in tests may not.
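+	// The 5x SendTimeout fallback mirrors the value initRaftNode derives in
+	// node/full.go.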
+	if cfg.ShutdownTimeout <= 0 {
+		cfgCopy := *cfg
+		cfgCopy.ShutdownTimeout = 5 * cfg.SendTimeout
+		cfg = &cfgCopy
+	}
+
 	if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil {
 		return nil, fmt.Errorf("create raft dir: %w", err)
 	}
 
-	raftConfig := raft.DefaultConfig()
-	raftConfig.LocalID = raft.ServerID(cfg.NodeID)
-	raftConfig.LogLevel = "INFO"
-	raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout
-	raftConfig.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout
+	raftConfig := buildRaftConfig(cfg, logger)
 
 	startPointer := new(atomic.Pointer[RaftBlockState])
 	startPointer.Store(&RaftBlockState{})
@@ -146,18 +185,26 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error {
 	if n == nil {
 		return nil
 	}
-	timeoutTicker := time.NewTicker(timeout)
-	defer timeoutTicker.Stop()
-	ticker := time.NewTicker(min(n.config.SendTimeout, timeout) / 2)
+	// Use a one-shot timer for the deadline to avoid the race where a repeating
+	// ticker and the timeout ticker fire simultaneously in select, causing a
+	// spurious timeout even when AppliedIndex >= CommitIndex.
+	deadline := time.NewTimer(timeout)
+	defer deadline.Stop()
+	pollInterval := min(50*time.Millisecond, timeout/4)
+	ticker := time.NewTicker(pollInterval)
 	defer ticker.Stop()
 	for {
 		select {
 		case <-ticker.C:
-			if n.raft.AppliedIndex() >= n.raft.LastIndex() {
+			if n.raft.AppliedIndex() >= n.raft.CommitIndex() {
+				return nil
+			}
+		case <-deadline.C:
+			// Final check after deadline before giving up.
+			if n.raft.AppliedIndex() >= n.raft.CommitIndex() {
 				return nil
 			}
-		case <-timeoutTicker.C:
 			return errors.New("max wait time reached")
 		}
 	}
@@ -169,11 +216,13 @@
 	}
 	// Wait for FSM to apply all committed logs before shutdown to prevent state loss.
 	// This ensures pending raft messages are processed before the node stops.
-	if err := n.waitForMsgsLanded(n.config.SendTimeout); err != nil {
+	if err := n.waitForMsgsLanded(n.config.ShutdownTimeout); err != nil {
 		n.logger.Warn().Err(err).Msg("timed out waiting for raft messages to land during shutdown")
 	}
 	if n.IsLeader() {
-		_ = n.leadershipTransfer()
+		if err := n.leadershipTransfer(); err != nil {
+			n.logger.Warn().Err(err).Msg("leadership transfer on shutdown failed")
+		}
 	}
 	return n.raft.Shutdown().Error()
 }
@@ -221,6 +270,25 @@ func (n *Node) leadershipTransfer() error {
 	return n.raft.LeadershipTransfer().Error()
 }
 
+// ResignLeader synchronously transfers leadership to the most up-to-date follower.
+// It is a no-op when the node is nil or not currently the leader.
+// Call this before cancelling the node context on graceful shutdown to minimise
+// the window where a dying leader could still serve blocks.
+// The transfer is abandoned and ctx.Err() is returned if ctx expires first.
+func (n *Node) ResignLeader(ctx context.Context) error {
+	if n == nil || !n.IsLeader() {
+		return nil
+	}
+	done := make(chan error, 1)
+	go func() { done <- n.leadershipTransfer() }()
+	select {
+	case err := <-done:
+		return err
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
 func (n *Node) Config() Config {
 	return *n.config
 }
@@ -297,6 +365,8 @@
 // The channel must have sufficient buffer space since updates are published only once without blocking.
 // If the channel is full, state updates will be skipped to prevent blocking the raft cluster.
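+// Passing nil deregisters the callback; raftRetriever.Stop relies on this so
+// that Apply stops publishing to a channel whose consumer has exited.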
 func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) {
+	n.fsm.applyMu.Lock()
+	defer n.fsm.applyMu.Unlock()
 	n.fsm.applyCh = ch
 }
 
@@ -313,15 +383,20 @@ func (f *FSM) Apply(log *raft.Log) any {
 	f.state.Store(&state)
 	f.logger.Debug().
 		Uint64("height", state.Height).
+		Uint64("raft_term", log.Term).
+		Uint64("raft_index", log.Index).
 		Hex("hash", state.Hash).
 		Uint64("timestamp", state.Timestamp).
 		Int("header_bytes", len(state.Header)).
 		Int("data_bytes", len(state.Data)).
 		Msg("applied raft block state")
 
-	if f.applyCh != nil {
+	f.applyMu.RLock()
+	ch := f.applyCh
+	f.applyMu.RUnlock()
+	if ch != nil {
 		select {
-		case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}:
+		case ch <- RaftApplyMsg{Index: log.Index, Term: log.Term, State: &state}:
 		default:
 			// on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too.
 			f.logger.Warn().Msg("apply channel full, dropping message")
diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go
index 67b5ea0392..870d3e22b7 100644
--- a/pkg/raft/node_test.go
+++ b/pkg/raft/node_test.go
@@ -4,12 +4,43 @@ import (
 	"context"
 	"errors"
 	"testing"
+	"time"
 
 	"github.com/hashicorp/raft"
+	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
+func TestBuildRaftConfig_ElectionTimeout(t *testing.T) {
+	specs := map[string]struct {
+		cfg                     *Config
+		expectedElectionTimeout time.Duration
+	}{
+		"custom election timeout is applied": {
+			cfg: &Config{
+				NodeID:          "node1",
+				ElectionTimeout: 500 * time.Millisecond,
+			},
+			expectedElectionTimeout: 500 * time.Millisecond,
+		},
+		"zero election timeout uses default": {
+			cfg: &Config{
+				NodeID:          "node1",
+				ElectionTimeout: 0,
+			},
+			expectedElectionTimeout: raft.DefaultConfig().ElectionTimeout,
+		},
+	}
+
+	for name, spec := range specs {
+		t.Run(name, func(t *testing.T) {
+			rc := buildRaftConfig(spec.cfg, zerolog.Nop())
+			assert.Equal(t, spec.expectedElectionTimeout, rc.ElectionTimeout)
+		})
+	}
+}
+
 func TestSplitPeerAddr(t *testing.T) {
 	specs := map[string]struct {
 		in string
@@ -114,3 +145,49 @@ func TestNodeStartNilNoop(t *testing.T) {
 	var node *Node
 	require.NoError(t, node.Start(context.Background()))
 }
+
+func TestNodeResignLeader_NilNoop(t *testing.T) {
+	var n *Node
+	assert.NoError(t, n.ResignLeader(context.Background()))
+}
+
+func TestNodeResignLeader_NotLeaderNoop(t *testing.T) {
+	// Non-nil node with nil raft field — IsLeader() returns false, no transfer attempted.
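+	// (Attempting a transfer here would otherwise risk dereferencing the nil raft handle.)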
+	n := &Node{}
+	assert.NoError(t, n.ResignLeader(context.Background()))
+}
+
+func TestBuildRaftConfig_SnapshotConfigApplied(t *testing.T) {
+	specs := map[string]struct {
+		cfg                       *Config
+		expectedSnapshotThreshold uint64
+		expectedTrailingLogs      uint64
+	}{
+		"custom values applied": {
+			cfg: &Config{
+				NodeID:            "node1",
+				SnapshotThreshold: 1000,
+				TrailingLogs:      500,
+			},
+			expectedSnapshotThreshold: 1000,
+			expectedTrailingLogs:      500,
+		},
+		"zero values use defaults": {
+			cfg: &Config{
+				NodeID:            "node1",
+				SnapshotThreshold: 0,
+				TrailingLogs:      0,
+			},
+			expectedSnapshotThreshold: raft.DefaultConfig().SnapshotThreshold,
+			expectedTrailingLogs:      raft.DefaultConfig().TrailingLogs,
+		},
+	}
+
+	for name, spec := range specs {
+		t.Run(name, func(t *testing.T) {
+			rc := buildRaftConfig(spec.cfg, zerolog.Nop())
+			assert.Equal(t, spec.expectedSnapshotThreshold, rc.SnapshotThreshold)
+			assert.Equal(t, spec.expectedTrailingLogs, rc.TrailingLogs)
+		})
+	}
+}
diff --git a/pkg/raft/types.go b/pkg/raft/types.go
index 968d9aa959..38d3a5130b 100644
--- a/pkg/raft/types.go
+++ b/pkg/raft/types.go
@@ -23,5 +23,6 @@ func assertValid(s *RaftBlockState, next *RaftBlockState) error {
 // RaftApplyMsg is sent when raft applies a log entry
 type RaftApplyMsg struct {
 	Index uint64
+	Term  uint64 // raft term in which this entry was committed
 	State *RaftBlockState
 }