feat/pcc-sync (CM-1086, CM-1087, CM-1088, CM-1089)#4006
feat/pcc-sync (CM-1086, CM-1087, CM-1088, CM-1089)#4006
Conversation
Signed-off-by: Uroš Marolt <uros@marolt.me>
There was a problem hiding this comment.
PR titles must follow Conventional Commits. Love from, Your reviewers ❤️.
There was a problem hiding this comment.
Pull request overview
Introduces a new Temporal worker to export PCC project hierarchy data from Snowflake to S3 (Parquet) and sync it into CDP (segments + insightsProjects), while refactoring shared Snowflake/S3/metadata components into @crowd/snowflake and updating the existing snowflake_connectors app to consume them.
Changes:
- Add
pcc_sync_workerapp with Temporal schedules/workflows, export/cleanup activities, Parquet parsing, and a DB-sync consumer. - Move/centralize Snowflake export job metadata + S3/Parquet consumption logic into
services/libs/snowflakeand update snowflake_connectors to use it. - Add DB migration for PCC sync support (segments.maturity +
pcc_projects_sync_errorstable + dedup index), plus worker Docker/compose setup.
Reviewed changes
Copilot reviewed 33 out of 34 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| services/libs/snowflake/src/snowflakeExporter.ts | Fix internal import to avoid self-package import/cycles. |
| services/libs/snowflake/src/s3Service.ts | New S3 download/delete + Parquet row iteration utility. |
| services/libs/snowflake/src/metadataStore.ts | Add platform filtering + named params for export job bookkeeping. |
| services/libs/snowflake/src/index.ts | Export new Snowflake lib surface (metadata store, S3 service, exporter). |
| services/libs/snowflake/package.json | Add S3 + Parquet deps and DB dependency for the library. |
| services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts | Use @crowd/snowflake MetadataStore/S3Service; add enabled-platform filtering. |
| services/apps/snowflake_connectors/src/activities/exportActivity.ts | Switch imports to @crowd/snowflake. |
| services/apps/snowflake_connectors/src/activities/cleanupActivity.ts | Use shared MetadataStore/S3Service and pass enabled platforms to cleanup. |
| services/apps/snowflake_connectors/package.json | Remove direct S3/Parquet deps (now come from @crowd/snowflake). |
| services/apps/pcc_sync_worker/tsconfig.json | New TS config for PCC worker app. |
| services/apps/pcc_sync_worker/src/workflows/index.ts | Workflow exports. |
| services/apps/pcc_sync_worker/src/workflows/exportWorkflow.ts | Temporal workflow to run PCC export activity. |
| services/apps/pcc_sync_worker/src/workflows/cleanupWorkflow.ts | Temporal workflow to run PCC cleanup activity. |
| services/apps/pcc_sync_worker/src/scripts/triggerExport.ts | Manual script to start export workflow. |
| services/apps/pcc_sync_worker/src/scripts/triggerCleanup.ts | Manual script to start cleanup workflow. |
| services/apps/pcc_sync_worker/src/schedules/pccS3Export.ts | Temporal schedule registration for daily PCC export. |
| services/apps/pcc_sync_worker/src/schedules/pccS3Cleanup.ts | Temporal schedule registration for daily PCC cleanup. |
| services/apps/pcc_sync_worker/src/schedules/index.ts | Schedule exports. |
| services/apps/pcc_sync_worker/src/parser/types.ts | Parquet-row + parsed-project types. |
| services/apps/pcc_sync_worker/src/parser/rowParser.ts | Pure PCC row parsing + hierarchy mapping rules. |
| services/apps/pcc_sync_worker/src/parser/index.ts | Parser exports. |
| services/apps/pcc_sync_worker/src/main.ts | ServiceWorker archetype configuration. |
| services/apps/pcc_sync_worker/src/index.ts | Worker entrypoint: init + schedule + start consumer + start Temporal worker. |
| services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts | PCC job polling + Parquet processing + DB sync + error recording. |
| services/apps/pcc_sync_worker/src/config/settings.ts | Re-export Temporal config helpers. |
| services/apps/pcc_sync_worker/src/activities/index.ts | Activity exports. |
| services/apps/pcc_sync_worker/src/activities/exportActivity.ts | Snowflake recursive CTE export into S3 + metadata insert. |
| services/apps/pcc_sync_worker/src/activities/cleanupActivity.ts | Cleanup exported S3 files + mark jobs cleaned + Slack alerting on failures. |
| services/apps/pcc_sync_worker/package.json | PCC worker package manifest + scripts. |
| scripts/services/pcc-sync-worker.yaml | Compose service definitions for PCC worker (prod/dev). |
| scripts/services/docker/Dockerfile.pcc_sync_worker.dockerignore | Docker ignore file for PCC worker build context. |
| scripts/services/docker/Dockerfile.pcc_sync_worker | Multi-stage build for PCC worker. |
| backend/src/database/migrations/V1775312770__pcc-sync-worker-setup.sql | Add segments.maturity + PCC sync errors table + dedup index. |
| backend/src/database/migrations/U1775312770__pcc-sync-worker-setup.sql | Rollback for PCC sync DB changes. |
Comments suppressed due to low confidence (2)
services/libs/snowflake/src/metadataStore.ts:77
- When
platformsis provided as an empty array, this method falls back to no filter and will claim jobs for all platforms. That’s risky ifCROWD_SNOWFLAKE_ENABLED_PLATFORMSis accidentally empty/misconfigured. Consider treating an explicit emptyplatformslist as “match nothing” (return null early, or inject anAND FALSEfilter).
services/libs/snowflake/src/metadataStore.ts:125 platformsbeing an empty array currently results in no platform filter, so cleanup can target jobs for all platforms if the enabled-platforms list is empty/misconfigured. Consider returning[]early whenplatformsis provided but empty (or otherwise ensuring the filter matches nothing).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| const parsed = parsePccRow(raw) | ||
|
|
||
| if (parsed.ok === false) { | ||
| errorCount++ | ||
| log.warn({ jobId: job.id, details: parsed.details }, 'Row schema mismatch — skipping') | ||
| if (!this.dryRun) { | ||
| await insertSyncError(tx, null, null, 'SCHEMA_MISMATCH', parsed.details) |
There was a problem hiding this comment.
On schema-mismatch rows, the sync error is always inserted with external_project_id/external_project_slug as null. If the raw row contains PROJECT_ID/SLUG, capturing them here would improve traceability and allow the dedup partial unique index to work (reducing table growth on repeated runs).
| const parsed = parsePccRow(raw) | |
| if (parsed.ok === false) { | |
| errorCount++ | |
| log.warn({ jobId: job.id, details: parsed.details }, 'Row schema mismatch — skipping') | |
| if (!this.dryRun) { | |
| await insertSyncError(tx, null, null, 'SCHEMA_MISMATCH', parsed.details) | |
| const rawRow = raw as Record<string, unknown> | |
| const rawProjectId = | |
| typeof rawRow.PROJECT_ID === 'string' ? rawRow.PROJECT_ID : null | |
| const rawProjectSlug = typeof rawRow.SLUG === 'string' ? rawRow.SLUG : null | |
| const parsed = parsePccRow(raw) | |
| if (parsed.ok === false) { | |
| errorCount++ | |
| log.warn({ jobId: job.id, details: parsed.details }, 'Row schema mismatch — skipping') | |
| if (!this.dryRun) { | |
| await insertSyncError( | |
| tx, | |
| rawProjectId, | |
| rawProjectSlug, | |
| 'SCHEMA_MISMATCH', | |
| parsed.details, | |
| ) |
| await this.metadataStore.markCompleted(job.id, { | ||
| transformedCount: upsertedCount, | ||
| skippedCount: skippedCount + mismatchCount + errorCount, | ||
| processingDurationMs: Date.now() - startTime, | ||
| }) | ||
| } catch (err) { | ||
| const errorMessage = err instanceof Error ? err.message : String(err) | ||
| log.error({ jobId: job.id, err }, 'PCC job failed') | ||
|
|
||
| try { | ||
| await this.metadataStore.markFailed(job.id, errorMessage, { | ||
| processingDurationMs: Date.now() - startTime, | ||
| }) | ||
| } catch (updateErr) { | ||
| log.error({ jobId: job.id, updateErr }, 'Failed to mark job as failed') |
There was a problem hiding this comment.
dryRun skips segment/insights/error writes, but the job is still marked completed in integration.snowflakeExportJobs. If the intent of dry-run is “no side effects”, this will still mutate state and prevent re-processing. Consider skipping markCompleted/markFailed (or writing to a separate metrics/log-only path) when dryRun is enabled.
| await this.metadataStore.markCompleted(job.id, { | |
| transformedCount: upsertedCount, | |
| skippedCount: skippedCount + mismatchCount + errorCount, | |
| processingDurationMs: Date.now() - startTime, | |
| }) | |
| } catch (err) { | |
| const errorMessage = err instanceof Error ? err.message : String(err) | |
| log.error({ jobId: job.id, err }, 'PCC job failed') | |
| try { | |
| await this.metadataStore.markFailed(job.id, errorMessage, { | |
| processingDurationMs: Date.now() - startTime, | |
| }) | |
| } catch (updateErr) { | |
| log.error({ jobId: job.id, updateErr }, 'Failed to mark job as failed') | |
| if (!this.dryRun) { | |
| await this.metadataStore.markCompleted(job.id, { | |
| transformedCount: upsertedCount, | |
| skippedCount: skippedCount + mismatchCount + errorCount, | |
| processingDurationMs: Date.now() - startTime, | |
| }) | |
| } else { | |
| log.info( | |
| { jobId: job.id, ...metrics, dryRun: this.dryRun }, | |
| 'Dry-run enabled: skipping markCompleted for PCC job', | |
| ) | |
| } | |
| } catch (err) { | |
| const errorMessage = err instanceof Error ? err.message : String(err) | |
| log.error({ jobId: job.id, err }, 'PCC job failed') | |
| if (!this.dryRun) { | |
| try { | |
| await this.metadataStore.markFailed(job.id, errorMessage, { | |
| processingDurationMs: Date.now() - startTime, | |
| }) | |
| } catch (updateErr) { | |
| log.error({ jobId: job.id, updateErr }, 'Failed to mark job as failed') | |
| } | |
| } else { | |
| log.info( | |
| { jobId: job.id, errorMessage, dryRun: this.dryRun }, | |
| 'Dry-run enabled: skipping markFailed for PCC job', | |
| ) |
| const job = await this.metadataStore.claimOldestPendingJob(undefined, this.enabledPlatforms) | ||
| log.info('Claiming job from metadata store', { job }) |
There was a problem hiding this comment.
If getEnabledPlatforms() returns an empty array (e.g., CROWD_SNOWFLAKE_ENABLED_PLATFORMS unset), passing it into claimOldestPendingJob will currently claim jobs for all platforms (because the store applies no filter when the list is empty). Consider guarding here (e.g., refuse to start the consumer / return early) so a misconfiguration can’t trigger cross-platform processing.
| const jobs = await metadataStore.getCleanableJobS3Paths( | ||
| intervalHours, | ||
| undefined, | ||
| true, | ||
| getEnabledPlatforms(), | ||
| ) |
There was a problem hiding this comment.
If getEnabledPlatforms() is empty due to misconfiguration, getCleanableJobS3Paths(..., platforms=[]) will currently apply no filter and return cleanable jobs for all platforms. Consider guarding against an empty enabled-platforms list before calling cleanup so it can’t delete unrelated exports.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit d71ee96. Configure here.
| } else if (platforms && platforms.length > 0) { | ||
| platformFilter = 'AND platform = ANY($(platforms)::text[])' | ||
| params = { platforms } | ||
| } |
There was a problem hiding this comment.
Empty platforms array disables platform filtering entirely
Medium Severity
When platforms is an empty array [], the platforms.length > 0 check is false, so no platformFilter is applied. This causes the query to match all platforms instead of no platforms. The snowflake connector's TransformerConsumer and cleanup activity both pass getEnabledPlatforms() as the platforms argument — and that function returns [] when CROWD_SNOWFLAKE_ENABLED_PLATFORMS is unset. In that scenario, the snowflake connector would claim PCC jobs, fail them permanently (since 'pcc' is not a supported platform in its registry), and prevent the PCC consumer from ever processing them.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit d71ee96. Configure here.


Note
Medium Risk
Adds new scheduled export/cleanup + a DB-updating consumer that mutates
segments/insightsProjectsfrom Snowflake data, plus new schema for error tracking; incorrect mappings or matching could impact production project metadata. Also changes shared Snowflake job-claim/cleanup filtering which could affect other connector workers’ processing/cleanup behavior.Overview
Introduces a new
pcc_sync_workerservice (with Docker + compose config) that schedules daily Temporal workflows to export PCC project hierarchy from Snowflake to S3 and to clean up old S3 exports.Adds a PCC job consumer that polls
integration.snowflakeExportJobsforplatform='pcc', streams Parquet rows from S3, parses/maps hierarchy, and updates existing CDPsegments(including newmaturity) andinsightsProjects, while recording non-auto-resolvable issues into a newpcc_projects_sync_errorstable with a deduping index.Refactors the shared
@crowd/snowflakelibrary to ownMetadataStore/S3Service/SnowflakeExporter(moving deps there), and extendsMetadataStoreAPIs to support platform filtering and configurable cleanup criteria;snowflake_connectorsis updated to use these shared components and to only claim/clean up jobs for enabled platforms.Reviewed by Cursor Bugbot for commit d71ee96. Bugbot is set up for automated code reviews on this repo. Configure here.