Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -957,11 +957,36 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException {
try (final FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
pipeMetaKeeper.processLoadSnapshot(fileInputStream);
}
normalizeRecoveredConsensusPipeStatus();
} finally {
releaseWriteLock();
}
}

private void normalizeRecoveredConsensusPipeStatus() {
final List<String> restartedConsensusPipes = new ArrayList<>();

pipeMetaKeeper
.getPipeMetaList()
.forEach(
pipeMeta -> {
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
if (!PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType())
|| !PipeStatus.STOPPED.equals(runtimeMeta.getStatus().get())
|| runtimeMeta.getIsStoppedByRuntimeException()) {
return;
}

runtimeMeta.getStatus().set(PipeStatus.RUNNING);
restartedConsensusPipes.add(pipeMeta.getStaticMeta().getPipeName());
});

if (!restartedConsensusPipes.isEmpty()) {
LOGGER.info(
"Recovered consensus pipes {} as RUNNING during snapshot load.", restartedConsensusPipes);
}
}

Comment on lines +985 to +989
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The INFO log prints the full list of restarted consensus pipe names, which can be very large (consensus pipes scale roughly with N*(N-1) per region). This may bloat logs during snapshot recovery; consider logging only the count (and optionally a small sample) at INFO, and the full list at DEBUG.

Suggested change
LOGGER.info(
"Recovered consensus pipes {} as RUNNING during snapshot load.", restartedConsensusPipes);
}
}
final List<String> restartedConsensusPipeSample =
samplePipeNamesForInfoLog(restartedConsensusPipes, 10);
LOGGER.info(
"Recovered {} consensus pipes as RUNNING during snapshot load. Sample: {}{}",
restartedConsensusPipes.size(),
restartedConsensusPipeSample,
restartedConsensusPipes.size() > restartedConsensusPipeSample.size() ? " ..." : "");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Recovered consensus pipes {} as RUNNING during snapshot load.",
restartedConsensusPipes);
}
}
}
private List<String> samplePipeNamesForInfoLog(
final List<String> pipeNames, final int maxSampleSize) {
return new ArrayList<>(pipeNames.subList(0, Math.min(pipeNames.size(), maxSampleSize)));
}

Copilot uses AI. Check for mistakes.
/////////////////////////////// hashCode & equals ///////////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -149,4 +150,60 @@ public void testGetConsensusPipeStatusMapExcludesSubscriptionPipes() {
Assert.assertTrue(result.containsKey(consensusPipeName));
Assert.assertFalse(result.containsKey(subscriptionPipeName));
}

@Test
public void testProcessLoadSnapshotRestartsOnlyHealthyStoppedConsensusPipes() throws Exception {
DataRegionId regionId = new DataRegionId(100);
String consensusPipeToRestart = new ConsensusPipeName(regionId, 1, 2).toString();
String consensusPipeStoppedByException = new ConsensusPipeName(regionId, 2, 1).toString();
String userPipeName = "userPipe";

createPipe(consensusPipeToRestart, PipeStatus.STOPPED);
createPipe(consensusPipeStoppedByException, PipeStatus.STOPPED);
createPipe(userPipeName, PipeStatus.STOPPED);

pipeTaskInfo
.getPipeMetaByPipeName(consensusPipeStoppedByException)
.getRuntimeMeta()
.setIsStoppedByRuntimeException(true);

final File snapshotDir =
java.nio.file.Files.createTempDirectory("pipe-task-info-consensus-test").toFile();
try {
Assert.assertTrue(pipeTaskInfo.processTakeSnapshot(snapshotDir));

PipeTaskInfo recoveredPipeTaskInfo = new PipeTaskInfo();
recoveredPipeTaskInfo.processLoadSnapshot(snapshotDir);

Assert.assertEquals(
PipeStatus.RUNNING,
recoveredPipeTaskInfo
.getPipeMetaByPipeName(consensusPipeToRestart)
.getRuntimeMeta()
.getStatus()
.get());
Assert.assertEquals(
PipeStatus.STOPPED,
recoveredPipeTaskInfo
.getPipeMetaByPipeName(consensusPipeStoppedByException)
.getRuntimeMeta()
.getStatus()
.get());
Assert.assertTrue(
recoveredPipeTaskInfo
.getPipeMetaByPipeName(consensusPipeStoppedByException)
.getRuntimeMeta()
.getIsStoppedByRuntimeException());
Assert.assertEquals(
PipeStatus.STOPPED,
recoveredPipeTaskInfo
.getPipeMetaByPipeName(userPipeName)
.getRuntimeMeta()
.getStatus()
.get());
} finally {
new File(snapshotDir, "pipe_task_info.bin").delete();
snapshotDir.delete();
}
Comment on lines +204 to +207
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test cleanup hard-codes the snapshot file name ("pipe_task_info.bin") and ignores delete() results. Consider deleting the temp directory recursively (and/or deleting all files under snapshotDir) so cleanup doesn't depend on PipeTaskInfo's private snapshot filename and doesn’t silently leak temp dirs if deletion fails.

Copilot uses AI. Check for mistakes.
}
}
Loading