From 1af237507f6fa8a79abbf63c58d3a2c92117579b Mon Sep 17 00:00:00 2001 From: abhinavDhulipala Date: Sat, 4 Apr 2026 13:19:57 -0700 Subject: [PATCH 1/5] auto-pagination with Redis cache for paginated tool outputs --- .../app/api/workflows/[id]/execute/route.ts | 5 + apps/sim/blocks/blocks/zendesk.ts | 3 +- apps/sim/executor/execution/block-executor.ts | 2 + apps/sim/lib/paginated-cache/adapter.ts | 20 ++ apps/sim/lib/paginated-cache/index.ts | 14 + apps/sim/lib/paginated-cache/paginate.test.ts | 306 ++++++++++++++++++ apps/sim/lib/paginated-cache/paginate.ts | 192 +++++++++++ .../lib/paginated-cache/redis-cache.test.ts | 192 +++++++++++ apps/sim/lib/paginated-cache/redis-cache.ts | 148 +++++++++ apps/sim/lib/paginated-cache/types.test.ts | 59 ++++ apps/sim/lib/paginated-cache/types.ts | 56 ++++ .../executor/queued-workflow-execution.ts | 8 + apps/sim/lib/workflows/streaming/streaming.ts | 6 +- apps/sim/tools/index.ts | 31 ++ apps/sim/tools/types.ts | 7 + apps/sim/tools/zendesk/get_tickets.ts | 41 +-- packages/testing/src/mocks/redis.mock.ts | 2 + 17 files changed, 1071 insertions(+), 21 deletions(-) create mode 100644 apps/sim/lib/paginated-cache/adapter.ts create mode 100644 apps/sim/lib/paginated-cache/index.ts create mode 100644 apps/sim/lib/paginated-cache/paginate.test.ts create mode 100644 apps/sim/lib/paginated-cache/paginate.ts create mode 100644 apps/sim/lib/paginated-cache/redis-cache.test.ts create mode 100644 apps/sim/lib/paginated-cache/redis-cache.ts create mode 100644 apps/sim/lib/paginated-cache/types.test.ts create mode 100644 apps/sim/lib/paginated-cache/types.ts diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 37e87cb4de2..e906ecd2ac6 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -35,6 +35,7 @@ import { } from '@/lib/execution/manual-cancellation' import { preprocessExecution } from 
'@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { cleanupPaginatedCache } from '@/lib/paginated-cache/paginate' import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, @@ -988,6 +989,9 @@ async function handleExecutePost( void cleanupExecutionBase64Cache(executionId).catch((error) => { reqLogger.error('Failed to cleanup base64 cache', { error }) }) + void cleanupPaginatedCache(executionId).catch((error) => { + reqLogger.error('Failed to cleanup paginated cache', { error }) + }) } } } @@ -1483,6 +1487,7 @@ async function handleExecutePost( timeoutController.cleanup() if (executionId) { await cleanupExecutionBase64Cache(executionId) + await cleanupPaginatedCache(executionId) } if (!isStreamClosed) { try { diff --git a/apps/sim/blocks/blocks/zendesk.ts b/apps/sim/blocks/blocks/zendesk.ts index 1a994a6d4fa..4e37ed44d6d 100644 --- a/apps/sim/blocks/blocks/zendesk.ts +++ b/apps/sim/blocks/blocks/zendesk.ts @@ -496,7 +496,6 @@ Return ONLY the search query - no explanations.`, condition: { field: 'operation', value: [ - 'get_tickets', 'get_users', 'get_organizations', 'search_users', @@ -514,7 +513,7 @@ Return ONLY the search query - no explanations.`, description: 'Cursor value from a previous response to fetch the next page of results', condition: { field: 'operation', - value: ['get_tickets', 'get_users', 'get_organizations', 'search'], + value: ['get_users', 'get_organizations', 'search'], }, mode: 'advanced', }, diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 5044eab5639..2d56fbf707e 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -1,5 +1,6 @@ import { createLogger, type Logger } from '@sim/logger' import { redactApiKeys } from '@/lib/core/security/redaction' +import { hydrateCacheReferences } from '@/lib/paginated-cache/paginate' import { getBaseUrl } from 
'@/lib/core/utils/urls' import { containsUserFileWithMetadata, @@ -109,6 +110,7 @@ export class BlockExecutor { } resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block) + resolvedInputs = await hydrateCacheReferences(resolvedInputs) if (blockLog) { blockLog.input = this.sanitizeInputsForLog(resolvedInputs) diff --git a/apps/sim/lib/paginated-cache/adapter.ts b/apps/sim/lib/paginated-cache/adapter.ts new file mode 100644 index 00000000000..ca35f427178 --- /dev/null +++ b/apps/sim/lib/paginated-cache/adapter.ts @@ -0,0 +1,20 @@ +import type { CachedPage, CacheMetadata } from '@/lib/paginated-cache/types' + +/** + * Storage-agnostic interface for paginated cache operations. + * TTL and other storage-specific concerns belong in implementations. + */ +export interface PaginatedCacheStorageAdapter { + /** Store a single page of items */ + storePage(cacheId: string, pageIndex: number, items: unknown[]): Promise + /** Store cache metadata */ + storeMetadata(cacheId: string, metadata: CacheMetadata): Promise + /** Retrieve a single page. Returns null if not found or expired. */ + getPage(cacheId: string, pageIndex: number): Promise + /** Retrieve cache metadata. Returns null if not found or expired. */ + getMetadata(cacheId: string): Promise + /** Retrieve all pages in order. Throws if any page is missing. 
*/ + getAllPages(cacheId: string, totalPages: number): Promise + /** Delete all data for a cache entry */ + delete(cacheId: string): Promise +} diff --git a/apps/sim/lib/paginated-cache/index.ts b/apps/sim/lib/paginated-cache/index.ts new file mode 100644 index 00000000000..4749a0d8584 --- /dev/null +++ b/apps/sim/lib/paginated-cache/index.ts @@ -0,0 +1,14 @@ +export type { PaginatedCacheStorageAdapter } from '@/lib/paginated-cache/adapter' +export { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache' +export { + autoPaginate, + cleanupPaginatedCache, + hydrateCacheReferences, +} from '@/lib/paginated-cache/paginate' +export { + isPaginatedCacheReference, + type CachedPage, + type CacheMetadata, + type PaginatedCacheReference, + type ToolPaginationConfig, +} from '@/lib/paginated-cache/types' diff --git a/apps/sim/lib/paginated-cache/paginate.test.ts b/apps/sim/lib/paginated-cache/paginate.test.ts new file mode 100644 index 00000000000..ed9cf8f0649 --- /dev/null +++ b/apps/sim/lib/paginated-cache/paginate.test.ts @@ -0,0 +1,306 @@ +/** + * @vitest-environment node + */ +import { loggerMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockGetRedisClient, mockStorePage, mockStoreMetadata, mockGetAllPages } = vi.hoisted( + () => ({ + mockGetRedisClient: vi.fn(), + mockStorePage: vi.fn(), + mockStoreMetadata: vi.fn(), + mockGetAllPages: vi.fn(), + }) +) + +vi.mock('@sim/logger', () => loggerMock) +vi.mock('@/lib/core/config/redis', () => ({ + getRedisClient: mockGetRedisClient, +})) +vi.mock('@/lib/paginated-cache/redis-cache', () => ({ + RedisPaginatedCache: vi.fn().mockImplementation(() => ({ + storePage: mockStorePage, + storeMetadata: mockStoreMetadata, + getAllPages: mockGetAllPages, + })), +})) + +import { autoPaginate, hydrateCacheReferences } from '@/lib/paginated-cache/paginate' +import type { ToolResponse } from '@/tools/types' + +function makePageResponse( + items: unknown[], + hasMore: boolean, + 
cursor: string | null +): ToolResponse { + return { + success: true, + output: { + tickets: items, + paging: { has_more: hasMore, after_cursor: cursor }, + metadata: { total_returned: items.length, has_more: hasMore }, + }, + } +} + +const paginationConfig = { + pageField: 'tickets', + getItems: (output: Record) => (output.tickets as unknown[]) ?? [], + getNextPageToken: (output: Record) => { + const paging = output.paging as Record | undefined + return paging?.has_more && paging?.after_cursor ? (paging.after_cursor as string) : null + }, + buildNextPageParams: (params: Record, token: string | number) => ({ + ...params, + pageAfter: String(token), + }), +} + +describe('autoPaginate', () => { + let mockExecuteTool: ReturnType + + beforeEach(() => { + vi.clearAllMocks() + mockGetRedisClient.mockReturnValue({}) + mockStorePage.mockResolvedValue(undefined) + mockStoreMetadata.mockResolvedValue(undefined) + mockExecuteTool = vi.fn() + }) + + it('throws when Redis is unavailable', async () => { + mockGetRedisClient.mockReturnValue(null) + + await expect( + autoPaginate({ + initialResult: makePageResponse([{ id: 1 }], false, null), + params: {}, + paginationConfig, + executeTool: mockExecuteTool, + toolId: 'zendesk_get_tickets', + executionId: 'exec-1', + }) + ).rejects.toThrow('Redis is required for auto-pagination but is not available') + }) + + it('handles a single page with no more pages', async () => { + const initialResult = makePageResponse([{ id: 1 }, { id: 2 }], false, null) + + const result = await autoPaginate({ + initialResult, + params: {}, + paginationConfig, + executeTool: mockExecuteTool, + toolId: 'zendesk_get_tickets', + executionId: 'exec-1', + }) + + expect(mockStorePage).toHaveBeenCalledOnce() + expect(mockStorePage).toHaveBeenCalledWith(expect.any(String), 0, [{ id: 1 }, { id: 2 }]) + expect(mockStoreMetadata).toHaveBeenCalledOnce() + expect(mockStoreMetadata).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ totalPages: 1, 
totalItems: 2 }) + ) + expect(mockExecuteTool).not.toHaveBeenCalled() + expect(result.output.tickets).toEqual( + expect.objectContaining({ _type: 'paginated_cache_ref', totalPages: 1, totalItems: 2 }) + ) + }) + + it('fetches all pages and preserves last page metadata', async () => { + const initialResult = makePageResponse([{ id: 1 }], true, 'cursor-1') + mockExecuteTool + .mockResolvedValueOnce(makePageResponse([{ id: 2 }], true, 'cursor-2')) + .mockResolvedValueOnce(makePageResponse([{ id: 3 }], false, null)) + + const result = await autoPaginate({ + initialResult, + params: { query: 'test' }, + paginationConfig, + executeTool: mockExecuteTool, + toolId: 'zendesk_get_tickets', + executionId: 'exec-1', + }) + + expect(mockStorePage).toHaveBeenCalledTimes(3) + expect(mockStoreMetadata).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ totalPages: 3, totalItems: 3 }) + ) + expect(result.output.tickets).toEqual( + expect.objectContaining({ _type: 'paginated_cache_ref', totalPages: 3, totalItems: 3 }) + ) + expect(result.output.paging).toEqual({ has_more: false, after_cursor: null }) + expect(result.output.metadata).toEqual({ total_returned: 1, has_more: false }) + }) + + it('respects maxPages', async () => { + const configWithMax = { ...paginationConfig, maxPages: 2 } + const initialResult = makePageResponse([{ id: 1 }], true, 'cursor-1') + mockExecuteTool.mockResolvedValue(makePageResponse([{ id: 2 }], true, 'cursor-next')) + + const result = await autoPaginate({ + initialResult, + params: {}, + paginationConfig: configWithMax, + executeTool: mockExecuteTool, + toolId: 'zendesk_get_tickets', + executionId: 'exec-1', + }) + + expect(mockStorePage).toHaveBeenCalledTimes(2) + expect(mockExecuteTool).toHaveBeenCalledTimes(1) + expect(result.output.tickets).toEqual(expect.objectContaining({ totalPages: 2 })) + }) + + it('throws on page fetch failure', async () => { + const initialResult = makePageResponse([{ id: 1 }], true, 'cursor-1') + 
mockExecuteTool.mockResolvedValueOnce({ + success: false, + output: {}, + error: 'rate limited', + }) + + await expect( + autoPaginate({ + initialResult, + params: {}, + paginationConfig, + executeTool: mockExecuteTool, + toolId: 'zendesk_get_tickets', + executionId: 'exec-1', + }) + ).rejects.toThrow('Auto-pagination failed on page 1: rate limited') + }) + + it('passes skipPostProcess=true for subsequent pages', async () => { + const initialResult = makePageResponse([{ id: 1 }], true, 'cursor-1') + mockExecuteTool.mockResolvedValueOnce(makePageResponse([{ id: 2 }], false, null)) + + await autoPaginate({ + initialResult, + params: {}, + paginationConfig, + executeTool: mockExecuteTool, + toolId: 'zendesk_get_tickets', + executionId: 'exec-1', + }) + + expect(mockExecuteTool).toHaveBeenCalledWith( + 'zendesk_get_tickets', + expect.objectContaining({ pageAfter: 'cursor-1' }), + true + ) + }) + + it('generates cacheId with expected format', async () => { + const initialResult = makePageResponse([{ id: 1 }], false, null) + + await autoPaginate({ + initialResult, + params: {}, + paginationConfig, + executeTool: mockExecuteTool, + toolId: 'zendesk_get_tickets', + executionId: 'exec-42', + }) + + const storedCacheId = mockStoreMetadata.mock.calls[0][0] as string + expect(storedCacheId).toMatch(/^exec-42:zendesk_get_tickets:tickets:\d+$/) + }) +}) + +describe('hydrateCacheReferences', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetRedisClient.mockReturnValue({}) + mockGetAllPages.mockResolvedValue([]) + }) + + it('returns the same object by reference when no refs present', async () => { + const input = { name: 'test', count: 42, nested: { value: true } } + + const result = await hydrateCacheReferences(input) + + expect(result).toBe(input) + expect(mockGetAllPages).not.toHaveBeenCalled() + }) + + it('hydrates a single reference', async () => { + mockGetAllPages.mockResolvedValue([ + { pageIndex: 0, itemCount: 2, items: [{ id: 1 }, { id: 2 }], storedAt: 100 }, + 
{ pageIndex: 1, itemCount: 1, items: [{ id: 3 }], storedAt: 200 }, + ]) + + const input = { + tickets: { + _type: 'paginated_cache_ref' as const, + cacheId: 'cache-123', + totalPages: 2, + totalItems: 3, + pageField: 'tickets', + }, + } + + const result = await hydrateCacheReferences(input) + + expect(result.tickets).toEqual([{ id: 1 }, { id: 2 }, { id: 3 }]) + expect(mockGetAllPages).toHaveBeenCalledWith('cache-123', 2) + }) + + it('hydrates a nested reference', async () => { + mockGetAllPages.mockResolvedValue([ + { pageIndex: 0, itemCount: 1, items: ['item-a'], storedAt: 100 }, + ]) + + const input = { + data: { + inner: { + _type: 'paginated_cache_ref' as const, + cacheId: 'cache-nested', + totalPages: 1, + totalItems: 1, + pageField: 'items', + }, + }, + } + + const result = await hydrateCacheReferences(input) + + expect((result.data as Record).inner).toEqual(['item-a']) + }) + + it('throws when Redis is unavailable during hydration', async () => { + mockGetRedisClient.mockReturnValue(null) + + const input = { + tickets: { + _type: 'paginated_cache_ref' as const, + cacheId: 'cache-no-redis', + totalPages: 1, + totalItems: 1, + pageField: 'tickets', + }, + } + + await expect(hydrateCacheReferences(input)).rejects.toThrow( + 'Redis is required to hydrate paginated cache reference' + ) + }) + + it('throws when getAllPages fails', async () => { + mockGetAllPages.mockRejectedValue(new Error('Connection lost')) + + const input = { + tickets: { + _type: 'paginated_cache_ref' as const, + cacheId: 'cache-fail', + totalPages: 2, + totalItems: 10, + pageField: 'tickets', + }, + } + + await expect(hydrateCacheReferences(input)).rejects.toThrow('Connection lost') + }) +}) diff --git a/apps/sim/lib/paginated-cache/paginate.ts b/apps/sim/lib/paginated-cache/paginate.ts new file mode 100644 index 00000000000..c55a9b75e6e --- /dev/null +++ b/apps/sim/lib/paginated-cache/paginate.ts @@ -0,0 +1,192 @@ +import { createLogger } from '@sim/logger' +import { getRedisClient } from 
'@/lib/core/config/redis' +import { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache' +import { isPaginatedCacheReference } from '@/lib/paginated-cache/types' +import type { PaginatedCacheReference, ToolPaginationConfig } from '@/lib/paginated-cache/types' +import type { ToolResponse } from '@/tools/types' + +const logger = createLogger('Paginator') + +const DEFAULT_MAX_PAGES = 100 + +interface AutoPaginateOptions { + initialResult: ToolResponse + params: Record + paginationConfig: ToolPaginationConfig + executeTool: ( + toolId: string, + params: Record, + skipPostProcess?: boolean + ) => Promise + toolId: string + executionId: string +} + +export async function autoPaginate(options: AutoPaginateOptions): Promise { + const { initialResult, params, paginationConfig: config, executeTool, toolId, executionId } = + options + const maxPages = config.maxPages ?? DEFAULT_MAX_PAGES + + const redis = getRedisClient() + if (!redis) { + throw new Error('Redis is required for auto-pagination but is not available') + } + + const cache = new RedisPaginatedCache(redis) + const cacheId = `${executionId}:${toolId}:${config.pageField}:${Date.now()}` + + let totalItems = 0 + let pageIndex = 0 + let lastOutput = initialResult.output + + const initialItems = config.getItems(initialResult.output) + await cache.storePage(cacheId, pageIndex, initialItems) + totalItems += initialItems.length + pageIndex++ + + let nextToken = config.getNextPageToken(initialResult.output) + while (nextToken !== null && pageIndex < maxPages) { + const nextParams = config.buildNextPageParams(params, nextToken) + const pageResult = await executeTool(toolId, nextParams, true) + + if (!pageResult.success) { + throw new Error( + `Auto-pagination failed on page ${pageIndex}: ${pageResult.error ?? 
'Unknown error'}` + ) + } + + const pageItems = config.getItems(pageResult.output) + await cache.storePage(cacheId, pageIndex, pageItems) + totalItems += pageItems.length + lastOutput = pageResult.output + pageIndex++ + + nextToken = config.getNextPageToken(pageResult.output) + } + + const totalPages = pageIndex + const metadata = { cacheId, totalPages, totalItems, pageField: config.pageField } + await cache.storeMetadata(cacheId, metadata) + + const reference: PaginatedCacheReference = { + _type: 'paginated_cache_ref', + cacheId, + totalPages, + totalItems, + pageField: config.pageField, + } + + logger.info('Auto-pagination complete', { cacheId, totalPages, totalItems, toolId }) + + return { + ...initialResult, + output: { + ...lastOutput, + [config.pageField]: reference, + }, + } +} + +/** + * Deep-walk inputs and replace any PaginatedCacheReference with hydrated data. + * No-op if no references found. Throws on any failure. + */ +export async function hydrateCacheReferences( + inputs: Record +): Promise> { + if (!containsCacheReference(inputs)) { + return inputs + } + return (await deepHydrate(inputs)) as Record +} + +function containsCacheReference(value: unknown): boolean { + if (isPaginatedCacheReference(value)) return true + if (Array.isArray(value)) return value.some(containsCacheReference) + if (typeof value === 'object' && value !== null) { + return Object.values(value as Record).some(containsCacheReference) + } + return false +} + +async function deepHydrate(value: unknown): Promise { + if (isPaginatedCacheReference(value)) { + return hydrateReference(value) + } + + if (Array.isArray(value)) { + return Promise.all(value.map(deepHydrate)) + } + + if (typeof value === 'object' && value !== null) { + const entries = Object.entries(value as Record) + const hydrated: Record = {} + for (const [key, val] of entries) { + hydrated[key] = await deepHydrate(val) + } + return hydrated + } + + return value +} + +async function hydrateReference(ref: 
PaginatedCacheReference): Promise { + const redis = getRedisClient() + if (!redis) { + throw new Error( + `Redis is required to hydrate paginated cache reference (cacheId: ${ref.cacheId}) but is not available` + ) + } + + const cache = new RedisPaginatedCache(redis) + const pages = await cache.getAllPages(ref.cacheId, ref.totalPages) + + const items: unknown[] = [] + for (const page of pages) { + items.push(...page.items) + } + + logger.info('Hydrated cache reference', { + cacheId: ref.cacheId, + totalPages: ref.totalPages, + totalItems: items.length, + }) + + return items +} + +/** + * Cleans up paginated cache entries for a specific execution. + * Should be called at the end of workflow execution. + */ +export async function cleanupPaginatedCache(executionId: string): Promise { + const redis = getRedisClient() + if (!redis) { + return + } + + const pattern = `pagcache:*${executionId}:*` + + try { + let cursor = '0' + let deletedCount = 0 + + do { + const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100) + cursor = nextCursor + + if (keys.length > 0) { + await redis.del(...keys) + deletedCount += keys.length + } + } while (cursor !== '0') + + if (deletedCount > 0) { + logger.info(`Cleaned up ${deletedCount} paginated cache entries for execution ${executionId}`) + } + } catch (error) { + logger.warn(`Failed to cleanup paginated cache for execution ${executionId}`, { + error: error instanceof Error ? 
error.message : String(error), + }) + } +} diff --git a/apps/sim/lib/paginated-cache/redis-cache.test.ts b/apps/sim/lib/paginated-cache/redis-cache.test.ts new file mode 100644 index 00000000000..d5b423d4cec --- /dev/null +++ b/apps/sim/lib/paginated-cache/redis-cache.test.ts @@ -0,0 +1,192 @@ +/** + * @vitest-environment node + */ +import { createMockRedis, loggerMock, type MockRedis } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@sim/logger', () => loggerMock) +vi.mock('@/lib/core/config/env', () => ({ + env: { EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: '5400' }, +})) + +import { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache' +import type { CacheMetadata } from '@/lib/paginated-cache/types' + +/** 2× the mocked EXECUTION_TIMEOUT_ASYNC_ENTERPRISE (5400s) */ +const EXPECTED_DEFAULT_TTL_MS = 5400 * 1000 * 2 + +describe('RedisPaginatedCache', () => { + let mockRedis: MockRedis + let cache: RedisPaginatedCache + + beforeEach(() => { + vi.clearAllMocks() + mockRedis = createMockRedis() + cache = new RedisPaginatedCache(mockRedis as never) + }) + + describe('storePage', () => { + it('stores JSON with correct key pattern and PX TTL', async () => { + const items = [{ id: 1 }, { id: 2 }] + + await cache.storePage('cache-123', 0, items) + + expect(mockRedis.set).toHaveBeenCalledOnce() + const [key, value, pxFlag, ttl] = mockRedis.set.mock.calls[0] + expect(key).toBe('pagcache:page:cache-123:0') + expect(pxFlag).toBe('PX') + expect(ttl).toBe(EXPECTED_DEFAULT_TTL_MS) + + const parsed = JSON.parse(value) + expect(parsed.pageIndex).toBe(0) + expect(parsed.itemCount).toBe(2) + expect(parsed.items).toEqual(items) + expect(typeof parsed.storedAt).toBe('number') + }) + + it('propagates Redis errors', async () => { + mockRedis.set.mockRejectedValue(new Error('ECONNREFUSED')) + + await expect(cache.storePage('cache-123', 0, [])).rejects.toThrow('ECONNREFUSED') + }) + }) + + describe('storeMetadata', () => { + it('stores JSON 
with correct key pattern and PX TTL', async () => { + const metadata: CacheMetadata = { + cacheId: 'cache-123', + totalPages: 3, + totalItems: 150, + pageField: 'tickets', + } + + await cache.storeMetadata('cache-123', metadata) + + expect(mockRedis.set).toHaveBeenCalledOnce() + const [key, value, pxFlag, ttl] = mockRedis.set.mock.calls[0] + expect(key).toBe('pagcache:meta:cache-123') + expect(pxFlag).toBe('PX') + expect(ttl).toBe(EXPECTED_DEFAULT_TTL_MS) + + const parsed = JSON.parse(value) + expect(parsed).toEqual(metadata) + }) + }) + + describe('getPage', () => { + it('returns parsed CachedPage when data exists', async () => { + const page = { pageIndex: 0, itemCount: 2, items: [{ id: 1 }, { id: 2 }], storedAt: 100 } + mockRedis.get.mockResolvedValue(JSON.stringify(page)) + + const result = await cache.getPage('cache-123', 0) + + expect(mockRedis.get).toHaveBeenCalledWith('pagcache:page:cache-123:0') + expect(result).toEqual(page) + }) + + it('returns null when key is missing', async () => { + const result = await cache.getPage('cache-123', 0) + + expect(result).toBeNull() + }) + + it('deletes key and returns null when JSON is corrupted', async () => { + mockRedis.get.mockResolvedValue('not-valid-json{{{') + + const result = await cache.getPage('cache-123', 0) + + expect(result).toBeNull() + expect(mockRedis.del).toHaveBeenCalledWith('pagcache:page:cache-123:0') + }) + }) + + describe('getMetadata', () => { + it('returns parsed CacheMetadata when data exists', async () => { + const metadata: CacheMetadata = { + cacheId: 'cache-123', + totalPages: 3, + totalItems: 150, + pageField: 'tickets', + } + mockRedis.get.mockResolvedValue(JSON.stringify(metadata)) + + const result = await cache.getMetadata('cache-123') + + expect(mockRedis.get).toHaveBeenCalledWith('pagcache:meta:cache-123') + expect(result).toEqual(metadata) + }) + + it('returns null when metadata is missing', async () => { + const result = await cache.getMetadata('cache-123') + + 
expect(result).toBeNull() + }) + }) + + describe('getAllPages', () => { + it('calls mget with correct keys and returns parsed pages in order', async () => { + const page0 = { pageIndex: 0, itemCount: 1, items: ['a'], storedAt: 100 } + const page1 = { pageIndex: 1, itemCount: 1, items: ['b'], storedAt: 200 } + const page2 = { pageIndex: 2, itemCount: 1, items: ['c'], storedAt: 300 } + mockRedis.mget.mockResolvedValue([ + JSON.stringify(page0), + JSON.stringify(page1), + JSON.stringify(page2), + ]) + + const result = await cache.getAllPages('cache-123', 3) + + expect(mockRedis.mget).toHaveBeenCalledWith( + 'pagcache:page:cache-123:0', + 'pagcache:page:cache-123:1', + 'pagcache:page:cache-123:2' + ) + expect(result).toEqual([page0, page1, page2]) + }) + + it('throws when a page is missing', async () => { + mockRedis.mget.mockResolvedValue([JSON.stringify({ pageIndex: 0 }), null]) + + await expect(cache.getAllPages('cache-123', 2)).rejects.toThrow( + 'Missing page 1 for cache entry cache-123' + ) + }) + + it('throws when a page is corrupted', async () => { + mockRedis.mget.mockResolvedValue([JSON.stringify({ pageIndex: 0 }), 'corrupt{{{']) + + await expect(cache.getAllPages('cache-123', 2)).rejects.toThrow( + 'Corrupted page 1 for cache entry cache-123' + ) + }) + }) + + describe('delete', () => { + it('deletes meta key and scans+deletes page keys', async () => { + mockRedis.scan + .mockResolvedValueOnce(['42', ['pagcache:page:cache-123:0', 'pagcache:page:cache-123:1']]) + .mockResolvedValueOnce(['0', ['pagcache:page:cache-123:2']]) + + await cache.delete('cache-123') + + expect(mockRedis.del).toHaveBeenCalledWith('pagcache:meta:cache-123') + expect(mockRedis.del).toHaveBeenCalledWith( + 'pagcache:page:cache-123:0', + 'pagcache:page:cache-123:1' + ) + expect(mockRedis.del).toHaveBeenCalledWith('pagcache:page:cache-123:2') + }) + }) + + describe('custom TTL', () => { + it('uses the TTL passed in the constructor', async () => { + const customTtl = 5 * 60 * 1000 + const 
customCache = new RedisPaginatedCache(mockRedis as never, customTtl) + + await customCache.storePage('cache-456', 0, [{ id: 1 }]) + + const [, , , ttl] = mockRedis.set.mock.calls[0] + expect(ttl).toBe(customTtl) + }) + }) +}) diff --git a/apps/sim/lib/paginated-cache/redis-cache.ts b/apps/sim/lib/paginated-cache/redis-cache.ts new file mode 100644 index 00000000000..2cd9d10c8a2 --- /dev/null +++ b/apps/sim/lib/paginated-cache/redis-cache.ts @@ -0,0 +1,148 @@ +import { createLogger } from '@sim/logger' +import type Redis from 'ioredis' +import { env } from '@/lib/core/config/env' +import type { PaginatedCacheStorageAdapter } from '@/lib/paginated-cache/adapter' +import type { CachedPage, CacheMetadata } from '@/lib/paginated-cache/types' + +const logger = createLogger('RedisPaginatedCache') + +const REDIS_KEY_PREFIX = 'pagcache:' + +/** Safety-net TTL: 2× the max async execution timeout. Explicit cleanup is the primary mechanism. */ +const DEFAULT_TTL_MS = Number(env.EXECUTION_TIMEOUT_ASYNC_ENTERPRISE) * 1000 * 2 + +export class RedisPaginatedCache implements PaginatedCacheStorageAdapter { + constructor( + private redis: Redis, + private ttlMs: number = DEFAULT_TTL_MS + ) {} + + private getMetaKey(cacheId: string): string { + return `${REDIS_KEY_PREFIX}meta:${cacheId}` + } + + private getPageKey(cacheId: string, pageIndex: number): string { + return `${REDIS_KEY_PREFIX}page:${cacheId}:${pageIndex}` + } + + async storePage(cacheId: string, pageIndex: number, items: unknown[]): Promise { + try { + const page: CachedPage = { + pageIndex, + itemCount: items.length, + items, + storedAt: Date.now(), + } + const key = this.getPageKey(cacheId, pageIndex) + await this.redis.set(key, JSON.stringify(page), 'PX', this.ttlMs) + } catch (error) { + logger.error('Failed to store page', { cacheId, pageIndex, error }) + throw error + } + } + + async storeMetadata(cacheId: string, metadata: CacheMetadata): Promise { + try { + const key = this.getMetaKey(cacheId) + await 
this.redis.set(key, JSON.stringify(metadata), 'PX', this.ttlMs) + } catch (error) { + logger.error('Failed to store metadata', { cacheId, error }) + throw error + } + } + + async getPage(cacheId: string, pageIndex: number): Promise { + try { + const key = this.getPageKey(cacheId, pageIndex) + const data = await this.redis.get(key) + + if (!data) { + return null + } + + try { + return JSON.parse(data) as CachedPage + } catch { + logger.warn('Corrupted page entry, deleting:', key) + await this.redis.del(key) + return null + } + } catch (error) { + logger.error('Failed to get page', { cacheId, pageIndex, error }) + throw error + } + } + + async getMetadata(cacheId: string): Promise { + try { + const key = this.getMetaKey(cacheId) + const data = await this.redis.get(key) + + if (!data) { + return null + } + + try { + return JSON.parse(data) as CacheMetadata + } catch { + logger.warn('Corrupted metadata entry, deleting:', key) + await this.redis.del(key) + return null + } + } catch (error) { + logger.error('Failed to get metadata', { cacheId, error }) + throw error + } + } + + async getAllPages(cacheId: string, totalPages: number): Promise { + try { + const keys = Array.from({ length: totalPages }, (_, i) => this.getPageKey(cacheId, i)) + const results = await this.redis.mget(...keys) + + const pages: CachedPage[] = [] + for (let i = 0; i < results.length; i++) { + const raw = results[i] + if (!raw) { + throw new Error(`Missing page ${i} for cache entry ${cacheId}`) + } + + try { + pages.push(JSON.parse(raw) as CachedPage) + } catch { + throw new Error(`Corrupted page ${i} for cache entry ${cacheId}`) + } + } + + return pages + } catch (error) { + logger.error('Failed to get all pages', { cacheId, totalPages, error }) + throw error + } + } + + async delete(cacheId: string): Promise { + try { + await this.redis.del(this.getMetaKey(cacheId)) + + let cursor = '0' + let deletedCount = 0 + const pattern = `${REDIS_KEY_PREFIX}page:${cacheId}:*` + + do { + const [nextCursor, 
keys] = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100) + cursor = nextCursor + + if (keys.length > 0) { + await this.redis.del(...keys) + deletedCount += keys.length + } + } while (cursor !== '0') + + logger.info(`Deleted cache entry ${cacheId} (${deletedCount} page keys)`) + } catch (error) { + logger.error('Failed to delete cache entry', { cacheId, error }) + throw error + } + } +} diff --git a/apps/sim/lib/paginated-cache/types.test.ts b/apps/sim/lib/paginated-cache/types.test.ts new file mode 100644 index 00000000000..f4e6ebdc5fa --- /dev/null +++ b/apps/sim/lib/paginated-cache/types.test.ts @@ -0,0 +1,59 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' + +import { isPaginatedCacheReference } from '@/lib/paginated-cache/types' + +describe('isPaginatedCacheReference', () => { + it('returns true for a valid reference object', () => { + const ref = { + _type: 'paginated_cache_ref', + cacheId: 'cache-123', + totalPages: 5, + totalItems: 250, + pageField: 'tickets', + } + + expect(isPaginatedCacheReference(ref)).toBe(true) + }) + + it('returns false for null', () => { + expect(isPaginatedCacheReference(null)).toBe(false) + }) + + it('returns false for an object missing _type', () => { + const obj = { + cacheId: 'cache-123', + totalPages: 5, + totalItems: 250, + pageField: 'tickets', + } + + expect(isPaginatedCacheReference(obj)).toBe(false) + }) + + it('returns false for an object with wrong _type value', () => { + const obj = { + _type: 'something_else', + cacheId: 'cache-123', + totalPages: 5, + totalItems: 250, + pageField: 'tickets', + } + + expect(isPaginatedCacheReference(obj)).toBe(false) + }) + + it('returns false when a required field has the wrong type', () => { + const obj = { + _type: 'paginated_cache_ref', + cacheId: 'cache-123', + totalPages: '5', + totalItems: 250, + pageField: 'tickets', + } + + expect(isPaginatedCacheReference(obj)).toBe(false) + }) +}) diff --git 
a/apps/sim/lib/paginated-cache/types.ts b/apps/sim/lib/paginated-cache/types.ts new file mode 100644 index 00000000000..ca71c73038c --- /dev/null +++ b/apps/sim/lib/paginated-cache/types.ts @@ -0,0 +1,56 @@ +/** + * Pure pagination semantics — storage-agnostic. + */ +export interface ToolPaginationConfig> { + /** The field in the tool output containing the page of data (e.g. 'tickets') */ + pageField: string + /** Extract the items array from a single page response */ + getItems: (output: O) => unknown[] + /** Extract the next page token, or null if no more pages */ + getNextPageToken: (output: O) => string | number | null + /** Build params for fetching the next page */ + buildNextPageParams: ( + currentParams: Record, + token: string | number + ) => Record + /** Maximum pages to fetch. Default: 100 */ + maxPages?: number +} + +/** Lightweight reference stored in blockStates instead of full data */ +export interface PaginatedCacheReference { + _type: 'paginated_cache_ref' + cacheId: string + totalPages: number + totalItems: number + pageField: string +} + +/** A single cached page */ +export interface CachedPage { + pageIndex: number + itemCount: number + items: unknown[] + storedAt: number +} + +/** Summary metadata for a paginated cache entry */ +export interface CacheMetadata { + cacheId: string + totalPages: number + totalItems: number + pageField: string +} + +/** Type guard for PaginatedCacheReference */ +export function isPaginatedCacheReference(value: unknown): value is PaginatedCacheReference { + if (typeof value !== 'object' || value === null) return false + const obj = value as Record + return ( + obj._type === 'paginated_cache_ref' && + typeof obj.cacheId === 'string' && + typeof obj.totalPages === 'number' && + typeof obj.totalItems === 'number' && + typeof obj.pageField === 'string' + ) +} diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index 
c60ba860a11..3ef6059d950 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -3,6 +3,7 @@ import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' +import { cleanupPaginatedCache } from '@/lib/paginated-cache/paginate' import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, @@ -335,5 +336,12 @@ export async function executeQueuedWorkflowJob( error: error instanceof Error ? error.message : String(error), }) }) + + await cleanupPaginatedCache(executionId).catch((error) => { + logger.error('Failed to cleanup queued workflow paginated cache', { + executionId, + error: error instanceof Error ? error.message : String(error), + }) + }) } } diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index 081c497b8ee..7efd4be1ac4 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -8,6 +8,7 @@ import { import { encodeSSE } from '@/lib/core/utils/sse' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { processStreamingBlockLogs } from '@/lib/tokenization' +import { cleanupPaginatedCache } from '@/lib/paginated-cache/paginate' import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, @@ -348,6 +349,7 @@ export async function createStreamingResponse( if (executionId) { await cleanupExecutionBase64Cache(executionId) + await cleanupPaginatedCache(executionId) } controller.close() @@ -359,6 +361,7 @@ export async function createStreamingResponse( if (executionId) { await cleanupExecutionBase64Cache(executionId) + await cleanupPaginatedCache(executionId) } 
controller.close() @@ -373,8 +376,9 @@ export async function createStreamingResponse( if (executionId) { try { await cleanupExecutionBase64Cache(executionId) + await cleanupPaginatedCache(executionId) } catch (error) { - logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error }) + logger.error(`[${requestId}] Failed to cleanup cache`, { error }) } } }, diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 737dfa47bbd..7a66d350a89 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -26,6 +26,7 @@ import type { ToolResponse, ToolRetryConfig, } from '@/tools/types' +import { autoPaginate } from '@/lib/paginated-cache/paginate' import { formatRequestParams, getTool, validateRequiredParametersAfterMerge } from '@/tools/utils' import * as toolsUtilsServer from '@/tools/utils.server' @@ -819,6 +820,21 @@ export async function executeTool( // Process file outputs if execution context is available finalResult = await processFileOutputs(finalResult, tool, executionContext) + // Auto-paginate if tool has pagination config and there are more pages + if (tool.pagination && finalResult.success && !skipPostProcess && executionContext?.executionId) { + const nextToken = tool.pagination.getNextPageToken(finalResult.output) + if (nextToken !== null) { + finalResult = await autoPaginate({ + initialResult: finalResult, + params: contextParams, + paginationConfig: tool.pagination, + executeTool, + toolId: normalizedToolId, + executionId: executionContext.executionId, + }) + } + } + // Add timing data to the result const endTime = new Date() const endTimeISO = endTime.toISOString() @@ -874,6 +890,21 @@ export async function executeTool( // Process file outputs if execution context is available finalResult = await processFileOutputs(finalResult, tool, executionContext) + // Auto-paginate if tool has pagination config and there are more pages + if (tool.pagination && finalResult.success && !skipPostProcess && executionContext?.executionId) { 
+ const nextToken = tool.pagination.getNextPageToken(finalResult.output) + if (nextToken !== null) { + finalResult = await autoPaginate({ + initialResult: finalResult, + params: contextParams, + paginationConfig: tool.pagination, + executeTool, + toolId: normalizedToolId, + executionId: executionContext.executionId, + }) + } + } + // Add timing data to the result const endTime = new Date() const endTimeISO = endTime.toISOString() diff --git a/apps/sim/tools/types.ts b/apps/sim/tools/types.ts index 48c44535f71..0d0a992ac1b 100644 --- a/apps/sim/tools/types.ts +++ b/apps/sim/tools/types.ts @@ -1,5 +1,6 @@ import type { HostedKeyRateLimitConfig } from '@/lib/core/rate-limiter' import type { OAuthService } from '@/lib/oauth' +import type { ToolPaginationConfig } from '@/lib/paginated-cache/types' export type BYOKProviderId = | 'openai' @@ -176,6 +177,12 @@ export interface ToolConfig

{ * Usage is billed according to the pricing config. */ hosting?: ToolHostingConfig

+ + /** + * Optional pagination configuration for tools that return paginated data. + * When provided, the executor automatically fetches all pages and caches them in Redis. + */ + pagination?: ToolPaginationConfig } export interface TableRow { diff --git a/apps/sim/tools/zendesk/get_tickets.ts b/apps/sim/tools/zendesk/get_tickets.ts index ffda867b0d5..4b5953da1cb 100644 --- a/apps/sim/tools/zendesk/get_tickets.ts +++ b/apps/sim/tools/zendesk/get_tickets.ts @@ -1,6 +1,5 @@ import type { ToolConfig } from '@/tools/types' import { - appendCursorPaginationParams, buildZendeskUrl, extractCursorPagingInfo, handleZendeskError, @@ -19,7 +18,7 @@ export interface ZendeskGetTicketsParams { assigneeId?: string organizationId?: string sort?: string - perPage?: string + /** Internal: set by auto-pagination via buildNextPageParams */ pageAfter?: string } @@ -44,7 +43,7 @@ export const zendeskGetTicketsTool: ToolConfig) => (output.tickets as unknown[]) ?? [], + getNextPageToken: (output: Record) => { + const paging = output.paging as Record | undefined + if (paging?.has_more && paging?.after_cursor) { + return paging.after_cursor as string + } + return null + }, + buildNextPageParams: (params: Record, token: string | number) => ({ + ...params, + pageAfter: String(token), + }), + maxPages: 100, + }, } diff --git a/packages/testing/src/mocks/redis.mock.ts b/packages/testing/src/mocks/redis.mock.ts index d32f0286bd0..983ad853427 100644 --- a/packages/testing/src/mocks/redis.mock.ts +++ b/packages/testing/src/mocks/redis.mock.ts @@ -30,6 +30,8 @@ export function createMockRedis() { exists: vi.fn().mockResolvedValue(0), expire: vi.fn().mockResolvedValue(1), ttl: vi.fn().mockResolvedValue(-1), + mget: vi.fn().mockResolvedValue([]), + scan: vi.fn().mockResolvedValue(['0', []]), // List operations lpush: vi.fn().mockResolvedValue(1), From a2972b36c73f719220b922143bbb878da88ada7e Mon Sep 17 00:00:00 2001 From: abhinavDhulipala Date: Sat, 4 Apr 2026 15:42:37 -0700 Subject: [PATCH 2/5] 
last_output count fixed, other nits addressed --- apps/sim/executor/execution/block-executor.ts | 2 +- apps/sim/lib/paginated-cache/paginate.test.ts | 74 +++++++++++++++-- apps/sim/lib/paginated-cache/paginate.ts | 77 +++++++++++------- apps/sim/lib/paginated-cache/types.ts | 2 +- apps/sim/tools/index.ts | 80 ++++++++++++------- apps/sim/tools/types.ts | 2 +- apps/sim/tools/zendesk/get_tickets.ts | 12 ++- 7 files changed, 173 insertions(+), 76 deletions(-) diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 2d56fbf707e..796c30da99e 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -1,7 +1,7 @@ import { createLogger, type Logger } from '@sim/logger' import { redactApiKeys } from '@/lib/core/security/redaction' -import { hydrateCacheReferences } from '@/lib/paginated-cache/paginate' import { getBaseUrl } from '@/lib/core/utils/urls' +import { hydrateCacheReferences } from '@/lib/paginated-cache/paginate' import { containsUserFileWithMetadata, hydrateUserFilesWithBase64, diff --git a/apps/sim/lib/paginated-cache/paginate.test.ts b/apps/sim/lib/paginated-cache/paginate.test.ts index ed9cf8f0649..9d8e1b24cfc 100644 --- a/apps/sim/lib/paginated-cache/paginate.test.ts +++ b/apps/sim/lib/paginated-cache/paginate.test.ts @@ -28,11 +28,7 @@ vi.mock('@/lib/paginated-cache/redis-cache', () => ({ import { autoPaginate, hydrateCacheReferences } from '@/lib/paginated-cache/paginate' import type { ToolResponse } from '@/tools/types' -function makePageResponse( - items: unknown[], - hasMore: boolean, - cursor: string | null -): ToolResponse { +function makePageResponse(items: unknown[], hasMore: boolean, cursor: string | null): ToolResponse { return { success: true, output: { @@ -206,7 +202,73 @@ describe('autoPaginate', () => { }) const storedCacheId = mockStoreMetadata.mock.calls[0][0] as string - 
expect(storedCacheId).toMatch(/^exec-42:zendesk_get_tickets:tickets:\d+$/) + expect(storedCacheId).toMatch( + /^exec-42:zendesk_get_tickets:tickets:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/ + ) + }) + + it('does not inject fields that the tool output does not have', async () => { + const noMetadataConfig = { + ...paginationConfig, + pageField: 'items', + getItems: (output: Record) => (output.items as unknown[]) ?? [], + } + const initialResult: ToolResponse = { + success: true, + output: { + items: [{ id: 1 }], + cursor: 'abc', + }, + } + + const result = await autoPaginate({ + initialResult, + params: {}, + paginationConfig: noMetadataConfig, + executeTool: mockExecuteTool, + toolId: 'custom_tool', + executionId: 'exec-1', + }) + + const outputKeys = Object.keys(result.output) + expect(outputKeys).toContain('items') + expect(outputKeys).toContain('cursor') + expect(outputKeys).not.toContain('metadata') + expect(outputKeys).not.toContain('paging') + }) +}) + +describe('cleanupPaginatedCache', () => { + let mockScan: ReturnType + let mockDel: ReturnType + + beforeEach(() => { + vi.clearAllMocks() + mockScan = vi.fn().mockResolvedValue(['0', []]) + mockDel = vi.fn().mockResolvedValue(1) + mockGetRedisClient.mockReturnValue({ scan: mockScan, del: mockDel }) + }) + + it('scans with prefix-based patterns and deletes matching keys', async () => { + mockScan + .mockResolvedValueOnce(['0', ['pagcache:page:exec-1:tool:field:uuid:0']]) + .mockResolvedValueOnce(['0', ['pagcache:meta:exec-1:tool:field:uuid']]) + + const { cleanupPaginatedCache } = await import('@/lib/paginated-cache/paginate') + await cleanupPaginatedCache('exec-1') + + expect(mockScan).toHaveBeenCalledWith('0', 'MATCH', 'pagcache:page:exec-1:*', 'COUNT', 100) + expect(mockScan).toHaveBeenCalledWith('0', 'MATCH', 'pagcache:meta:exec-1:*', 'COUNT', 100) + expect(mockDel).toHaveBeenCalledTimes(2) + }) + + it('no-ops when Redis is unavailable', async () => { + 
mockGetRedisClient.mockReturnValue(null) + + const { cleanupPaginatedCache } = await import('@/lib/paginated-cache/paginate') + await cleanupPaginatedCache('exec-1') + + expect(mockScan).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/lib/paginated-cache/paginate.ts b/apps/sim/lib/paginated-cache/paginate.ts index c55a9b75e6e..72862a87b41 100644 --- a/apps/sim/lib/paginated-cache/paginate.ts +++ b/apps/sim/lib/paginated-cache/paginate.ts @@ -1,13 +1,15 @@ +import crypto from 'node:crypto' import { createLogger } from '@sim/logger' import { getRedisClient } from '@/lib/core/config/redis' +import type { PaginatedCacheStorageAdapter } from '@/lib/paginated-cache/adapter' import { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache' -import { isPaginatedCacheReference } from '@/lib/paginated-cache/types' import type { PaginatedCacheReference, ToolPaginationConfig } from '@/lib/paginated-cache/types' +import { isPaginatedCacheReference } from '@/lib/paginated-cache/types' import type { ToolResponse } from '@/tools/types' const logger = createLogger('Paginator') -const DEFAULT_MAX_PAGES = 100 +const DEFAULT_MAX_PAGES = 10_000 interface AutoPaginateOptions { initialResult: ToolResponse @@ -23,8 +25,14 @@ interface AutoPaginateOptions { } export async function autoPaginate(options: AutoPaginateOptions): Promise { - const { initialResult, params, paginationConfig: config, executeTool, toolId, executionId } = - options + const { + initialResult, + params, + paginationConfig: config, + executeTool, + toolId, + executionId, + } = options const maxPages = config.maxPages ?? 
DEFAULT_MAX_PAGES const redis = getRedisClient() @@ -33,7 +41,7 @@ export async function autoPaginate(options: AutoPaginateOptions): Promise + + const redis = getRedisClient() + if (!redis) { + throw new Error('Redis is required to hydrate paginated cache references but is not available') + } + + const adapter = new RedisPaginatedCache(redis) + return (await deepHydrate(inputs, adapter)) as Record } function containsCacheReference(value: unknown): boolean { @@ -109,20 +124,23 @@ function containsCacheReference(value: unknown): boolean { return false } -async function deepHydrate(value: unknown): Promise { +async function deepHydrate( + value: unknown, + adapter: PaginatedCacheStorageAdapter +): Promise { if (isPaginatedCacheReference(value)) { - return hydrateReference(value) + return hydrateReference(value, adapter) } if (Array.isArray(value)) { - return Promise.all(value.map(deepHydrate)) + return Promise.all(value.map((v) => deepHydrate(v, adapter))) } if (typeof value === 'object' && value !== null) { const entries = Object.entries(value as Record) const hydrated: Record = {} for (const [key, val] of entries) { - hydrated[key] = await deepHydrate(val) + hydrated[key] = await deepHydrate(val, adapter) } return hydrated } @@ -130,16 +148,11 @@ async function deepHydrate(value: unknown): Promise { return value } -async function hydrateReference(ref: PaginatedCacheReference): Promise { - const redis = getRedisClient() - if (!redis) { - throw new Error( - `Redis is required to hydrate paginated cache reference (cacheId: ${ref.cacheId}) but is not available` - ) - } - - const cache = new RedisPaginatedCache(redis) - const pages = await cache.getAllPages(ref.cacheId, ref.totalPages) +async function hydrateReference( + ref: PaginatedCacheReference, + adapter: PaginatedCacheStorageAdapter +): Promise { + const pages = await adapter.getAllPages(ref.cacheId, ref.totalPages) const items: unknown[] = [] for (const page of pages) { @@ -165,21 +178,23 @@ export async function 
cleanupPaginatedCache(executionId: string): Promise return } - const pattern = `pagcache:*${executionId}:*` + const patterns = [`pagcache:page:${executionId}:*`, `pagcache:meta:${executionId}:*`] try { - let cursor = '0' let deletedCount = 0 - do { - const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100) - cursor = nextCursor - - if (keys.length > 0) { - await redis.del(...keys) - deletedCount += keys.length - } - } while (cursor !== '0') + for (const pattern of patterns) { + let cursor = '0' + do { + const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100) + cursor = nextCursor + + if (keys.length > 0) { + await redis.del(...keys) + deletedCount += keys.length + } + } while (cursor !== '0') + } if (deletedCount > 0) { logger.info(`Cleaned up ${deletedCount} paginated cache entries for execution ${executionId}`) diff --git a/apps/sim/lib/paginated-cache/types.ts b/apps/sim/lib/paginated-cache/types.ts index ca71c73038c..42abee46285 100644 --- a/apps/sim/lib/paginated-cache/types.ts +++ b/apps/sim/lib/paginated-cache/types.ts @@ -13,7 +13,7 @@ export interface ToolPaginationConfig> { currentParams: Record, token: string | number ) => Record - /** Maximum pages to fetch. Default: 100 */ + /** Maximum pages to fetch. 
Default: 10,000 */ maxPages?: number } diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 7a66d350a89..104b90d1692 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -13,6 +13,7 @@ import { generateRequestId } from '@/lib/core/utils/request' import { getBaseUrl, getInternalApiBaseUrl } from '@/lib/core/utils/urls' import { SIM_VIA_HEADER, serializeCallChain } from '@/lib/execution/call-chain' import { parseMcpToolId } from '@/lib/mcp/utils' +import { autoPaginate } from '@/lib/paginated-cache/paginate' import { isCustomTool, isMcpTool } from '@/executor/constants' import { resolveSkillContent } from '@/executor/handlers/agent/skills-resolver' import type { ExecutionContext } from '@/executor/types' @@ -26,7 +27,6 @@ import type { ToolResponse, ToolRetryConfig, } from '@/tools/types' -import { autoPaginate } from '@/lib/paginated-cache/paginate' import { formatRequestParams, getTool, validateRequiredParametersAfterMerge } from '@/tools/utils' import * as toolsUtilsServer from '@/tools/utils.server' @@ -600,6 +600,40 @@ async function processFileOutputs( } } +/** + * If the tool has a pagination config and there are more pages, auto-paginate + * and replace the page field with a Redis cache reference. 
+ */ +async function maybeAutoPaginate( + tool: ToolConfig, + finalResult: ToolResponse, + contextParams: Record, + normalizedToolId: string, + skipPostProcess: boolean, + executionContext?: ExecutionContext +): Promise { + if ( + !tool.pagination || + !finalResult.success || + skipPostProcess || + !executionContext?.executionId + ) { + return finalResult + } + const nextToken = tool.pagination.getNextPageToken(finalResult.output) + if (nextToken === null) { + return finalResult + } + return autoPaginate({ + initialResult: finalResult, + params: contextParams, + paginationConfig: tool.pagination, + executeTool, + toolId: normalizedToolId, + executionId: executionContext.executionId, + }) +} + /** * Execute a tool by making the appropriate HTTP request * All requests go directly - internal routes use regular fetch, external use SSRF-protected fetch @@ -820,20 +854,14 @@ export async function executeTool( // Process file outputs if execution context is available finalResult = await processFileOutputs(finalResult, tool, executionContext) - // Auto-paginate if tool has pagination config and there are more pages - if (tool.pagination && finalResult.success && !skipPostProcess && executionContext?.executionId) { - const nextToken = tool.pagination.getNextPageToken(finalResult.output) - if (nextToken !== null) { - finalResult = await autoPaginate({ - initialResult: finalResult, - params: contextParams, - paginationConfig: tool.pagination, - executeTool, - toolId: normalizedToolId, - executionId: executionContext.executionId, - }) - } - } + finalResult = await maybeAutoPaginate( + tool, + finalResult, + contextParams, + normalizedToolId, + skipPostProcess, + executionContext + ) // Add timing data to the result const endTime = new Date() @@ -890,20 +918,14 @@ export async function executeTool( // Process file outputs if execution context is available finalResult = await processFileOutputs(finalResult, tool, executionContext) - // Auto-paginate if tool has pagination config 
and there are more pages - if (tool.pagination && finalResult.success && !skipPostProcess && executionContext?.executionId) { - const nextToken = tool.pagination.getNextPageToken(finalResult.output) - if (nextToken !== null) { - finalResult = await autoPaginate({ - initialResult: finalResult, - params: contextParams, - paginationConfig: tool.pagination, - executeTool, - toolId: normalizedToolId, - executionId: executionContext.executionId, - }) - } - } + finalResult = await maybeAutoPaginate( + tool, + finalResult, + contextParams, + normalizedToolId, + skipPostProcess, + executionContext + ) // Add timing data to the result const endTime = new Date() diff --git a/apps/sim/tools/types.ts b/apps/sim/tools/types.ts index 0d0a992ac1b..33700fdf10b 100644 --- a/apps/sim/tools/types.ts +++ b/apps/sim/tools/types.ts @@ -182,7 +182,7 @@ export interface ToolConfig

{ * Optional pagination configuration for tools that return paginated data. * When provided, the executor automatically fetches all pages and caches them in Redis. */ - pagination?: ToolPaginationConfig + pagination?: ToolPaginationConfig> } export interface TableRow { diff --git a/apps/sim/tools/zendesk/get_tickets.ts b/apps/sim/tools/zendesk/get_tickets.ts index 4b5953da1cb..f6d9f2aa417 100644 --- a/apps/sim/tools/zendesk/get_tickets.ts +++ b/apps/sim/tools/zendesk/get_tickets.ts @@ -185,18 +185,16 @@ export const zendeskGetTicketsTool: ToolConfig) => (output.tickets as unknown[]) ?? [], - getNextPageToken: (output: Record) => { - const paging = output.paging as Record | undefined - if (paging?.has_more && paging?.after_cursor) { - return paging.after_cursor as string + getItems: (output) => output.tickets ?? [], + getNextPageToken: (output) => { + if (output.paging?.has_more && output.paging?.after_cursor) { + return output.paging.after_cursor } return null }, - buildNextPageParams: (params: Record, token: string | number) => ({ + buildNextPageParams: (params, token) => ({ ...params, pageAfter: String(token), }), - maxPages: 100, }, } From 87a1c62e177a26af0c2748a9f95949ca9d1b0077 Mon Sep 17 00:00:00 2001 From: abhinavDhulipala Date: Sat, 4 Apr 2026 17:05:54 -0700 Subject: [PATCH 3/5] pass execution context through & other minor bug fixes --- apps/sim/lib/paginated-cache/paginate.test.ts | 6 ++++- apps/sim/lib/paginated-cache/paginate.ts | 9 +++++-- .../executor/queued-workflow-execution.ts | 17 ++++--------- apps/sim/lib/workflows/streaming/streaming.ts | 24 ++++++++++--------- apps/sim/tools/index.ts | 10 ++++---- 5 files changed, 35 insertions(+), 31 deletions(-) diff --git a/apps/sim/lib/paginated-cache/paginate.test.ts b/apps/sim/lib/paginated-cache/paginate.test.ts index 9d8e1b24cfc..0586ad75dd1 100644 --- a/apps/sim/lib/paginated-cache/paginate.test.ts +++ b/apps/sim/lib/paginated-cache/paginate.test.ts @@ -169,9 +169,10 @@ describe('autoPaginate', () => { 
).rejects.toThrow('Auto-pagination failed on page 1: rate limited') }) - it('passes skipPostProcess=true for subsequent pages', async () => { + it('passes executionContext and skipAutoPaginate for subsequent pages', async () => { const initialResult = makePageResponse([{ id: 1 }], true, 'cursor-1') mockExecuteTool.mockResolvedValueOnce(makePageResponse([{ id: 2 }], false, null)) + const mockContext = { workflowId: 'wf-1', executionId: 'exec-1' } await autoPaginate({ initialResult, @@ -180,11 +181,14 @@ describe('autoPaginate', () => { executeTool: mockExecuteTool, toolId: 'zendesk_get_tickets', executionId: 'exec-1', + executionContext: mockContext as never, }) expect(mockExecuteTool).toHaveBeenCalledWith( 'zendesk_get_tickets', expect.objectContaining({ pageAfter: 'cursor-1' }), + false, + mockContext, true ) }) diff --git a/apps/sim/lib/paginated-cache/paginate.ts b/apps/sim/lib/paginated-cache/paginate.ts index 72862a87b41..adf8bbe231a 100644 --- a/apps/sim/lib/paginated-cache/paginate.ts +++ b/apps/sim/lib/paginated-cache/paginate.ts @@ -5,6 +5,7 @@ import type { PaginatedCacheStorageAdapter } from '@/lib/paginated-cache/adapter import { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache' import type { PaginatedCacheReference, ToolPaginationConfig } from '@/lib/paginated-cache/types' import { isPaginatedCacheReference } from '@/lib/paginated-cache/types' +import type { ExecutionContext } from '@/executor/types' import type { ToolResponse } from '@/tools/types' const logger = createLogger('Paginator') @@ -18,10 +19,13 @@ interface AutoPaginateOptions { executeTool: ( toolId: string, params: Record, - skipPostProcess?: boolean + skipPostProcess?: boolean, + executionContext?: ExecutionContext, + skipAutoPaginate?: boolean ) => Promise toolId: string executionId: string + executionContext?: ExecutionContext } export async function autoPaginate(options: AutoPaginateOptions): Promise { @@ -32,6 +36,7 @@ export async function autoPaginate(options: 
AutoPaginateOptions): Promise { - logger.error('Failed to cleanup queued workflow base64 cache', { - executionId, - error: error instanceof Error ? error.message : String(error), - }) - }) - - await cleanupPaginatedCache(executionId).catch((error) => { - logger.error('Failed to cleanup queued workflow paginated cache', { - executionId, - error: error instanceof Error ? error.message : String(error), - }) - }) + await Promise.allSettled([ + cleanupExecutionBase64Cache(executionId), + cleanupPaginatedCache(executionId), + ]) } } diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index 7efd4be1ac4..d319588004e 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -7,8 +7,8 @@ import { } from '@/lib/core/utils/response-format' import { encodeSSE } from '@/lib/core/utils/sse' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' -import { processStreamingBlockLogs } from '@/lib/tokenization' import { cleanupPaginatedCache } from '@/lib/paginated-cache/paginate' +import { processStreamingBlockLogs } from '@/lib/tokenization' import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, @@ -348,8 +348,10 @@ export async function createStreamingResponse( controller.enqueue(encodeSSE('[DONE]')) if (executionId) { - await cleanupExecutionBase64Cache(executionId) - await cleanupPaginatedCache(executionId) + await Promise.allSettled([ + cleanupExecutionBase64Cache(executionId), + cleanupPaginatedCache(executionId), + ]) } controller.close() @@ -360,8 +362,10 @@ export async function createStreamingResponse( ) if (executionId) { - await cleanupExecutionBase64Cache(executionId) - await cleanupPaginatedCache(executionId) + await Promise.allSettled([ + cleanupExecutionBase64Cache(executionId), + cleanupPaginatedCache(executionId), + ]) } controller.close() @@ -374,12 +378,10 @@ export async function createStreamingResponse( 
timeoutController.abort() timeoutController.cleanup() if (executionId) { - try { - await cleanupExecutionBase64Cache(executionId) - await cleanupPaginatedCache(executionId) - } catch (error) { - logger.error(`[${requestId}] Failed to cleanup cache`, { error }) - } + await Promise.allSettled([ + cleanupExecutionBase64Cache(executionId), + cleanupPaginatedCache(executionId), + ]) } }, }) diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 104b90d1692..dc957f33c3f 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -609,13 +609,13 @@ async function maybeAutoPaginate( finalResult: ToolResponse, contextParams: Record, normalizedToolId: string, - skipPostProcess: boolean, + skipAutoPaginate: boolean, executionContext?: ExecutionContext ): Promise { if ( !tool.pagination || !finalResult.success || - skipPostProcess || + skipAutoPaginate || !executionContext?.executionId ) { return finalResult @@ -631,6 +631,7 @@ async function maybeAutoPaginate( executeTool, toolId: normalizedToolId, executionId: executionContext.executionId, + executionContext, }) } @@ -642,7 +643,8 @@ export async function executeTool( toolId: string, params: Record, skipPostProcess = false, - executionContext?: ExecutionContext + executionContext?: ExecutionContext, + skipAutoPaginate = false ): Promise { // Capture start time for precise timing const startTime = new Date() @@ -859,7 +861,7 @@ export async function executeTool( finalResult, contextParams, normalizedToolId, - skipPostProcess, + skipAutoPaginate, executionContext ) From 9ae620991fc18cfa3615e944ebf2f6c6c257c04f Mon Sep 17 00:00:00 2001 From: abhinavDhulipala Date: Sat, 4 Apr 2026 18:15:30 -0700 Subject: [PATCH 4/5] zendesk v2 with paginated output --- apps/sim/blocks/blocks/zendesk.ts | 4 +- apps/sim/lib/paginated-cache/index.ts | 4 +- apps/sim/lib/paginated-cache/paginate.test.ts | 10 +- apps/sim/lib/paginated-cache/paginate.ts | 7 +- apps/sim/lib/paginated-cache/redis-cache.ts | 2 +- 
apps/sim/lib/paginated-cache/types.test.ts | 1 - apps/sim/tools/index.ts | 6 +- apps/sim/tools/registry.ts | 2 + apps/sim/tools/zendesk/get_tickets.ts | 39 ++-- apps/sim/tools/zendesk/get_tickets_v2.ts | 202 ++++++++++++++++++ apps/sim/tools/zendesk/index.ts | 1 + 11 files changed, 236 insertions(+), 42 deletions(-) create mode 100644 apps/sim/tools/zendesk/get_tickets_v2.ts diff --git a/apps/sim/blocks/blocks/zendesk.ts b/apps/sim/blocks/blocks/zendesk.ts index 4e37ed44d6d..d00ac2d4d2a 100644 --- a/apps/sim/blocks/blocks/zendesk.ts +++ b/apps/sim/blocks/blocks/zendesk.ts @@ -496,6 +496,7 @@ Return ONLY the search query - no explanations.`, condition: { field: 'operation', value: [ + 'get_tickets', 'get_users', 'get_organizations', 'search_users', @@ -533,6 +534,7 @@ Return ONLY the search query - no explanations.`, tools: { access: [ 'zendesk_get_tickets', + 'zendesk_get_tickets_v2', 'zendesk_get_ticket', 'zendesk_create_ticket', 'zendesk_create_tickets_bulk', @@ -563,7 +565,7 @@ Return ONLY the search query - no explanations.`, tool: (params) => { switch (params.operation) { case 'get_tickets': - return 'zendesk_get_tickets' + return 'zendesk_get_tickets_v2' case 'get_ticket': return 'zendesk_get_ticket' case 'create_ticket': diff --git a/apps/sim/lib/paginated-cache/index.ts b/apps/sim/lib/paginated-cache/index.ts index 4749a0d8584..64392fe35f9 100644 --- a/apps/sim/lib/paginated-cache/index.ts +++ b/apps/sim/lib/paginated-cache/index.ts @@ -1,14 +1,14 @@ export type { PaginatedCacheStorageAdapter } from '@/lib/paginated-cache/adapter' -export { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache' export { autoPaginate, cleanupPaginatedCache, hydrateCacheReferences, } from '@/lib/paginated-cache/paginate' +export { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache' export { - isPaginatedCacheReference, type CachedPage, type CacheMetadata, + isPaginatedCacheReference, type PaginatedCacheReference, type ToolPaginationConfig, } from 
'@/lib/paginated-cache/types' diff --git a/apps/sim/lib/paginated-cache/paginate.test.ts b/apps/sim/lib/paginated-cache/paginate.test.ts index 0586ad75dd1..ed2dad14664 100644 --- a/apps/sim/lib/paginated-cache/paginate.test.ts +++ b/apps/sim/lib/paginated-cache/paginate.test.ts @@ -126,8 +126,9 @@ describe('autoPaginate', () => { expect(result.output.tickets).toEqual( expect.objectContaining({ _type: 'paginated_cache_ref', totalPages: 3, totalItems: 3 }) ) - expect(result.output.paging).toEqual({ has_more: false, after_cursor: null }) - expect(result.output.metadata).toEqual({ total_returned: 1, has_more: false }) + expect(result.output.paging).toBeUndefined() + expect(result.output.metadata).toBeUndefined() + expect(Object.keys(result.output)).toEqual(['tickets']) }) it('respects maxPages', async () => { @@ -235,10 +236,7 @@ describe('autoPaginate', () => { }) const outputKeys = Object.keys(result.output) - expect(outputKeys).toContain('items') - expect(outputKeys).toContain('cursor') - expect(outputKeys).not.toContain('metadata') - expect(outputKeys).not.toContain('paging') + expect(outputKeys).toEqual(['items']) }) }) diff --git a/apps/sim/lib/paginated-cache/paginate.ts b/apps/sim/lib/paginated-cache/paginate.ts index adf8bbe231a..7659d6116b5 100644 --- a/apps/sim/lib/paginated-cache/paginate.ts +++ b/apps/sim/lib/paginated-cache/paginate.ts @@ -50,9 +50,8 @@ export async function autoPaginate(options: AutoPaginateOptions): Promise { diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index dc957f33c3f..423ac3910f6 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -620,10 +620,6 @@ async function maybeAutoPaginate( ) { return finalResult } - const nextToken = tool.pagination.getNextPageToken(finalResult.output) - if (nextToken === null) { - return finalResult - } return autoPaginate({ initialResult: finalResult, params: contextParams, @@ -925,7 +921,7 @@ export async function executeTool( finalResult, contextParams, 
normalizedToolId, - skipPostProcess, + skipAutoPaginate, executionContext ) diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index 7508d98dac5..6594cb19268 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -2669,6 +2669,7 @@ import { zendeskGetOrganizationsTool, zendeskGetOrganizationTool, zendeskGetTicketsTool, + zendeskGetTicketsV2Tool, zendeskGetTicketTool, zendeskGetUsersTool, zendeskGetUserTool, @@ -5016,6 +5017,7 @@ export const tools: Record = { mailchimp_create_batch_operation: mailchimpCreateBatchOperationTool, mailchimp_delete_batch_operation: mailchimpDeleteBatchOperationTool, zendesk_get_tickets: zendeskGetTicketsTool, + zendesk_get_tickets_v2: zendeskGetTicketsV2Tool, zendesk_get_ticket: zendeskGetTicketTool, zendesk_create_ticket: zendeskCreateTicketTool, zendesk_create_tickets_bulk: zendeskCreateTicketsBulkTool, diff --git a/apps/sim/tools/zendesk/get_tickets.ts b/apps/sim/tools/zendesk/get_tickets.ts index f6d9f2aa417..ffda867b0d5 100644 --- a/apps/sim/tools/zendesk/get_tickets.ts +++ b/apps/sim/tools/zendesk/get_tickets.ts @@ -1,5 +1,6 @@ import type { ToolConfig } from '@/tools/types' import { + appendCursorPaginationParams, buildZendeskUrl, extractCursorPagingInfo, handleZendeskError, @@ -18,7 +19,7 @@ export interface ZendeskGetTicketsParams { assigneeId?: string organizationId?: string sort?: string - /** Internal: set by auto-pagination via buildNextPageParams */ + perPage?: string pageAfter?: string } @@ -43,7 +44,7 @@ export const zendeskGetTicketsTool: ToolConfig output.tickets ?? 
[], - getNextPageToken: (output) => { - if (output.paging?.has_more && output.paging?.after_cursor) { - return output.paging.after_cursor - } - return null - }, - buildNextPageParams: (params, token) => ({ - ...params, - pageAfter: String(token), - }), - }, } diff --git a/apps/sim/tools/zendesk/get_tickets_v2.ts b/apps/sim/tools/zendesk/get_tickets_v2.ts new file mode 100644 index 00000000000..b0f2c781ed1 --- /dev/null +++ b/apps/sim/tools/zendesk/get_tickets_v2.ts @@ -0,0 +1,202 @@ +import type { ToolConfig } from '@/tools/types' +import { + buildZendeskUrl, + extractCursorPagingInfo, + handleZendeskError, + TICKETS_ARRAY_OUTPUT, +} from '@/tools/zendesk/types' + +export interface ZendeskGetTicketsV2Params { + email: string + apiToken: string + subdomain: string + status?: string + priority?: string + type?: string + assigneeId?: string + organizationId?: string + sort?: string + perPage?: string + /** Internal: set by auto-pagination via buildNextPageParams */ + pageAfter?: string +} + +export interface ZendeskGetTicketsV2Response { + success: boolean + output: { + tickets: any[] + paging?: { + after_cursor: string | null + has_more: boolean + } + metadata: { + total_returned: number + has_more: boolean + } + success: boolean + } +} + +export const zendeskGetTicketsV2Tool: ToolConfig< + ZendeskGetTicketsV2Params, + ZendeskGetTicketsV2Response +> = { + id: 'zendesk_get_tickets_v2', + name: 'Get All Tickets from Zendesk', + description: + 'Retrieve all tickets from Zendesk with optional filtering. 
Automatically paginates through all results.', + version: '2.0.0', + + params: { + email: { + type: 'string', + required: true, + visibility: 'user-only', + description: 'Your Zendesk email address', + }, + apiToken: { + type: 'string', + required: true, + visibility: 'hidden', + description: 'Zendesk API token', + }, + subdomain: { + type: 'string', + required: true, + visibility: 'user-only', + description: 'Your Zendesk subdomain (e.g., "mycompany" for mycompany.zendesk.com)', + }, + status: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter by status: "new", "open", "pending", "hold", "solved", or "closed"', + }, + priority: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter by priority: "low", "normal", "high", or "urgent"', + }, + type: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter by type: "problem", "incident", "question", or "task"', + }, + assigneeId: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter by assignee user ID as a numeric string (e.g., "12345")', + }, + organizationId: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter by organization ID as a numeric string (e.g., "67890")', + }, + sort: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: + 'Sort field for ticket listing (only applies without filters): "updated_at", "id", or "status". 
Prefix with "-" for descending (e.g., "-updated_at")', + }, + perPage: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Results per page as a number string (default: "100", max: "100")', + }, + }, + + request: { + url: (params) => { + const hasFilters = + params.status || + params.priority || + params.type || + params.assigneeId || + params.organizationId + + if (hasFilters) { + const searchTerms: string[] = ['type:ticket'] + if (params.status) searchTerms.push(`status:${params.status}`) + if (params.priority) searchTerms.push(`priority:${params.priority}`) + if (params.type) searchTerms.push(`ticket_type:${params.type}`) + if (params.assigneeId) searchTerms.push(`assignee_id:${params.assigneeId}`) + if (params.organizationId) searchTerms.push(`organization_id:${params.organizationId}`) + + const queryParams = new URLSearchParams() + queryParams.append('query', searchTerms.join(' ')) + queryParams.append('filter[type]', 'ticket') + queryParams.append('page[size]', params.perPage || '100') + if (params.pageAfter) queryParams.append('page[after]', params.pageAfter) + + return `${buildZendeskUrl(params.subdomain, '/search/export')}?${queryParams.toString()}` + } + + const queryParams = new URLSearchParams() + if (params.sort) queryParams.append('sort', params.sort) + queryParams.append('page[size]', params.perPage || '100') + if (params.pageAfter) queryParams.append('page[after]', params.pageAfter) + + const query = queryParams.toString() + const url = buildZendeskUrl(params.subdomain, '/tickets') + return query ? 
`${url}?${query}` : url + }, + method: 'GET', + headers: (params) => { + const credentials = `${params.email}/token:${params.apiToken}` + const base64Credentials = Buffer.from(credentials).toString('base64') + return { + Authorization: `Basic ${base64Credentials}`, + 'Content-Type': 'application/json', + } + }, + }, + + transformResponse: async (response: Response) => { + if (!response.ok) { + const data = await response.json() + handleZendeskError(data, response.status, 'get_tickets') + } + + const data = await response.json() + const tickets = data.tickets || data.results || [] + const paging = extractCursorPagingInfo(data) + + return { + success: true, + output: { + tickets, + paging, + metadata: { + total_returned: tickets.length, + has_more: paging.has_more, + }, + success: true, + }, + } + }, + + outputs: { + tickets: TICKETS_ARRAY_OUTPUT, + }, + + pagination: { + pageField: 'tickets', + getItems: (output) => output.tickets ?? [], + getNextPageToken: (output) => { + if (output.paging?.has_more && output.paging?.after_cursor) { + return output.paging.after_cursor + } + return null + }, + buildNextPageParams: (params, token) => ({ + ...params, + pageAfter: String(token), + }), + }, +} diff --git a/apps/sim/tools/zendesk/index.ts b/apps/sim/tools/zendesk/index.ts index 20e60c0e039..dc154cfaa92 100644 --- a/apps/sim/tools/zendesk/index.ts +++ b/apps/sim/tools/zendesk/index.ts @@ -13,6 +13,7 @@ export { zendeskGetOrganizationTool } from './get_organization' export { zendeskGetOrganizationsTool } from './get_organizations' export { zendeskGetTicketTool } from './get_ticket' export { zendeskGetTicketsTool } from './get_tickets' +export { zendeskGetTicketsV2Tool } from './get_tickets_v2' export { zendeskGetUserTool } from './get_user' export { zendeskGetUsersTool } from './get_users' export { zendeskMergeTicketsTool } from './merge_tickets' From 3fb6e344b720be7c5806d215d0b7ce3315a3488c Mon Sep 17 00:00:00 2001 From: abhinavDhulipala Date: Sat, 4 Apr 2026 19:17:13 
-0700 Subject: [PATCH 5/5] hydrate block state on server-side output --- apps/sim/executor/execution/block-executor.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 796c30da99e..94c8f168ba9 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -110,7 +110,6 @@ export class BlockExecutor { } resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block) - resolvedInputs = await hydrateCacheReferences(resolvedInputs) if (blockLog) { blockLog.input = this.sanitizeInputsForLog(resolvedInputs) @@ -169,6 +168,10 @@ export class BlockExecutor { })) as NormalizedBlockOutput } + normalizedOutput = (await hydrateCacheReferences( + normalizedOutput as Record + )) as NormalizedBlockOutput + const duration = performance.now() - startTime if (blockLog) {