fix: decline native V1 scans on object_store-unsupported filesystem schemes#4525
fix: decline native V1 scans on object_store-unsupported filesystem schemes#4525schenksj wants to merge 7 commits into
Conversation
…chemes Comet's native readers go through object_store, which only understands a fixed set of URL schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs.<scheme>.impl) crashes the native reader at execution with "Generic URL error: Unable to recognise URL", with no graceful recovery. Decline such scans at planning time so Spark's Hadoop-FS-aware reader handles them. Whether object_store recognizes a scheme is answered by the native layer itself (NativeBase.isObjectStoreSchemeSupported, backed by object_store's ObjectStoreScheme::parse -- the same path prepare_object_store_with_configs uses) rather than a hardcoded list, so the planner can't drift from object_store's actual support. The user's libhdfs scheme config (spark.hadoop.fs.comet.libhdfs.schemes) is unioned in on the JVM side; results are cached per scheme; if native can't be consulted the scheme is assumed supported rather than over-restricting. Adds CometScanSchemeFallbackSuite, which asserts a `fake://` scan falls back to Spark; it fails without the gate (Comet claims the scan) and passes with it. Closes apache#4520 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
check-suites.py requires every *Suite.scala to appear in both pr_build_linux.yml and pr_build_macos.yml. Add the new CometScanSchemeFallbackSuite alongside its sibling org.apache.comet.rules suites. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| // Per-scheme memo of `NativeBase.isObjectStoreSchemeSupported`. The answer depends only on the | ||
| // URL scheme, so we cache by scheme and never re-cross the JNI boundary for a repeated scheme. | ||
| private val schemeSupportCache = | ||
| new java.util.concurrent.ConcurrentHashMap[String, java.lang.Boolean]() |
There was a problem hiding this comment.
please add imports rather than use fully qualified class names
There was a problem hiding this comment.
Done. Added imports for java.net.URI, java.util.Locale, java.util.concurrent.ConcurrentHashMap, and java.lang.Boolean (aliased JBoolean) and dropped the fully-qualified references. Also fixed a leftover withInfo call in the same code (renamed to withFallbackReason in #4508) that was breaking compilation after merging main.
…la 2.12 SQLTestUtils.withSQLConf returns Unit on Spark 3.5 but a value on Spark 4.x, so assigning its block result to `val sparkPlan: SparkPlan` failed to compile under the spark-3.5 profile (type mismatch: found Unit, required SparkPlan). Capture the plan via a var assigned inside the block, which is cross-version-safe. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback: import java.lang.Boolean (as JBoolean), java.net.URI, java.util.Locale and java.util.concurrent.ConcurrentHashMap rather than referencing them with fully-qualified class names in the newly-added scheme-gating code. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rename) The unsupported-scheme fallback still called withInfo, the old name of withFallbackReason (renamed in apache#4508). It was the only remaining old-name call in the file and broke compilation after merging main; rename it to match the rest of CometScanRule. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Nice approach overall. Sourcing "can the native reader handle this scheme?" from I think there is one gap worth a look before merge: plain On the native side, On the JVM side, I built the branch with a default native library and probed the gate's helper directly to confirm: So Would it make sense to mirror the native default on the JVM, so the two stay in lockstep? val libhdfsSchemes: Set[String] = COMET_LIBHDFS_SCHEMES.get() match {
case Some(s) => s.split(",").map(_.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).toSet
case None => Set("hdfs") // native is_hdfs_scheme defaults to `scheme == "hdfs"` when unset
}A test asserting that an One smaller thing: is the V2 Disclosure: I used Claude Code to help review this PR, including building the branch and running the scheme-gate probe above. |
Address review feedback on apache#4525. When `spark.hadoop.fs.comet.libhdfs.schemes` is unset, the scheme gate now defaults `libhdfsSchemes` to `Set("hdfs")` rather than the empty set, mirroring the native default: `is_hdfs_scheme` (parquet_support.rs) treats `hdfs` as natively readable when the config is unset, and `create_hdfs_object_store` is in the default build (`default = ["hdfs-opendal"]`). Previously a plain `hdfs://` V1 scan was declined and silently fell back to Spark in the default HDFS configuration even though native could read it. `s3a`/`file` are unaffected (object_store recognizes them via `parse_url`); an explicit config value still takes over verbatim. Test: add `native scan claims hdfs:// when libhdfs.schemes is unset` to CometScanSchemeFallbackSuite, alongside the existing `fake://` decline case. It backs the `hdfs` scheme with a local FS (FakeHdfsSchemeFileSystem) so an `hdfs://` path is exercised without a live cluster, then asserts CometScanRule claims the scan. Verified RED (fails with `Set.empty`: scan falls back to Spark) -> GREEN (passes with `Set("hdfs")`) on Spark 3.5. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Thanks @andygrove — and for building the branch and probing the gate directly; that
Test. Added V2 Also fixed a latent compile break the gate carried: the decline branch still called |
Which issue does this PR close?
Closes #4520.
Rationale for this change
Comet's native readers go through
object_store, which only understands a fixed set of URL schemes. When a scan's path uses a custom HadoopFileSystemscheme (e.g. registered viaspark.hadoop.fs.<scheme>.impl), the native reader fails at execution withGeneric URL error: Unable to recognise URL "..."— there is no graceful recovery once native execution has started. This was surfaced by Delta tables opened with custom filesystem options (DeltaTable.forPath(spark, path, fsOptions)), where Delta reads its internal_delta_log/*.checkpoint.parquetvia ordinary V1 parquet scans that Comet then claimed and crashed on, but it reproduces for any V1 parquet scan on such a scheme.What changes are included in this PR?
CometScanRuledeclines a V1 native scan when its root-path scheme isn't natively readable, so Spark's Hadoop-FS-aware reader handles it. Rather than hardcode the object_store-supported scheme set in the planner (a mirror that drifts), the answer comes from the native layer itself: a newNativeBase.isObjectStoreSchemeSupportedJNI method backed byobject_store's ownObjectStoreScheme::parse— the same pathprepare_object_store_with_configsdispatches through. The user's libhdfs scheme config (spark.hadoop.fs.comet.libhdfs.schemes) is unioned in on the JVM side; results are cached per scheme; and if native can't be consulted the scheme is assumed supported rather than over-restricting.How are these changes tested?
CometScanSchemeFallbackSuiteregistersFakeHDFSFileSystemfor afake://scheme (not routed through libhdfs) and appliesCometScanRuleto the scan's physical plan. It asserts the scan falls back to Spark (noCometScanExec). The test fails without the gate (Comet claims thefake://scan) and passes with it. The libhdfs-scheme regression guard (ParquetReadFromFakeHadoopFsSuite) continues to engage Comet for configured libhdfs schemes.