
Fix IcebergIO conn pool crash by moving FileIO lifecycle to @Teardown (#38149)

Open

dejii wants to merge 4 commits into apache:master from dejii:fix/iceberg-fileio-lifecycle

Conversation

@dejii (Contributor) commented Apr 13, 2026

Problem

IcebergIO write pipelines fail with "connection pool shut down" errors after processing more than one bundle on the same worker thread. The root cause:

  • The catalog is @MonotonicNonNull on the DoFn — created once, reused across all bundles on that DoFn instance
  • RecordWriterManager.close() (called in @FinishBundle) was calling table.io().close() to release FileIO resources
  • RESTSessionCatalog shares a single FileIO across all tables. Closing it per bundle permanently kills the connection pool on the catalog, so all subsequent bundles fail

Fix

Remove FileIO close from RecordWriterManager — it must not manage the lifecycle of a resource it doesn't own.

Add @Teardown to all IcebergIO write DoFns — the catalog (and its underlying FileIO) is closed exactly once when the DoFn instance is destroyed by the runner, not after each bundle:

  • WriteUngroupedRowsToFilesDoFn
  • WriteGroupedRowsToFilesDoFn
  • WriteDirectRowsToFilesDoFn
  • AppendFilesToTablesDoFn
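The before/after lifecycle can be illustrated with a minimal plain-Java simulation (SharedFileIO and WriteDoFnSketch are hypothetical stand-ins, not Beam or Iceberg classes): closing the shared resource on the per-bundle path breaks the second bundle, while deferring the close to teardown lets every bundle succeed.

```java
import java.util.List;

/**
 * Minimal plain-Java simulation of the bundle/teardown lifecycle described
 * above. SharedFileIO and WriteDoFnSketch are hypothetical names, not Beam
 * or Iceberg classes.
 */
public class LifecycleSketch {

  /** Stands in for the catalog's shared FileIO / HTTP connection pool. */
  public static class SharedFileIO {
    private boolean closed = false;

    public void write(String record) {
      if (closed) {
        throw new IllegalStateException("connection pool shut down");
      }
    }

    public void close() { closed = true; }

    public boolean isClosed() { return closed; }
  }

  /** Stands in for a write DoFn that reuses one catalog-owned FileIO. */
  public static class WriteDoFnSketch {
    private final SharedFileIO io;        // created once, as in @Setup
    private final boolean closePerBundle; // old (buggy) vs. fixed behavior

    public WriteDoFnSketch(SharedFileIO io, boolean closePerBundle) {
      this.io = io;
      this.closePerBundle = closePerBundle;
    }

    public void processBundle(List<String> records) {
      for (String r : records) {
        io.write(r);
      }
      if (closePerBundle) {
        io.close(); // old @FinishBundle behavior: kills the shared pool
      }
    }

    public void teardown() {
      io.close(); // fixed behavior: close exactly once, in @Teardown
    }
  }

  public static void main(String[] args) {
    // Old behavior: the second bundle fails with "connection pool shut down".
    WriteDoFnSketch buggy = new WriteDoFnSketch(new SharedFileIO(), true);
    buggy.processBundle(List.of("a"));
    try {
      buggy.processBundle(List.of("b"));
    } catch (IllegalStateException e) {
      System.out.println("bundle 2 failed: " + e.getMessage());
    }

    // Fixed behavior: both bundles succeed; FileIO is closed once at teardown.
    SharedFileIO io = new SharedFileIO();
    WriteDoFnSketch fixed = new WriteDoFnSketch(io, false);
    fixed.processBundle(List.of("a"));
    fixed.processBundle(List.of("b"));
    fixed.teardown();
    System.out.println("fixed path closed at teardown: " + io.isClosed());
  }
}
```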

Catalog compatibility

All major Iceberg catalogs verified against source:

| Catalog | FileIO in close() |
| --- | --- |
| RESTSessionCatalog | ✅ via CloseableGroup + FileIOTracker (1, 2) |
| GlueCatalog | ✅ via CloseableGroup + FileIOTracker (1, 2) |
| JdbcCatalog | ✅ via CloseableGroup (1, 2) |
| NessieCatalog | ✅ via CloseableGroup (1, 2) |
| HadoopCatalog | N/A — defaults to HadoopFileIO, which has no close() (HadoopFileIO.java) |
| HiveCatalog | N/A — defaults to HadoopFileIO, which has no close() (HadoopFileIO.java) |

Tests

  • testRecordWriterManagerDoesNotCloseSharedFileIO — verifies FileIO survives RecordWriterManager.close()
  • testFileIOSurvivesAcrossBundles — two sequential bundles succeed with shared FileIO (core regression test)
  • testFullLifecycleBundlesThenCatalogClose — full DoFn lifecycle: multiple bundles + catalog.close() at teardown
  • testNonCloseableCatalogSkippedOnTeardown — verifies non-Closeable catalog (HadoopCatalog) is safely skipped

Also verified in production on Google Cloud Dataflow using a custom build of this fix. The pipeline writes to dynamic Iceberg destinations (multiple tables from a single pipeline). No connection pool errors observed with this fix.

follow up on: #37782



@gemini-code-assist (Contributor):

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue where IcebergIO write pipelines fail due to premature connection pool closure. By shifting responsibility for closing the FileIO from the per-bundle RecordWriterManager to the DoFn's @Teardown lifecycle, shared resources remain active across multiple bundles, preventing "connection pool shut down" errors while maintaining proper resource cleanup.

Highlights

  • Resource Lifecycle Management: Removed FileIO closure from RecordWriterManager.close() to prevent premature connection pool termination.
  • Teardown Implementation: Added @Teardown methods to IcebergIO write DoFns to ensure the catalog is closed exactly once when the DoFn instance is destroyed.
  • Regression Testing: Added comprehensive tests to verify FileIO persistence across bundles and proper cleanup during DoFn teardown.


@dejii (Contributor, Author) commented Apr 13, 2026

@ahmedabu98 following up on #37782 - that fix correctly moved FileIO close from RecordWriter to RecordWriterManager, but it turns out there's a deeper issue that only manifests under high write volume to dynamic destinations (many bundles per worker).

The root cause: the catalog is @MonotonicNonNull on the DoFn and reused across all bundles on the same instance. RecordWriterManager.close() is called per bundle (@FinishBundle), so closing FileIO there, even deduplicated, kills the catalog's shared connection pool for all subsequent bundles on that DoFn.

This PR removes FileIO close from RecordWriterManager entirely and adds @Teardown to all four IcebergIO write DoFns, so the catalog (and its FileIO) is closed exactly once when the DoFn instance is destroyed.

Would appreciate your review here.

@dejii (Contributor, Author) commented Apr 13, 2026

The alternative from Beam's side would create brittle coupling to Iceberg internals. @Teardown is a clean boundary: Beam manages the catalog lifecycle, the catalog manages everything it owns.

@github-actions (Contributor):

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`

@dejii (Contributor, Author) commented Apr 13, 2026

The check failures are unrelated to the code changes, e.g.:

:sdks:java:maven-archetypes:examples:generateSources: generate-sources.sh exited with code 127 (command not found)

@dejii (Contributor, Author) commented Apr 13, 2026

assign set of reviewers

@github-actions (Contributor):

Assigning reviewers:

R: @chamikaramj for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@ahmedabu98 (Contributor):

Thanks @dejii, overall this looks good. I'm wondering, though, whether our assumption holds that closing the Catalog will indeed close all underlying FileIOs. Did you check whether this is true for different catalog implementations?

@stankiewicz (Contributor) left a comment:

I'm a bit lost with overall lifecycle of catalog here.

@stankiewicz (Contributor):

hey,

I understand the need to close the catalog.
Doing it per bundle is overkill: when each bundle writes to many tables, the catalog wastes plenty of time loading table credentials.
Not doing it at all, especially with dynamic destinations and vended credentials, means fileIOTracker will grow without limits and we may keep refreshing credentials for tables that no longer need them.

@dejii (Contributor, Author) commented Apr 13, 2026

Hey @stankiewicz - thanks for the review.

Doing it per bundle is overkill: when each bundle writes to many tables, the catalog wastes plenty of time loading table credentials.

Just to clarify - closing FileIO per bundle is the current behavior, and it's not wasted time but thrown errors when using dynamic destinations: the catalog tries to reuse a dead connection pool on every subsequent bundle, and the pipeline ultimately fails.

@dejii (Contributor, Author) commented Apr 13, 2026

Thanks @dejii, overall this looks good. I'm wondering, though, whether our assumption holds that closing the Catalog will indeed close all underlying FileIOs. Did you check whether this is true for different catalog implementations?

@ahmedabu98 Yes, confirmed - verified against source and included links in the PR description. REST, Glue, JDBC, and Nessie catalogs all properly close their FileIO via CloseableGroup in close(). Hadoop and Hive catalogs do not close their FileIO, but this is likely intentional - they default to HadoopFileIO which has no close() implementation since Hadoop's FileSystem manages its own lifecycle.
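For readers unfamiliar with the mechanism: the catalogs above bind each FileIO to the catalog's lifetime through a CloseableGroup. Here is a rough sketch of that pattern (illustrative only, not Iceberg's actual implementation):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Sketch of the CloseableGroup pattern the REST/Glue/JDBC/Nessie catalogs
 * rely on: every FileIO the catalog creates is registered with the group,
 * and catalog.close() closes all registered resources exactly once. The
 * class name is illustrative; this is not Iceberg's implementation.
 */
public class CloseableGroupSketch implements Closeable {
  private final Deque<Closeable> children = new ArrayDeque<>();

  /** Register a resource whose lifetime is bound to the group's. */
  public void addCloseable(Closeable child) {
    children.push(child);
  }

  @Override
  public void close() throws IOException {
    // Close in reverse registration order, like unwinding acquisitions.
    while (!children.isEmpty()) {
      children.pop().close();
    }
  }
}
```

With this shape, a single catalog.close() fans out to every registered FileIO, which is why closing the catalog once at @Teardown is sufficient for the catalogs in the table above.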

@dejii (Contributor, Author) commented Apr 14, 2026

@stankiewicz Thanks for the review - addressed all the feedback from our discussion. Here's what changed:

  • Catalog lifecycle: Each DoFn now creates its own catalog via catalogConfig.newCatalog() in @Setup and closes it in @Teardown. This removes the reliance on the shared cachedCatalog in IcebergCatalogConfig, so the fix works correctly regardless of whether the DoFn is passed as instance or deserialized. The existing catalog() method is preserved for driver-side operations (pipeline construction, schema inference, etc.).

  • Static table cache: LAST_REFRESHED_TABLE_CACHE is no longer static - each DoFn owns its own cache instance and passes it to RecordWriterManager via the constructor. This prevents a closed catalog's dead Table objects from poisoning other DoFn instances.
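A minimal sketch of the second bullet, using hypothetical names (TableHandle, WriterManagerSketch, DoFnSketch): each DoFn owns its cache and injects it into the writer manager, so invalidating one DoFn's entries cannot poison a sibling's.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the per-DoFn cache change: instead of a static, JVM-wide table
 * cache, each DoFn instance owns its own cache and injects it into the
 * writer manager. TableHandle and WriterManagerSketch are illustrative
 * names, not Beam classes.
 */
public class PerDoFnCacheSketch {

  /** Stands in for a cached Iceberg Table. */
  public static class TableHandle {
    public final String name;
    public boolean stale = false; // set once the owning catalog is closed

    public TableHandle(String name) {
      this.name = name;
    }
  }

  /** Receives the cache through its constructor instead of a static field. */
  public static class WriterManagerSketch {
    private final Map<String, TableHandle> cache;

    public WriterManagerSketch(Map<String, TableHandle> cache) {
      this.cache = cache;
    }

    public TableHandle load(String table) {
      return cache.computeIfAbsent(table, TableHandle::new);
    }
  }

  /** Each DoFn owns one cache; teardown can only affect its own entries. */
  public static class DoFnSketch {
    private final Map<String, TableHandle> cache = new HashMap<>();

    public WriterManagerSketch newWriterManager() {
      return new WriterManagerSketch(cache);
    }

    public void teardown() {
      cache.values().forEach(t -> t.stale = true);
    }
  }
}
```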

@dejii dejii requested a review from stankiewicz April 14, 2026 00:01
@stankiewicz (Contributor) commented Apr 14, 2026

@stankiewicz Thanks for the review - addressed all the feedback from our discussion. Here's what changed:

  • Catalog lifecycle: Each DoFn now creates its own catalog via catalogConfig.newCatalog() in @Setup and closes it in @Teardown. This removes the reliance on the shared cachedCatalog in IcebergCatalogConfig, so the fix works correctly regardless of whether the DoFn is passed as instance or deserialized. The existing catalog() method is preserved for driver-side operations (pipeline construction, schema inference, etc.).
  • Static table cache: LAST_REFRESHED_TABLE_CACHE is no longer static - each DoFn owns its own cache instance and passes it to RecordWriterManager via the constructor. This prevents a closed catalog's dead Table objects from poisoning other DoFn instances.

thanks @dejii .

What is important to note is:

| What | Previous | Change |
| --- | --- | --- |
| table cache | table identifier → storage credentials, io; per VM | per doFn life / harness thread |
| iceberg catalog instance | per harness thread | per doFn life / per harness thread |
| io close | finish bundle | doFn.teardown closes catalog (which closes all IOs that need to be closed) |
| leak | no memory leak | slow memory leak: if doFn runs for long, catalog will grow over time with trackedFileIO with vended credentials |

@ahmedabu98 we've discussed having multiple catalogs and some quota issues. Can you share some thoughts? This change doesn't reduce the number of catalogs used over time.

@dejii (Contributor, Author) commented Apr 14, 2026

@stankiewicz Thanks for the summary table. One comment on the leak characterization:

There shouldn't be a memory leak. FileIOTracker uses Caffeine's weakKeys() with a removal listener that calls fileIO.close() when a TableOperations key is GC'd (FileIOTracker.java#L37-L47). The strong reference chain is tableCache -> Table -> TableOperations. Cleanup happens continuously throughout the DoFn's lifetime, not just at teardown, so even a long-running DoFn will naturally release per-table FileIOs for tables that go idle. The tracker is bounded by the number of distinct tables written to in the last 10 minutes.
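The cleanup mechanism can be sketched deterministically (all names illustrative; the real FileIOTracker relies on Caffeine's weakKeys(), where removal is triggered by GC of the TableOperations key rather than an explicit call):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Deterministic sketch of the cleanup mechanism described above: a removal
 * listener closes the tracked value when its key leaves the map. The real
 * FileIOTracker uses Caffeine's weakKeys(), so removal is driven by GC of
 * the key; here removal is explicit so the behavior is directly testable.
 * All names are illustrative.
 */
public class TrackerSketch<K, V> {
  private final Map<K, V> entries = new HashMap<>();
  private final Consumer<V> removalListener;

  public TrackerSketch(Consumer<V> removalListener) {
    this.removalListener = removalListener;
  }

  public void track(K key, V value) {
    entries.put(key, value);
  }

  /** Analogous to the weak key being garbage-collected. */
  public void evict(K key) {
    V removed = entries.remove(key);
    if (removed != null) {
      removalListener.accept(removed); // e.g. fileIO.close()
    }
  }

  public int size() {
    return entries.size();
  }
}
```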

@dejii (Contributor, Author) commented Apr 14, 2026

On catalog quota: On Dataflow Runner v2 (and any runner that deserializes DoFns), the number of catalogs per worker is unchanged. The only case where catalog count increases is on runners that pass DoFns by instance.

@stankiewicz (Contributor):

Thanks! Please fix the Spotless errors.

