From 7786c5c681553c5aa1a0e2f9b02e3e8348718fc3 Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Fri, 29 May 2026 20:44:27 -0400 Subject: [PATCH 1/6] fix: decline native V1 scans on object_store-unsupported filesystem schemes Comet's native readers go through object_store, which only understands a fixed set of URL schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs..impl) crashes the native reader at execution with "Generic URL error: Unable to recognise URL", with no graceful recovery. Decline such scans at planning time so Spark's Hadoop-FS-aware reader handles them. Whether object_store recognizes a scheme is answered by the native layer itself (NativeBase.isObjectStoreSchemeSupported, backed by object_store's ObjectStoreScheme::parse -- the same path prepare_object_store_with_configs uses) rather than a hardcoded list, so the planner can't drift from object_store's actual support. The user's libhdfs scheme config (spark.hadoop.fs.comet.libhdfs.schemes) is unioned in on the JVM side; results are cached per scheme; if native can't be consulted the scheme is assumed supported rather than over-restricting. Adds CometScanSchemeFallbackSuite, which asserts a `fake://` scan falls back to Spark; it fails without the gate (Comet claims the scan) and passes with it. Closes #4520 Co-Authored-By: Claude Opus 4.7 --- native/core/src/lib.rs | 25 +++++ .../java/org/apache/comet/NativeBase.java | 11 +++ .../apache/comet/rules/CometScanRule.scala | 63 +++++++++++- .../rules/CometScanSchemeFallbackSuite.scala | 95 +++++++++++++++++++ 4 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index 19a2d774a0..7d15c761ca 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -156,6 +156,31 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_isFeatureEnabled( }) } +/// JNI method: does object_store recognize this URL's scheme? +/// +/// This is the source of truth for the JVM planner's "can Comet's native reader handle this +/// filesystem?" check. Comet's `prepare_object_store_with_configs` dispatches non-hdfs/non-s3 +/// schemes to object_store's `parse_url`, which is driven by `ObjectStoreScheme::parse`; an +/// unrecognized scheme (e.g. a custom Hadoop FileSystem) fails there at execution time. By +/// answering from `ObjectStoreScheme::parse` here, the planner can decline early without +/// hardcoding -- and drifting from -- the object_store-supported scheme set. (hdfs / libhdfs +/// schemes are handled separately on the JVM side via the user's libhdfs scheme config.) +#[no_mangle] +pub extern "system" fn Java_org_apache_comet_NativeBase_isObjectStoreSchemeSupported( + env: EnvUnowned, + _: JClass, + url: JString, +) -> jni::sys::jboolean { + try_unwrap_or_throw(&env, |env| { + let url_str: String = url.try_to_string(env)?; + let supported = url::Url::parse(&url_str) + .ok() + .map(|u| object_store::ObjectStoreScheme::parse(&u).is_ok()) + .unwrap_or(false); + Ok(supported) + }) +} + // Creates a default log4rs config, which logs to console with log level. fn default_logger_config(log_level: &str) -> CometResult { let console_append = ConsoleAppender::builder() diff --git a/spark/src/main/java/org/apache/comet/NativeBase.java b/spark/src/main/java/org/apache/comet/NativeBase.java index 074a4b1625..e2fcbb24a7 100644 --- a/spark/src/main/java/org/apache/comet/NativeBase.java +++ b/spark/src/main/java/org/apache/comet/NativeBase.java @@ -300,4 +300,15 @@ private static String resourceName() { * @return true if the feature is enabled, false otherwise */ public static native boolean isFeatureEnabled(String featureName); + + /** + * Check whether Comet's native object_store layer recognizes the given URL's scheme (i.e. the + * scan would be natively readable rather than failing at execution with "Unable to recognise + * URL"). This is the authoritative answer from object_store's own scheme parser, so the JVM + * planner never has to hardcode (and drift from) the set of supported schemes. + * + * @param url a fully-qualified URL whose scheme should be checked (e.g. "s3://bucket/path") + * @return true if object_store can construct a store for this scheme, false otherwise + */ + public static native boolean isObjectStoreSchemeSupported(String url); } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 7601fa1c6b..b3d89322bc 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, DataTypeSupport} +import org.apache.comet.{CometConf, DataTypeSupport, NativeBase} import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark35Plus, withInfo, withInfos} import org.apache.comet.DataTypeSupport.isComplexType @@ -199,6 +199,40 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, s"Native Parquet scan requires ${COMET_EXEC_ENABLED.key} to be enabled") return None } + // Comet's native readers go through object_store, which only understands a fixed set of URL + // schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs..impl) would + // surface at execution time as `Generic URL error: Unable to recognise URL "..."`. Decline here + // so Spark's reader -- which goes through the Hadoop FS API and can resolve custom schemes -- + // handles the scan. Whether object_store recognizes a scheme is answered by the native layer + // itself (`NativeBase.isObjectStoreSchemeSupported`) rather than a hardcoded list, so the + // planner can't drift from object_store's actual support. + // + // EXCEPT schemes the user routes through libhdfs via `spark.hadoop.fs.comet.libhdfs.schemes` + // (e.g. `hdfs`, or a test `fake`): those ARE natively readable through the libhdfs object_store + // bridge, so they must NOT be declined here (regression guarded by + // ParquetReadFromFakeHadoopFsSuite). + val libhdfsSchemes: Set[String] = COMET_LIBHDFS_SCHEMES.get() match { + case Some(s) => + s.split(",").map(_.trim.toLowerCase(java.util.Locale.ROOT)).filter(_.nonEmpty).toSet + case None => Set.empty + } + val unsupportedFsSchemes = r.location.rootPaths + .map(_.toUri) + .filter { uri => + val sch = uri.getScheme + sch != null && { + val sl = sch.toLowerCase(java.util.Locale.ROOT) + !libhdfsSchemes.contains(sl) && !CometScanRule.isNativelyReadableScheme(uri) + } + } + .map(_.getScheme.toLowerCase(java.util.Locale.ROOT)) + .toSet + if (unsupportedFsSchemes.nonEmpty) { + withInfo( + scanExec, + s"Unsupported filesystem schemes: ${unsupportedFsSchemes.mkString(", ")}") + return None + } // Disabling the vectorized reader opts into parquet-mr's permissive behavior // (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent // backend, so by default fall back to Spark. Users can opt in to letting Comet @@ -726,6 +760,33 @@ case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim { object CometScanRule extends Logging { + // Per-scheme memo of `NativeBase.isObjectStoreSchemeSupported`. The answer depends only on the + // URL scheme, so we cache by scheme and never re-cross the JNI boundary for a repeated scheme. + private val schemeSupportCache = + new java.util.concurrent.ConcurrentHashMap[String, java.lang.Boolean]() + + /** + * True when Comet's native object_store layer recognizes this URI's scheme (so the scan is + * natively readable). Delegates to the native layer -- the source of truth -- instead of a + * hardcoded scheme list. On any failure to consult native (e.g. the library isn't loaded on + * this JVM, or predates this method) we assume the scheme IS supported: the scheme gate is an + * early-fallback optimization, and a build without a working native library can't run Comet's + * native scan anyway, so declining here would only over-restrict. + */ + private[rules] def isNativelyReadableScheme(uri: java.net.URI): Boolean = { + val scheme = uri.getScheme + if (scheme == null) return true + schemeSupportCache + .computeIfAbsent( + scheme.toLowerCase(java.util.Locale.ROOT), + _ => + try java.lang.Boolean.valueOf(NativeBase.isObjectStoreSchemeSupported(uri.toString)) + catch { + case _: Throwable => java.lang.Boolean.TRUE + }) + .booleanValue() + } + /** * Tag set on a scan (`FileSourceScanExec` or `BatchScanExec`) that should be left as a plain * Spark scan rather than converted to a Comet scan. Written by diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala new file mode 100644 index 0000000000..e7ab7ffc6c --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.rules + +import java.io.File +import java.nio.file.Files +import java.util.UUID + +import org.apache.commons.io.FileUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{CometTestBase, SaveMode} +import org.apache.spark.sql.comet.CometScanExec +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} + +import org.apache.comet.CometConf +import org.apache.comet.hadoop.fs.FakeHDFSFileSystem + +/** + * Comet's native readers go through object_store, which only understands a fixed set of URL + * schemes. A custom Hadoop FileSystem scheme that object_store can't parse (here `fake://`) must + * NOT be claimed by the native scan -- it would fail at execution with "Unable to recognise URL". + * `CometScanRule` must decline it so Spark's Hadoop-FS-aware reader handles the scan. + * + * Unlike `ParquetReadFromFakeHadoopFsSuite`, this suite does NOT route the `fake` scheme through + * libhdfs (`spark.hadoop.fs.comet.libhdfs.schemes`), so it exercises the decline path. The test + * applies the rule directly to the physical plan and asserts fallback -- no query execution, so + * it doesn't depend on the native reader actually attempting (and failing on) the scheme. + */ +class CometScanSchemeFallbackSuite extends CometTestBase { + + private var fakeRootDir: File = _ + + override protected def sparkConf: SparkConf = { + val conf = super.sparkConf + conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem") + conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX) + // Intentionally NOT setting CometConf.COMET_LIBHDFS_SCHEMES -- `fake` is not natively readable. + conf + } + + override def beforeAll(): Unit = { + fakeRootDir = Files.createTempDirectory(s"comet_scheme_${UUID.randomUUID().toString}").toFile + super.beforeAll() + } + + protected override def afterAll(): Unit = { + if (fakeRootDir != null) FileUtils.deleteDirectory(fakeRootDir) + super.afterAll() + } + + test("native scan declines a filesystem scheme object_store can't read (fake://)") { + val path = s"${FakeHDFSFileSystem.PREFIX}${fakeRootDir.getAbsolutePath}/data" + spark.range(0, 10).toDF("id").write.format("parquet").mode(SaveMode.Overwrite).save(path) + + // Obtain a clean Spark physical plan (Comet disabled) with the FileSourceScanExec, then apply + // CometScanRule directly. No execution -- we only check whether the rule claims the scan. + val sparkPlan: SparkPlan = withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + spark.read.parquet(path).queryExecution.executedPlan + } + + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + val transformed = CometScanRule(spark).apply(stripAQEPlan(sparkPlan)) + + val cometScans = transformed.collect { case s: CometScanExec => s } + val sparkScans = transformed.collect { case s: FileSourceScanExec => s } + assert( + cometScans.isEmpty, + s"`fake://` is not object_store-readable; the native scan must fall back to Spark, " + + s"but Comet claimed it:\n$transformed") + assert( + sparkScans.size == 1, + s"expected the scan to remain a Spark FileSourceScanExec:\n$transformed") + } + } +} From b4570247ea92f707d35e110b4bb3eb283372c7db Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sat, 30 May 2026 15:34:36 -0400 Subject: [PATCH 2/6] ci: register CometScanSchemeFallbackSuite in PR build workflows check-suites.py requires every *Suite.scala to appear in both pr_build_linux.yml and pr_build_macos.yml. Add the new CometScanSchemeFallbackSuite alongside its sibling org.apache.comet.rules suites. Co-Authored-By: Claude Opus 4.8 (1M context) --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 422232f546..5b984a904b 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -320,6 +320,7 @@ jobs: org.apache.spark.CometPluginsNonOverrideSuite org.apache.spark.CometPluginsUnifiedModeOverrideSuite org.apache.comet.rules.CometScanRuleSuite + org.apache.comet.rules.CometScanSchemeFallbackSuite org.apache.comet.rules.CometExecRuleSuite org.apache.spark.sql.CometTPCDSQuerySuite org.apache.spark.sql.CometTPCDSQueryTestSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index d0a03eeb75..a810aab30e 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -162,6 +162,7 @@ jobs: org.apache.spark.CometPluginsNonOverrideSuite org.apache.spark.CometPluginsUnifiedModeOverrideSuite org.apache.comet.rules.CometScanRuleSuite + org.apache.comet.rules.CometScanSchemeFallbackSuite org.apache.comet.rules.CometExecRuleSuite org.apache.spark.sql.CometTPCDSQuerySuite org.apache.spark.sql.CometTPCDSQueryTestSuite From 8b1c4178b88e87293c20571c87d9f998905337dd Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sat, 30 May 2026 17:45:51 -0400 Subject: [PATCH 3/6] test: make CometScanSchemeFallbackSuite compile under Spark 3.5 / Scala 2.12 SQLTestUtils.withSQLConf returns Unit on Spark 3.5 but a value on Spark 4.x, so assigning its block result to `val sparkPlan: SparkPlan` failed to compile under the spark-3.5 profile (type mismatch: found Unit, required SparkPlan). Capture the plan via a var assigned inside the block, which is cross-version-safe. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../apache/comet/rules/CometScanSchemeFallbackSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala index e7ab7ffc6c..74f8242879 100644 --- a/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala @@ -71,8 +71,11 @@ class CometScanSchemeFallbackSuite extends CometTestBase { // Obtain a clean Spark physical plan (Comet disabled) with the FileSourceScanExec, then apply // CometScanRule directly. No execution -- we only check whether the rule claims the scan. - val sparkPlan: SparkPlan = withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - spark.read.parquet(path).queryExecution.executedPlan + // Capture via a var inside the block: `SQLTestUtils.withSQLConf` returns Unit on Spark 3.5 + // but a value on Spark 4.x, so we can't return the plan out of it cross-version. + var sparkPlan: SparkPlan = null + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + sparkPlan = spark.read.parquet(path).queryExecution.executedPlan } withSQLConf( From 7e8b136d2398cdf29587b205869376963f1dea1c Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sat, 30 May 2026 17:50:12 -0400 Subject: [PATCH 4/6] refactor: use imports instead of fully-qualified names in CometScanRule Address review feedback: import java.lang.Boolean (as JBoolean), java.net.URI, java.util.Locale and java.util.concurrent.ConcurrentHashMap rather than referencing them with fully-qualified class names in the newly-added scheme-gating code. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../apache/comet/rules/CometScanRule.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 3c32b661e3..d4f86db960 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -19,7 +19,10 @@ package org.apache.comet.rules +import java.lang.{Boolean => JBoolean} import java.net.URI +import java.util.Locale +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -215,7 +218,7 @@ case class CometScanRule(session: SparkSession) // ParquetReadFromFakeHadoopFsSuite). val libhdfsSchemes: Set[String] = COMET_LIBHDFS_SCHEMES.get() match { case Some(s) => - s.split(",").map(_.trim.toLowerCase(java.util.Locale.ROOT)).filter(_.nonEmpty).toSet + s.split(",").map(_.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).toSet case None => Set.empty } val unsupportedFsSchemes = r.location.rootPaths @@ -223,11 +226,11 @@ case class CometScanRule(session: SparkSession) .filter { uri => val sch = uri.getScheme sch != null && { - val sl = sch.toLowerCase(java.util.Locale.ROOT) + val sl = sch.toLowerCase(Locale.ROOT) !libhdfsSchemes.contains(sl) && !CometScanRule.isNativelyReadableScheme(uri) } } - .map(_.getScheme.toLowerCase(java.util.Locale.ROOT)) + .map(_.getScheme.toLowerCase(Locale.ROOT)) .toSet if (unsupportedFsSchemes.nonEmpty) { withInfo( @@ -765,7 +768,7 @@ object CometScanRule extends Logging { // Per-scheme memo of `NativeBase.isObjectStoreSchemeSupported`. The answer depends only on the // URL scheme, so we cache by scheme and never re-cross the JNI boundary for a repeated scheme. private val schemeSupportCache = - new java.util.concurrent.ConcurrentHashMap[String, java.lang.Boolean]() + new ConcurrentHashMap[String, JBoolean]() /** * True when Comet's native object_store layer recognizes this URI's scheme (so the scan is @@ -775,16 +778,16 @@ object CometScanRule extends Logging { * early-fallback optimization, and a build without a working native library can't run Comet's * native scan anyway, so declining here would only over-restrict. */ - private[rules] def isNativelyReadableScheme(uri: java.net.URI): Boolean = { + private[rules] def isNativelyReadableScheme(uri: URI): Boolean = { val scheme = uri.getScheme if (scheme == null) return true schemeSupportCache .computeIfAbsent( - scheme.toLowerCase(java.util.Locale.ROOT), + scheme.toLowerCase(Locale.ROOT), _ => - try java.lang.Boolean.valueOf(NativeBase.isObjectStoreSchemeSupported(uri.toString)) + try JBoolean.valueOf(NativeBase.isObjectStoreSchemeSupported(uri.toString)) catch { - case _: Throwable => java.lang.Boolean.TRUE + case _: Throwable => JBoolean.TRUE }) .booleanValue() } From a1a29a04a9c72962208513e5dc1fb56e3a44808e Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sat, 30 May 2026 19:24:50 -0400 Subject: [PATCH 5/6] fix: use withFallbackReason in scheme gate (leftover from #4508 rename) The unsupported-scheme fallback still called withInfo, the old name of withFallbackReason (renamed in #4508). It was the only remaining old-name call in the file and broke compilation after merging main; rename it to match the rest of CometScanRule. Co-Authored-By: Claude Opus 4.8 (1M context) --- spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index d4f86db960..97e712d420 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -233,7 +233,7 @@ case class CometScanRule(session: SparkSession) .map(_.getScheme.toLowerCase(Locale.ROOT)) .toSet if (unsupportedFsSchemes.nonEmpty) { - withInfo( + withFallbackReason( scanExec, s"Unsupported filesystem schemes: ${unsupportedFsSchemes.mkString(", ")}") return None From f5e66092cd94778a5d455f1cdda2568412115b3c Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sun, 31 May 2026 14:28:36 -0400 Subject: [PATCH 6/6] fix: default hdfs scheme to natively-readable in CometScanRule gate Address review feedback on #4525. When `spark.hadoop.fs.comet.libhdfs.schemes` is unset, the scheme gate now defaults `libhdfsSchemes` to `Set("hdfs")` rather than the empty set, mirroring the native default: `is_hdfs_scheme` (parquet_support.rs) treats `hdfs` as natively readable when the config is unset, and `create_hdfs_object_store` is in the default build (`default = ["hdfs-opendal"]`). Previously a plain `hdfs://` V1 scan was declined and silently fell back to Spark in the default HDFS configuration even though native could read it. `s3a`/`file` are unaffected (object_store recognizes them via `parse_url`); an explicit config value still takes over verbatim. Test: add `native scan claims hdfs:// when libhdfs.schemes is unset` to CometScanSchemeFallbackSuite, alongside the existing `fake://` decline case. It backs the `hdfs` scheme with a local FS (FakeHdfsSchemeFileSystem) so an `hdfs://` path is exercised without a live cluster, then asserts CometScanRule claims the scan. Verified RED (fails with `Set.empty`: scan falls back to Spark) -> GREEN (passes with `Set("hdfs")`) on Spark 3.5. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../apache/comet/rules/CometScanRule.scala | 9 +++- .../hadoop/fs/FakeHdfsSchemeFileSystem.java | 52 +++++++++++++++++++ .../rules/CometScanSchemeFallbackSuite.scala | 38 +++++++++++++- 3 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 spark/src/test/java/org/apache/comet/hadoop/fs/FakeHdfsSchemeFileSystem.java diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 97e712d420..c284c3f7b2 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -216,10 +216,17 @@ case class CometScanRule(session: SparkSession) // (e.g. `hdfs`, or a test `fake`): those ARE natively readable through the libhdfs object_store // bridge, so they must NOT be declined here (regression guarded by // ParquetReadFromFakeHadoopFsSuite). + // + // The default mirrors the native side: when the config is unset, `is_hdfs_scheme` + // (native/core/src/parquet/parquet_support.rs) treats `hdfs` as natively readable, and + // `create_hdfs_object_store` is in the default build (`default = ["hdfs-opendal"]`). If we + // defaulted to an empty set here, a plain `hdfs://` V1 scan would be declined and fall back to + // Spark even though native can read it -- a silent regression for HDFS users in the default + // configuration. So default to `Set("hdfs")` to stay in lockstep with the native default. val libhdfsSchemes: Set[String] = COMET_LIBHDFS_SCHEMES.get() match { case Some(s) => s.split(",").map(_.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).toSet - case None => Set.empty + case None => Set("hdfs") } val unsupportedFsSchemes = r.location.rootPaths .map(_.toUri) diff --git a/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHdfsSchemeFileSystem.java b/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHdfsSchemeFileSystem.java new file mode 100644 index 0000000000..eeed348416 --- /dev/null +++ b/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHdfsSchemeFileSystem.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.hadoop.fs; + +import java.net.URI; + +import org.apache.hadoop.fs.RawLocalFileSystem; + +/** + * A local-disk-backed FileSystem that reports the {@code hdfs} scheme, so a test can write/read an + * {@code hdfs://} path without a live HDFS cluster. Used to assert that {@code CometScanRule} still + * claims an {@code hdfs://} scan when {@code spark.hadoop.fs.comet.libhdfs.schemes} is unset -- + * i.e. the JVM scheme gate's default stays in lockstep with the native {@code is_hdfs_scheme} + * default. + */ +public class FakeHdfsSchemeFileSystem extends RawLocalFileSystem { + + public static final String PREFIX = "hdfs://fake-namenode"; + + public FakeHdfsSchemeFileSystem() { + // Avoid `URI scheme is not "file"` error on + // RawLocalFileSystem$DeprecatedRawLocalFileStatus.getOwner + RawLocalFileSystem.useStatIfAvailable(); + } + + @Override + public String getScheme() { + return "hdfs"; + } + + @Override + public URI getUri() { + return URI.create(PREFIX); + } +} diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala index 74f8242879..2e81306125 100644 --- a/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.comet.CometConf -import org.apache.comet.hadoop.fs.FakeHDFSFileSystem +import org.apache.comet.hadoop.fs.{FakeHDFSFileSystem, FakeHdfsSchemeFileSystem} /** * Comet's native readers go through object_store, which only understands a fixed set of URL @@ -50,8 +50,12 @@ class CometScanSchemeFallbackSuite extends CometTestBase { override protected def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem") + // Back the `hdfs` scheme with a local FS so we can exercise an `hdfs://` path without a live + // cluster. `hdfs` is natively readable by default, so this scan must be CLAIMED, not declined. + conf.set("spark.hadoop.fs.hdfs.impl", "org.apache.comet.hadoop.fs.FakeHdfsSchemeFileSystem") conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX) - // Intentionally NOT setting CometConf.COMET_LIBHDFS_SCHEMES -- `fake` is not natively readable. + // Intentionally NOT setting CometConf.COMET_LIBHDFS_SCHEMES -- `fake` is not natively readable, + // and `hdfs` must still be claimed by default (mirrors the native `is_hdfs_scheme` default). conf } @@ -95,4 +99,34 @@ class CometScanSchemeFallbackSuite extends CometTestBase { s"expected the scan to remain a Spark FileSourceScanExec:\n$transformed") } } + + test("native scan claims hdfs:// when libhdfs.schemes is unset (native-default lockstep)") { + // Native's `is_hdfs_scheme` treats `hdfs` as readable when `fs.comet.libhdfs.schemes` is unset, + // and `create_hdfs_object_store` is in the default build. The JVM gate must agree: with the + // config unset, an `hdfs://` scan must be CLAIMED by Comet, not declined to Spark. Guards the + // `case None => Set("hdfs")` default in CometScanRule against the silent-fallback regression + // Andy flagged in #4525. + val path = s"${FakeHdfsSchemeFileSystem.PREFIX}${fakeRootDir.getAbsolutePath}/hdfs-data" + spark.range(0, 10).toDF("id").write.format("parquet").mode(SaveMode.Overwrite).save(path) + + var sparkPlan: SparkPlan = null + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + sparkPlan = spark.read.parquet(path).queryExecution.executedPlan + } + + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + val transformed = CometScanRule(spark).apply(stripAQEPlan(sparkPlan)) + + val cometScans = transformed.collect { case s: CometScanExec => s } + val sparkScans = transformed.collect { case s: FileSourceScanExec => s } + assert( + cometScans.size == 1, + s"`hdfs://` is natively readable by default; Comet must claim the scan, " + + s"but it fell back to Spark:\n$transformed") + assert(sparkScans.isEmpty, s"expected no leftover Spark FileSourceScanExec:\n$transformed") + } + } }