
[FLINK-39421][table] Fix metadata filter contract and add coverage #28162

Open
jnh5y wants to merge 1 commit into
apache:master from jnh5y:flink-39421-metadata-filter-fixes

Conversation

@jnh5y (Contributor) commented May 15, 2026


What is the purpose of the change

Fix the metadata filter push-down contract introduced in FLINK-39421 so that connectors can express arbitrary subset acceptance through MetadataFilterResult. The previous position-based slicing (accepted-as-prefix, remaining-as-suffix) broke for sources that accept a non-prefix subset of predicates; the rule and spec now correlate predicates by ResolvedExpression instance identity, and a new coverage precondition rejects sources that silently drop a predicate from both the accepted and remaining lists.

Brief change log

  • PushFilterIntoTableSourceScanRule correlates accepted/remaining predicates by instance identity, enforces the coverage precondition, widens the early-return, and routes unpushable metadata-only predicates to the runtime Calc
  • MetadataFilterPushDownSpec.apply() enforces the identity round-trip on compiled-plan restore; needAdjustFieldReferenceAfterProjection() now returns false, enabling scan reuse under differing projections
  • FilterPushDownSpec.resolvePredicates returns a ResolvedPredicates struct carrying an identity-keyed reverse map from resolved expressions to input RexNodes
  • TestValuesScanTableSourceWithWatermarkPushDown.copy() now propagates enableMetadataFilterPushDown

Verifying this change

This change added tests and can be verified as follows:

Tests:

  • New MetadataFilterInReadingMetadataTest cases: testBestEffortMetadataFilter, testSubsetAcceptedNonPrefix, and testSourceDroppingPredicateRaisesError.
  • New MetadataFilterResultShapesITCase covers the four shapes a source can return end-to-end through MiniCluster: accept-all, accept-none, partial split, and best-effort overlap.
  • DynamicTableSourceSpecSerdeTest spec3 round-trips MetadataFilterPushDownSpec through JSON serde.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    Generated-by: Claude (Opus 4.7)

Repair the metadata filter push-down contract introduced in FLINK-39421
so that connectors can express arbitrary subset acceptance through
MetadataFilterResult, and add a coverage precondition + integration
tests.

Rule changes (PushFilterIntoTableSourceScanRule):
- Replace position-based slicing (accepted-as-prefix, remaining-as-suffix)
  with identity-based correlation. Each ResolvedExpression in accepted/
  remaining is looked up by instance identity to recover the original
  RexNode (one in metadata-key index space for spec storage, one in
  scan-row index space for the runtime Calc above the scan). The
  ResolvedExpression list is immutable, so identity round-trip is the
  natural correspondence.
- Add a coverage precondition: every input ResolvedExpression must
  appear in accepted, remaining, or both. Dropping a predicate from
  both lists silently produces wrong results — the predicate would be
  neither pushed nor preserved.
- Widen the early-return so the rule terminates when no buckets can
  usefully push (physical predicates can't be pushed AND metadata
  predicates are empty), avoiding wasted Hep iterations.
- Route a metadata-only predicate that cannot currently be pushed
  (source does not support metadata push-down, or the spec was already
  attached) directly to the runtime Calc rather than the physical path.
  Routing it physically produced a stale FilterPushDownSpec with
  out-of-range RexInputRef indices that crashed compiled-plan restore.
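
The identity-based correlation and coverage precondition described above can be sketched as follows. This is an illustrative stand-in, not the actual planner code: ResolvedExpr, RexNode, and IdentityCorrelation are placeholder names, and the real rule additionally distinguishes the metadata-key and scan-row index spaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Placeholder stand-ins for the planner types involved.
final class ResolvedExpr {}
final class RexNode {}

final class IdentityCorrelation {
    /**
     * Correlates the source's accepted list back to the original RexNodes by
     * instance identity, after enforcing the coverage precondition: every
     * input expression must appear in accepted, remaining, or both.
     */
    static List<RexNode> correlate(
            Map<ResolvedExpr, RexNode> reverseMap, // must be identity-keyed
            List<ResolvedExpr> inputs,
            List<ResolvedExpr> accepted,
            List<ResolvedExpr> remaining) {
        for (ResolvedExpr e : inputs) {
            if (!containsIdentity(accepted, e) && !containsIdentity(remaining, e)) {
                throw new IllegalStateException(
                        "Source dropped a predicate from both accepted and remaining");
            }
        }
        List<RexNode> acceptedRex = new ArrayList<>();
        for (ResolvedExpr e : accepted) {
            RexNode original = reverseMap.get(e); // identity lookup
            if (original == null) {
                throw new IllegalStateException(
                        "accepted contains an expression that was not an input");
            }
            acceptedRex.add(original);
        }
        return acceptedRex;
    }

    private static boolean containsIdentity(List<ResolvedExpr> list, ResolvedExpr e) {
        for (ResolvedExpr candidate : list) {
            if (candidate == e) {
                return true;
            }
        }
        return false;
    }
}
```

Because the input list is immutable, each ResolvedExpression instance the source hands back is necessarily the same object it received, which is what makes identity the natural correspondence.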

Spec changes (MetadataFilterPushDownSpec):
- apply() enforces the same identity-based round-trip on compiled-plan
  restore — every previously-accepted predicate must come back from the
  source, identified by ResolvedExpression instance identity.
- needAdjustFieldReferenceAfterProjection() now returns false. The spec
  stores predicates in metadata-row index space against a self-contained
  predicateRowType used at restore; narrowing the scan via
  ProjectPushDownSpec does not invalidate them, so ScanReuser is now
  free to reuse a scan when two queries differ only in projection but
  share the same metadata filter.
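
The restore-time round-trip check in apply() can be pictured as below. Names are illustrative assumptions, not the actual spec code; the point is only that membership is tested by instance identity, never by equals().

```java
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

final class RestoreCheck {
    /**
     * On compiled-plan restore, every predicate the source previously accepted
     * must be accepted again, identified by instance identity of the
     * expression handed to the source.
     */
    static <T> void checkRoundTrip(List<T> previouslyAccepted, List<T> nowAccepted) {
        Map<T, Boolean> seen = new IdentityHashMap<>(); // reference-equality keys
        for (T e : nowAccepted) {
            seen.put(e, Boolean.TRUE);
        }
        for (T e : previouslyAccepted) {
            if (!seen.containsKey(e)) {
                throw new IllegalStateException(
                        "Source no longer accepts a previously pushed metadata predicate");
            }
        }
    }
}
```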

Helper change (FilterPushDownSpec):
- resolvePredicates returns a ResolvedPredicates struct carrying both
  the resolved expressions and an identity-keyed reverse map from each
  resolved expression to the input RexNode it came from. Used by the
  rule and by MetadataFilterPushDownSpec.apply().
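
A minimal sketch of the ResolvedPredicates shape, assuming generic stand-ins for the expression and RexNode types (field and method names here are illustrative, not the actual Flink signatures):

```java
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/**
 * Carries the resolved expressions together with an identity-keyed reverse
 * map from each resolved expression back to the RexNode it was resolved from.
 */
final class ResolvedPredicates<E, R> {
    final List<E> expressions;
    final Map<E, R> expressionToRex; // identity-keyed

    ResolvedPredicates(List<E> expressions, Map<E, R> expressionToRex) {
        this.expressions = expressions;
        // Copy into an IdentityHashMap so lookups use reference equality.
        this.expressionToRex = new IdentityHashMap<>(expressionToRex);
    }

    /** Recovers the originating RexNode for a resolved expression, by identity. */
    R originalRex(E expression) {
        return expressionToRex.get(expression);
    }
}
```

An equal-but-distinct expression instance deliberately misses the map, which is exactly the behavior the rule and apply() rely on.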

Test infrastructure:
- TestValuesScanTableSourceWithWatermarkPushDown.copy() now propagates
  enableMetadataFilterPushDown. Without this, copy() silently disables
  metadata filter push-down for the watermark-enabled test source,
  making the combination of watermark + metadata filter push-down
  untestable through TestValuesTableFactory.

Tests:
- New MetadataFilterInReadingMetadataTest cases:
  * testBestEffortMetadataFilter — overlap pattern (accepted == remaining)
    pushes metadataFilter onto the scan AND keeps a runtime Calc.
  * testSubsetAcceptedNonPrefix — source accepts inputs at non-contiguous
    positions; spec stores exactly the accepted predicates and the
    runtime Calc carries exactly the rejected predicate.
  * testSourceDroppingPredicateRaisesError — source returning empty
    accepted AND empty remaining triggers the new coverage precondition.
- New MetadataFilterResultShapesITCase covers the four shapes a source
  can return end-to-end through MiniCluster: accept-all, accept-none,
  partial split, and best-effort overlap.
- DynamicTableSourceSpecSerdeTest spec3 round-trips MetadataFilterPushDownSpec
  through JSON serde.
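
For reference, the four shapes exercised by the ITCase can be written out for a hypothetical input list [p1, p2] as (accepted, remaining) pairs; the names below are placeholders, not the actual test fixtures.

```java
import java.util.List;

// Illustrative: the four MetadataFilterResult shapes for inputs [p1, p2].
final class ResultShapes {
    record Shape(List<String> accepted, List<String> remaining) {}

    static Shape acceptAll()    { return new Shape(List.of("p1", "p2"), List.of()); }
    static Shape acceptNone()   { return new Shape(List.of(), List.of("p1", "p2")); }
    static Shape partialSplit() { return new Shape(List.of("p2"), List.of("p1")); }
    static Shape bestEffort()   { return new Shape(List.of("p1", "p2"), List.of("p1", "p2")); }
}
```

Note that partialSplit accepts a non-prefix subset, the case the old position-based slicing mishandled, and bestEffort lists every predicate in both buckets, so it is pushed and also re-checked by the runtime Calc.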
@flinkbot (Collaborator) commented May 15, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

