diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 78cf9a10eee74..a3278d29fddfb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -957,11 +957,36 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { try (final FileInputStream fileInputStream = new FileInputStream(snapshotFile)) { pipeMetaKeeper.processLoadSnapshot(fileInputStream); } + normalizeRecoveredConsensusPipeStatus(); } finally { releaseWriteLock(); } } + private void normalizeRecoveredConsensusPipeStatus() { + final List restartedConsensusPipes = new ArrayList<>(); + + pipeMetaKeeper + .getPipeMetaList() + .forEach( + pipeMeta -> { + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + if (!PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType()) + || !PipeStatus.STOPPED.equals(runtimeMeta.getStatus().get()) + || runtimeMeta.getIsStoppedByRuntimeException()) { + return; + } + + runtimeMeta.getStatus().set(PipeStatus.RUNNING); + restartedConsensusPipes.add(pipeMeta.getStaticMeta().getPipeName()); + }); + + if (!restartedConsensusPipes.isEmpty()) { + LOGGER.info( + "Recovered consensus pipes {} as RUNNING during snapshot load.", restartedConsensusPipes); + } + } + /////////////////////////////// hashCode & equals /////////////////////////////// @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java index 094dcebe82fa9..22cd398827716 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.io.File; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -149,4 +150,60 @@ public void testGetConsensusPipeStatusMapExcludesSubscriptionPipes() { Assert.assertTrue(result.containsKey(consensusPipeName)); Assert.assertFalse(result.containsKey(subscriptionPipeName)); } + + @Test + public void testProcessLoadSnapshotRestartsOnlyHealthyStoppedConsensusPipes() throws Exception { + DataRegionId regionId = new DataRegionId(100); + String consensusPipeToRestart = new ConsensusPipeName(regionId, 1, 2).toString(); + String consensusPipeStoppedByException = new ConsensusPipeName(regionId, 2, 1).toString(); + String userPipeName = "userPipe"; + + createPipe(consensusPipeToRestart, PipeStatus.STOPPED); + createPipe(consensusPipeStoppedByException, PipeStatus.STOPPED); + createPipe(userPipeName, PipeStatus.STOPPED); + + pipeTaskInfo + .getPipeMetaByPipeName(consensusPipeStoppedByException) + .getRuntimeMeta() + .setIsStoppedByRuntimeException(true); + + final File snapshotDir = + java.nio.file.Files.createTempDirectory("pipe-task-info-consensus-test").toFile(); + try { + Assert.assertTrue(pipeTaskInfo.processTakeSnapshot(snapshotDir)); + + PipeTaskInfo recoveredPipeTaskInfo = new PipeTaskInfo(); + recoveredPipeTaskInfo.processLoadSnapshot(snapshotDir); + + Assert.assertEquals( + PipeStatus.RUNNING, + recoveredPipeTaskInfo + .getPipeMetaByPipeName(consensusPipeToRestart) + .getRuntimeMeta() + .getStatus() + .get()); + Assert.assertEquals( + PipeStatus.STOPPED, + recoveredPipeTaskInfo + .getPipeMetaByPipeName(consensusPipeStoppedByException) + .getRuntimeMeta() + .getStatus() + .get()); + Assert.assertTrue( + recoveredPipeTaskInfo + .getPipeMetaByPipeName(consensusPipeStoppedByException) + .getRuntimeMeta() + .getIsStoppedByRuntimeException()); + Assert.assertEquals( + PipeStatus.STOPPED, + recoveredPipeTaskInfo + .getPipeMetaByPipeName(userPipeName) + .getRuntimeMeta() + .getStatus() + .get()); + } finally { + new File(snapshotDir, "pipe_task_info.bin").delete(); + snapshotDir.delete(); + } + } }