From 411af7493e47d3f2eb80f3d80ad4fb85cad4b2b2 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Tue, 7 Apr 2026 17:12:08 +0800 Subject: [PATCH 1/2] Fix recovered consensus pipes staying stopped after snapshot load --- .../persistence/pipe/PipeTaskInfo.java | 26 +++++++++ .../pipe/PipeTaskInfoConsensusPipeTest.java | 53 +++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 78cf9a10eee74..74331ba4203ed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -957,11 +957,37 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { try (final FileInputStream fileInputStream = new FileInputStream(snapshotFile)) { pipeMetaKeeper.processLoadSnapshot(fileInputStream); } + normalizeRecoveredConsensusPipeStatus(); } finally { releaseWriteLock(); } } + private void normalizeRecoveredConsensusPipeStatus() { + final List restartedConsensusPipes = new ArrayList<>(); + + pipeMetaKeeper + .getPipeMetaList() + .forEach( + pipeMeta -> { + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + if (!PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType()) + || !PipeStatus.STOPPED.equals(runtimeMeta.getStatus().get()) + || runtimeMeta.getIsStoppedByRuntimeException()) { + return; + } + + runtimeMeta.getStatus().set(PipeStatus.RUNNING); + restartedConsensusPipes.add(pipeMeta.getStaticMeta().getPipeName()); + }); + + if (!restartedConsensusPipes.isEmpty()) { + LOGGER.info( + "Recovered consensus pipes {} as RUNNING during snapshot load.", + restartedConsensusPipes); + } + } + /////////////////////////////// hashCode & equals /////////////////////////////// @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java index 094dcebe82fa9..0fcc777e3e699 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.io.File; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -149,4 +150,56 @@ public void testGetConsensusPipeStatusMapExcludesSubscriptionPipes() { Assert.assertTrue(result.containsKey(consensusPipeName)); Assert.assertFalse(result.containsKey(subscriptionPipeName)); } + + @Test + public void testProcessLoadSnapshotRestartsOnlyHealthyStoppedConsensusPipes() throws Exception { + DataRegionId regionId = new DataRegionId(100); + String consensusPipeToRestart = new ConsensusPipeName(regionId, 1, 2).toString(); + String consensusPipeStoppedByException = new ConsensusPipeName(regionId, 2, 1).toString(); + String userPipeName = "userPipe"; + + createPipe(consensusPipeToRestart, PipeStatus.STOPPED); + createPipe(consensusPipeStoppedByException, PipeStatus.STOPPED); + createPipe(userPipeName, PipeStatus.STOPPED); + + pipeTaskInfo + .getPipeMetaByPipeName(consensusPipeStoppedByException) + .getRuntimeMeta() + .setIsStoppedByRuntimeException(true); + + final File snapshotDir = + java.nio.file.Files.createTempDirectory("pipe-task-info-consensus-test").toFile(); + try { + Assert.assertTrue(pipeTaskInfo.processTakeSnapshot(snapshotDir)); + + PipeTaskInfo recoveredPipeTaskInfo = new PipeTaskInfo(); + recoveredPipeTaskInfo.processLoadSnapshot(snapshotDir); + + Assert.assertEquals( + PipeStatus.RUNNING, + recoveredPipeTaskInfo + .getPipeMetaByPipeName(consensusPipeToRestart) + .getRuntimeMeta() + .getStatus() + .get()); + Assert.assertEquals( + PipeStatus.STOPPED, + recoveredPipeTaskInfo + .getPipeMetaByPipeName(consensusPipeStoppedByException) + .getRuntimeMeta() + .getStatus() + .get()); + Assert.assertTrue( + recoveredPipeTaskInfo + .getPipeMetaByPipeName(consensusPipeStoppedByException) + .getRuntimeMeta() + .getIsStoppedByRuntimeException()); + Assert.assertEquals( + PipeStatus.STOPPED, + recoveredPipeTaskInfo.getPipeMetaByPipeName(userPipeName).getRuntimeMeta().getStatus().get()); + } finally { + new File(snapshotDir, "pipe_task_info.bin").delete(); + snapshotDir.delete(); + } + } } From 87bb93ebc969e37a7b1dd47c086f031d6a645281 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Tue, 7 Apr 2026 17:26:04 +0800 Subject: [PATCH 2/2] Apply spotless formatting for consensus pipe recovery fix --- .../iotdb/confignode/persistence/pipe/PipeTaskInfo.java | 3 +-- .../persistence/pipe/PipeTaskInfoConsensusPipeTest.java | 6 +++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 74331ba4203ed..a3278d29fddfb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -983,8 +983,7 @@ private void normalizeRecoveredConsensusPipeStatus() { if (!restartedConsensusPipes.isEmpty()) { LOGGER.info( - "Recovered consensus pipes {} as RUNNING during snapshot load.", - restartedConsensusPipes); + "Recovered consensus pipes {} as RUNNING during snapshot load.", restartedConsensusPipes); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java index 0fcc777e3e699..22cd398827716 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java @@ -196,7 +196,11 @@ public void testProcessLoadSnapshotRestartsOnlyHealthyStoppedConsensusPipes() th .getIsStoppedByRuntimeException()); Assert.assertEquals( PipeStatus.STOPPED, - recoveredPipeTaskInfo.getPipeMetaByPipeName(userPipeName).getRuntimeMeta().getStatus().get()); + recoveredPipeTaskInfo + .getPipeMetaByPipeName(userPipeName) + .getRuntimeMeta() + .getStatus() + .get()); } finally { new File(snapshotDir, "pipe_task_info.bin").delete(); snapshotDir.delete();