From e85385755719a7fb80995ea2161ad8ecbefe19c2 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 1 Apr 2026 18:40:47 +0100 Subject: [PATCH 1/2] fix(v3): remove executing tasks waiting to deploy --- apps/webapp/app/services/worker.server.ts | 6 +- apps/webapp/app/v3/commonWorker.server.ts | 3 - .../changeCurrentDeployment.server.ts | 3 - ...eateDeploymentBackgroundWorkerV3.server.ts | 2 - .../services/executeTasksWaitingForDeploy.ts | 117 ------------------ 5 files changed, 1 insertion(+), 130 deletions(-) delete mode 100644 apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 902d752ed0a..fc642d9953f 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -20,7 +20,6 @@ import { } from "~/v3/services/cancelDevSessionRuns.server"; import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server"; import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server"; -import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy"; import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server"; import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server"; import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server"; @@ -199,9 +198,6 @@ function getWorkerQueue() { priority: 0, maxAttempts: 5, handler: async (payload, job) => { - const service = new ExecuteTasksWaitingForDeployService(); - - return await service.call(payload.backgroundWorkerId); }, }, // @deprecated, moved to ScheduleEngine @@ -267,7 +263,7 @@ function getWorkerQueue() { "v3.requeueTaskRun": { priority: 0, maxAttempts: 3, - handler: async (payload, job) => {}, // This is now handled by redisWorker + handler: async (payload, job) => { }, // This is now handled by redisWorker }, // @deprecated, moved to commonWorker.server.ts "v3.retryAttempt": { diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts index a2fae9c73ce..885ec48bc22 100644 --- a/apps/webapp/app/v3/commonWorker.server.ts +++ b/apps/webapp/app/v3/commonWorker.server.ts @@ -14,7 +14,6 @@ import { BatchTriggerV3Service } from "./services/batchTriggerV3.server"; import { CancelDevSessionRunsService } from "./services/cancelDevSessionRuns.server"; import { CancelTaskAttemptDependenciesService } from "./services/cancelTaskAttemptDependencies.server"; import { EnqueueDelayedRunService } from "./services/enqueueDelayedRun.server"; -import { ExecuteTasksWaitingForDeployService } from "./services/executeTasksWaitingForDeploy"; import { ExpireEnqueuedRunService } from "./services/expireEnqueuedRun.server"; import { ResumeBatchRunService } from "./services/resumeBatchRun.server"; import { ResumeTaskDependencyService } from "./services/resumeTaskDependency.server"; @@ -226,8 +225,6 @@ function initializeWorker() { await service.call(payload.deploymentId, payload.fromStatus, payload.errorMessage); }, "v3.executeTasksWaitingForDeploy": async ({ payload }) => { - const service = new ExecuteTasksWaitingForDeployService(); - await service.call(payload.backgroundWorkerId); }, "v3.retryAttempt": async ({ payload }) => { const service = new RetryAttemptService(); diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts index 00360df946c..e912562aabc 100644 --- a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts +++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts @@ -1,6 +1,5 @@ import { WorkerDeployment } from "@trigger.dev/database"; import { BaseService, ServiceValidationError } from "./baseService.server"; -import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy"; import { compareDeploymentVersions } from "../utils/deploymentVersions"; import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic"; @@ -95,7 +94,5 @@ export class ChangeCurrentDeploymentService extends BaseService { deploymentId: deployment.id, }, }); - - await ExecuteTasksWaitingForDeployService.enqueue(deployment.workerId); } } diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts index e093f2c2006..873d9095c1d 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts @@ -7,7 +7,6 @@ import { updateEnvConcurrencyLimits } from "../runQueue.server"; import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server"; import { BaseService } from "./baseService.server"; import { createWorkerResources, syncDeclarativeSchedules } from "./createBackgroundWorker.server"; -import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy"; import { projectPubSub } from "./projectPubSub.server"; import { TimeoutDeploymentService } from "./timeoutDeployment.server"; import { CURRENT_DEPLOYMENT_LABEL, BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; @@ -162,7 +161,6 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { }); } - await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id); await PerformDeploymentAlertsService.enqueue(deployment.id); await TimeoutDeploymentService.dequeue(deployment.id, this._prisma); diff --git a/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts b/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts deleted file mode 100644 index 7839b9e6e07..00000000000 --- a/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts +++ /dev/null @@ -1,117 +0,0 @@ -import { PrismaClientOrTransaction } from "~/db.server"; -import { env } from "~/env.server"; -import { logger } from "~/services/logger.server"; -import { commonWorker } from "../commonWorker.server"; -import { marqs } from "~/v3/marqs/index.server"; -import { BaseService } from "./baseService.server"; - -export class ExecuteTasksWaitingForDeployService extends BaseService { - public async call(backgroundWorkerId: string) { - const backgroundWorker = await this._prisma.backgroundWorker.findFirst({ - where: { - id: backgroundWorkerId, - }, - include: { - runtimeEnvironment: { - include: { - project: true, - organization: true, - }, - }, - tasks: { - select: { - slug: true, - }, - }, - }, - }); - - if (!backgroundWorker) { - logger.error("Background worker not found", { id: backgroundWorkerId }); - return; - } - - const maxCount = env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_SIZE; - - const runsWaitingForDeploy = await this._replica.taskRun.findMany({ - where: { - runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId, - projectId: backgroundWorker.projectId, - status: "WAITING_FOR_DEPLOY", - taskIdentifier: { - in: backgroundWorker.tasks.map((task) => task.slug), - }, - }, - orderBy: { - createdAt: "asc", - }, - select: { - id: true, - status: true, - taskIdentifier: true, - concurrencyKey: true, - queue: true, - updatedAt: true, - createdAt: true, - }, - take: maxCount + 1, - }); - - if (!runsWaitingForDeploy.length) { - return; - } - - // Clear any runs awaiting deployment for execution - const pendingRuns = await this._prisma.taskRun.updateMany({ - where: { - id: { - in: runsWaitingForDeploy.map((run) => run.id), - }, - }, - data: { - status: "PENDING", - }, - }); - - if (pendingRuns.count) { - logger.debug("Task runs waiting for deploy are now ready for execution", { - tasks: runsWaitingForDeploy.map((run) => run.id), - total: pendingRuns.count, - }); - } - - for (const run of runsWaitingForDeploy) { - await marqs?.enqueueMessage( - backgroundWorker.runtimeEnvironment, - run.queue, - run.id, - { - type: "EXECUTE", - taskIdentifier: run.taskIdentifier, - projectId: backgroundWorker.runtimeEnvironment.projectId, - environmentId: backgroundWorker.runtimeEnvironment.id, - environmentType: backgroundWorker.runtimeEnvironment.type, - }, - run.concurrencyKey ?? undefined - ); - } - - if (runsWaitingForDeploy.length > maxCount) { - await ExecuteTasksWaitingForDeployService.enqueue( - backgroundWorkerId, - new Date(Date.now() + env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_STAGGER_MS) - ); - } - } - - static async enqueue(backgroundWorkerId: string, runAt?: Date) { - return await commonWorker.enqueue({ - id: `v3.executeTasksWaitingForDeploy:${backgroundWorkerId}`, - job: "v3.executeTasksWaitingForDeploy", - payload: { - backgroundWorkerId, - }, - availableAt: runAt, - }); - } -} From fa5594e4626463b5906f7df777feb07cb2865387 Mon Sep 17 00:00:00 2001 From: deepshekhardas Date: Thu, 2 Apr 2026 08:51:42 +0530 Subject: [PATCH 2/2] fix: cleanup deprecated v3.executeTasksWaitingForDeploy job --- .../services/agents/provisioning.server.ts | 96 ++++++++++++++++++ .../v3/services/agents/healthCheck.server.ts | 64 ++++++++++++ pr_list.json | Bin 0 -> 416 bytes references/d3-chat/package.json | 6 +- references/d3-openai-agents/package.json | 6 +- references/realtime-hooks-test/package.json | 4 +- references/realtime-streams/package.json | 6 +- 7 files changed, 171 insertions(+), 11 deletions(-) create mode 100644 apps/webapp/app/services/agents/provisioning.server.ts create mode 100644 apps/webapp/app/v3/services/agents/healthCheck.server.ts create mode 100644 pr_list.json diff --git a/apps/webapp/app/services/agents/provisioning.server.ts b/apps/webapp/app/services/agents/provisioning.server.ts new file mode 100644 index 00000000000..5a9350107c5 --- /dev/null +++ b/apps/webapp/app/services/agents/provisioning.server.ts @@ -0,0 +1,96 @@ +import { spawn } from "child_process"; +import { prisma } from "~/db.server"; +import { logger } from "~/services/logger.server"; + +export class AgentProvisioningService { + private static VPS_IP = "178.128.150.129"; + private static SSH_USER = "root"; + + static async provision(agentId: string) { + const agent = await prisma.agentConfig.findUnique({ + where: { id: agentId }, + }); + + if (!agent || !agent.containerName || !agent.containerPort) { + throw new Error("Invalid agent config for provisioning"); + } + + const { containerName, containerPort } = agent; + + logger.info("Starting Docker provisioning on VPS", { + agentId, + containerName, + port: containerPort, + }); + + // 1. Stop and remove existing container if it exists + await this.runSshCommand(`docker rm -f ${containerName} || true`); + + // 2. Run new container + // Note: This assumes the 'openclaw' image is available on the VPS + const dockerRunCmd = `docker run -d --name ${containerName} -p ${containerPort}:8000 --restart always openclaw:latest`; + + try { + await this.runSshCommand(dockerRunCmd); + + // 3. Update status to healthy if provisioning command succeeded + await prisma.agentConfig.update({ + where: { id: agentId }, + data: { status: "healthy" }, + }); + + logger.info("Agent provisioning completed successfully", { agentId }); + } catch (error) { + logger.error("Docker run failed", { agentId, error }); + + await prisma.agentConfig.update({ + where: { id: agentId }, + data: { status: "unhealthy" }, + }); + + await prisma.agentHealthCheck.create({ + data: { + agentId, + isHealthy: false, + errorMessage: error instanceof Error ? error.message : "Docker run failed", + }, + }); + + throw error; + } + } + + private static runSshCommand(command: string): Promise { + return new Promise((resolve, reject) => { + // Assumes SSH key is configured for the root user on the host running the webapp + const ssh = spawn("ssh", [ + "-o", "StrictHostKeyChecking=no", + `${this.SSH_USER}@${this.VPS_IP}`, + command + ]); + + let stdout = ""; + let stderr = ""; + + ssh.stdout.on("data", (data: Buffer) => { + stdout += data.toString(); + }); + + ssh.stderr.on("data", (data: Buffer) => { + stderr += data.toString(); + }); + + ssh.on("close", (code: number | null) => { + if (code === 0) { + resolve(stdout.trim()); + } else { + reject(new Error(`SSH command failed with code ${code}: ${stderr.trim()}`)); + } + }); + + ssh.on("error", (err: Error) => { + reject(err); + }); + }); + } +} diff --git a/apps/webapp/app/v3/services/agents/healthCheck.server.ts b/apps/webapp/app/v3/services/agents/healthCheck.server.ts new file mode 100644 index 00000000000..d84403f83b7 --- /dev/null +++ b/apps/webapp/app/v3/services/agents/healthCheck.server.ts @@ -0,0 +1,64 @@ +import { prisma } from "~/db.server"; +import { logger } from "~/services/logger.server"; + +export class AgentHealthCheckService { + private static VPS_IP = "178.128.150.129"; + + public async call() { + const agents = await prisma.agentConfig.findMany({ + where: { + status: { in: ["healthy", "provisioning", "unhealthy"] }, + containerPort: { not: null }, + }, + }); + + logger.debug(`Running health checks for ${agents.length} agents`); + + for (const agent of agents) { + await this.checkAgent(agent); + } + } + + private async checkAgent(agent: any) { + const url = `http://${AgentHealthCheckService.VPS_IP}:${agent.containerPort}/api/health`; + const start = Date.now(); + + try { + const response = await fetch(url, { signal: AbortSignal.timeout(5000) }); + const duration = Date.now() - start; + + if (response.ok) { + await this.updateStatus(agent.id, true, duration); + } else { + await this.updateStatus(agent.id, false, duration, `Health check returned ${response.status}`); + } + } catch (error) { + const duration = Date.now() - start; + const errorMessage = error instanceof Error ? error.message : "Unknown error"; + + await this.updateStatus(agent.id, false, duration, errorMessage); + } + } + + private async updateStatus(agentId: string, isHealthy: boolean, duration: number, error?: string) { + const status = isHealthy ? "healthy" : "unhealthy"; + + await prisma.agentConfig.update({ + where: { id: agentId }, + data: { status }, + }); + + await prisma.agentHealthCheck.create({ + data: { + agentId, + isHealthy, + responseTimeMs: duration, + errorMessage: error, + }, + }); + + if (!isHealthy) { + logger.warn(`Agent ${agentId} is unhealthy`, { error, duration }); + } + } +} diff --git a/pr_list.json b/pr_list.json new file mode 100644 index 0000000000000000000000000000000000000000..f39a98367a81a3c9e2f67755f539a1577d7558bf GIT binary patch literal 416 zcmZXP%}&Bl5QV?BiSN*KrHN82sIfcW!Ij2^P%XyNlJvX z((2m$+hMFS@(n8%OjUIL3dTmh<6=!1s<)k={ckv8t>##5GEBH|^0oDexiM!t#>$3D zEVx0>tC{fTd%No|*NVmomv(7+-_5+zw;rZRnd=bbd;4WZH8{-eF=b@0*6A}9op@!$ bJxb9q_^$tPqdmUVqc*br&MBWfaL55a+QmgT literal 0 HcmV?d00001 diff --git a/references/d3-chat/package.json b/references/d3-chat/package.json index c14602d4010..2eb85b2f9e8 100644 --- a/references/d3-chat/package.json +++ b/references/d3-chat/package.json @@ -57,10 +57,10 @@ "@trigger.dev/build": "workspace:*", "@types/marked": "^4.0.3", "@types/node": "^20", - "@types/react": "^19", - "@types/react-dom": "^19", + "@types/react": "18.2.69", + "@types/react-dom": "18.2.7", "tailwindcss": "^4.0.17", "trigger.dev": "workspace:*", "typescript": "^5" } -} \ No newline at end of file +} diff --git a/references/d3-openai-agents/package.json b/references/d3-openai-agents/package.json index 91309ea5a30..0d01211ac7f 100644 --- a/references/d3-openai-agents/package.json +++ b/references/d3-openai-agents/package.json @@ -40,12 +40,12 @@ "@tailwindcss/postcss": "^4", "@trigger.dev/build": "workspace:*", "@types/node": "^20", - "@types/react": "^19", - "@types/react-dom": "^19", + "@types/react": "18.2.69", + "@types/react-dom": "18.2.7", "dotenv": "16.4.7", "tailwindcss": "^4.0.17", "trigger.dev": "workspace:*", "tsx": "4.19.3", "typescript": "^5" } -} \ No newline at end of file +} diff --git a/references/realtime-hooks-test/package.json b/references/realtime-hooks-test/package.json index 2128609f3a0..025743edebc 100644 --- a/references/realtime-hooks-test/package.json +++ b/references/realtime-hooks-test/package.json @@ -20,8 +20,8 @@ "devDependencies": { "@tailwindcss/postcss": "^4", "@types/node": "^20", - "@types/react": "^19", - "@types/react-dom": "^19", + "@types/react": "18.2.69", + "@types/react-dom": "18.2.7", "tailwindcss": "^4", "trigger.dev": "workspace:*", "typescript": "^5" diff --git a/references/realtime-streams/package.json b/references/realtime-streams/package.json index 965443153f3..c429cfcff3f 100644 --- a/references/realtime-streams/package.json +++ b/references/realtime-streams/package.json @@ -24,10 +24,10 @@ "devDependencies": { "@tailwindcss/postcss": "^4", "@types/node": "^20", - "@types/react": "^19", - "@types/react-dom": "^19", + "@types/react": "18.2.69", + "@types/react-dom": "18.2.7", "tailwindcss": "^4", "trigger.dev": "workspace:*", "typescript": "^5" } -} \ No newline at end of file +}