diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 422232f546..5b984a904b 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -320,6 +320,7 @@ jobs: org.apache.spark.CometPluginsNonOverrideSuite org.apache.spark.CometPluginsUnifiedModeOverrideSuite org.apache.comet.rules.CometScanRuleSuite + org.apache.comet.rules.CometScanSchemeFallbackSuite org.apache.comet.rules.CometExecRuleSuite org.apache.spark.sql.CometTPCDSQuerySuite org.apache.spark.sql.CometTPCDSQueryTestSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index d0a03eeb75..a810aab30e 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -162,6 +162,7 @@ jobs: org.apache.spark.CometPluginsNonOverrideSuite org.apache.spark.CometPluginsUnifiedModeOverrideSuite org.apache.comet.rules.CometScanRuleSuite + org.apache.comet.rules.CometScanSchemeFallbackSuite org.apache.comet.rules.CometExecRuleSuite org.apache.spark.sql.CometTPCDSQuerySuite org.apache.spark.sql.CometTPCDSQueryTestSuite diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index 19a2d774a0..7d15c761ca 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -156,6 +156,31 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_isFeatureEnabled( }) } +/// JNI method: does object_store recognize this URL's scheme? +/// +/// This is the source of truth for the JVM planner's "can Comet's native reader handle this +/// filesystem?" check. Comet's `prepare_object_store_with_configs` dispatches non-hdfs/non-s3 +/// schemes to object_store's `parse_url`, which is driven by `ObjectStoreScheme::parse`; an +/// unrecognized scheme (e.g. a custom Hadoop FileSystem) fails there at execution time. By +/// answering from `ObjectStoreScheme::parse` here, the planner can decline early without +/// hardcoding -- and drifting from -- the object_store-supported scheme set. (hdfs / libhdfs +/// schemes are handled separately on the JVM side via the user's libhdfs scheme config.) +#[no_mangle] +pub extern "system" fn Java_org_apache_comet_NativeBase_isObjectStoreSchemeSupported( + env: EnvUnowned, + _: JClass, + url: JString, +) -> jni::sys::jboolean { + try_unwrap_or_throw(&env, |env| { + let url_str: String = url.try_to_string(env)?; + let supported = url::Url::parse(&url_str) + .ok() + .map(|u| object_store::ObjectStoreScheme::parse(&u).is_ok()) + .unwrap_or(false); + Ok(supported) + }) +} + // Creates a default log4rs config, which logs to console with log level. fn default_logger_config(log_level: &str) -> CometResult { let console_append = ConsoleAppender::builder() diff --git a/spark/src/main/java/org/apache/comet/NativeBase.java b/spark/src/main/java/org/apache/comet/NativeBase.java index 074a4b1625..e2fcbb24a7 100644 --- a/spark/src/main/java/org/apache/comet/NativeBase.java +++ b/spark/src/main/java/org/apache/comet/NativeBase.java @@ -300,4 +300,15 @@ private static String resourceName() { * @return true if the feature is enabled, false otherwise */ public static native boolean isFeatureEnabled(String featureName); + + /** + * Check whether Comet's native object_store layer recognizes the given URL's scheme (i.e. the + * scan would be natively readable rather than failing at execution with "Unable to recognise + * URL"). This is the authoritative answer from object_store's own scheme parser, so the JVM + * planner never has to hardcode (and drift from) the set of supported schemes. + * + * @param url a fully-qualified URL whose scheme should be checked (e.g. "s3://bucket/path") + * @return true if object_store can construct a store for this scheme, false otherwise + */ + public static native boolean isObjectStoreSchemeSupported(String url); } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 6dfcdcff25..c284c3f7b2 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -19,7 +19,10 @@ package org.apache.comet.rules +import java.lang.{Boolean => JBoolean} import java.net.URI +import java.util.Locale +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -40,7 +43,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, DataTypeSupport} +import org.apache.comet.{CometConf, DataTypeSupport, NativeBase} import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark35Plus, withFallbackReason, withFallbackReasons} import org.apache.comet.DataTypeSupport.isComplexType @@ -201,6 +204,47 @@ case class CometScanRule(session: SparkSession) s"Native Parquet scan requires ${COMET_EXEC_ENABLED.key} to be enabled") return None } + // Comet's native readers go through object_store, which only understands a fixed set of URL + // schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs..impl) would + // surface at execution time as `Generic URL error: Unable to recognise URL "..."`. Decline here + // so Spark's reader -- which goes through the Hadoop FS API and can resolve custom schemes -- + // handles the scan. Whether object_store recognizes a scheme is answered by the native layer + // itself (`NativeBase.isObjectStoreSchemeSupported`) rather than a hardcoded list, so the + // planner can't drift from object_store's actual support. + // + // EXCEPT schemes the user routes through libhdfs via `spark.hadoop.fs.comet.libhdfs.schemes` + // (e.g. `hdfs`, or a test `fake`): those ARE natively readable through the libhdfs object_store + // bridge, so they must NOT be declined here (regression guarded by + // ParquetReadFromFakeHadoopFsSuite). + // + // The default mirrors the native side: when the config is unset, `is_hdfs_scheme` + // (native/core/src/parquet/parquet_support.rs) treats `hdfs` as natively readable, and + // `create_hdfs_object_store` is in the default build (`default = ["hdfs-opendal"]`). If we + // defaulted to an empty set here, a plain `hdfs://` V1 scan would be declined and fall back to + // Spark even though native can read it -- a silent regression for HDFS users in the default + // configuration. So default to `Set("hdfs")` to stay in lockstep with the native default. + val libhdfsSchemes: Set[String] = COMET_LIBHDFS_SCHEMES.get() match { + case Some(s) => + s.split(",").map(_.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).toSet + case None => Set("hdfs") + } + val unsupportedFsSchemes = r.location.rootPaths + .map(_.toUri) + .filter { uri => + val sch = uri.getScheme + sch != null && { + val sl = sch.toLowerCase(Locale.ROOT) + !libhdfsSchemes.contains(sl) && !CometScanRule.isNativelyReadableScheme(uri) + } + } + .map(_.getScheme.toLowerCase(Locale.ROOT)) + .toSet + if (unsupportedFsSchemes.nonEmpty) { + withFallbackReason( + scanExec, + s"Unsupported filesystem schemes: ${unsupportedFsSchemes.mkString(", ")}") + return None + } // Disabling the vectorized reader opts into parquet-mr's permissive behavior // (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent // backend, so by default fall back to Spark. Users can opt in to letting Comet @@ -728,6 +772,33 @@ case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim { object CometScanRule extends Logging { + // Per-scheme memo of `NativeBase.isObjectStoreSchemeSupported`. The answer depends only on the + // URL scheme, so we cache by scheme and never re-cross the JNI boundary for a repeated scheme. + private val schemeSupportCache = + new ConcurrentHashMap[String, JBoolean]() + + /** + * True when Comet's native object_store layer recognizes this URI's scheme (so the scan is + * natively readable). Delegates to the native layer -- the source of truth -- instead of a + * hardcoded scheme list. On any failure to consult native (e.g. the library isn't loaded on + * this JVM, or predates this method) we assume the scheme IS supported: the scheme gate is an + * early-fallback optimization, and a build without a working native library can't run Comet's + * native scan anyway, so declining here would only over-restrict. + */ + private[rules] def isNativelyReadableScheme(uri: URI): Boolean = { + val scheme = uri.getScheme + if (scheme == null) return true + schemeSupportCache + .computeIfAbsent( + scheme.toLowerCase(Locale.ROOT), + _ => + try JBoolean.valueOf(NativeBase.isObjectStoreSchemeSupported(uri.toString)) + catch { + case _: Throwable => JBoolean.TRUE + }) + .booleanValue() + } + /** * Tag set on a scan (`FileSourceScanExec` or `BatchScanExec`) that should be left as a plain * Spark scan rather than converted to a Comet scan. Written by diff --git a/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHdfsSchemeFileSystem.java b/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHdfsSchemeFileSystem.java new file mode 100644 index 0000000000..eeed348416 --- /dev/null +++ b/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHdfsSchemeFileSystem.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.hadoop.fs; + +import java.net.URI; + +import org.apache.hadoop.fs.RawLocalFileSystem; + +/** + * A local-disk-backed FileSystem that reports the {@code hdfs} scheme, so a test can write/read an + * {@code hdfs://} path without a live HDFS cluster. Used to assert that {@code CometScanRule} still + * claims an {@code hdfs://} scan when {@code spark.hadoop.fs.comet.libhdfs.schemes} is unset -- + * i.e. the JVM scheme gate's default stays in lockstep with the native {@code is_hdfs_scheme} + * default. + */ +public class FakeHdfsSchemeFileSystem extends RawLocalFileSystem { + + public static final String PREFIX = "hdfs://fake-namenode"; + + public FakeHdfsSchemeFileSystem() { + // Avoid `URI scheme is not "file"` error on + // RawLocalFileSystem$DeprecatedRawLocalFileStatus.getOwner + RawLocalFileSystem.useStatIfAvailable(); + } + + @Override + public String getScheme() { + return "hdfs"; + } + + @Override + public URI getUri() { + return URI.create(PREFIX); + } +} diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala new file mode 100644 index 0000000000..2e81306125 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.rules + +import java.io.File +import java.nio.file.Files +import java.util.UUID + +import org.apache.commons.io.FileUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{CometTestBase, SaveMode} +import org.apache.spark.sql.comet.CometScanExec +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} + +import org.apache.comet.CometConf +import org.apache.comet.hadoop.fs.{FakeHDFSFileSystem, FakeHdfsSchemeFileSystem} + +/** + * Comet's native readers go through object_store, which only understands a fixed set of URL + * schemes. A custom Hadoop FileSystem scheme that object_store can't parse (here `fake://`) must + * NOT be claimed by the native scan -- it would fail at execution with "Unable to recognise URL". + * `CometScanRule` must decline it so Spark's Hadoop-FS-aware reader handles the scan. + * + * Unlike `ParquetReadFromFakeHadoopFsSuite`, this suite does NOT route the `fake` scheme through + * libhdfs (`spark.hadoop.fs.comet.libhdfs.schemes`), so it exercises the decline path. The test + * applies the rule directly to the physical plan and asserts fallback -- no query execution, so + * it doesn't depend on the native reader actually attempting (and failing on) the scheme. + */ +class CometScanSchemeFallbackSuite extends CometTestBase { + + private var fakeRootDir: File = _ + + override protected def sparkConf: SparkConf = { + val conf = super.sparkConf + conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem") + // Back the `hdfs` scheme with a local FS so we can exercise an `hdfs://` path without a live + // cluster. `hdfs` is natively readable by default, so this scan must be CLAIMED, not declined. + conf.set("spark.hadoop.fs.hdfs.impl", "org.apache.comet.hadoop.fs.FakeHdfsSchemeFileSystem") + conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX) + // Intentionally NOT setting CometConf.COMET_LIBHDFS_SCHEMES -- `fake` is not natively readable, + // and `hdfs` must still be claimed by default (mirrors the native `is_hdfs_scheme` default). + conf + } + + override def beforeAll(): Unit = { + fakeRootDir = Files.createTempDirectory(s"comet_scheme_${UUID.randomUUID().toString}").toFile + super.beforeAll() + } + + protected override def afterAll(): Unit = { + if (fakeRootDir != null) FileUtils.deleteDirectory(fakeRootDir) + super.afterAll() + } + + test("native scan declines a filesystem scheme object_store can't read (fake://)") { + val path = s"${FakeHDFSFileSystem.PREFIX}${fakeRootDir.getAbsolutePath}/data" + spark.range(0, 10).toDF("id").write.format("parquet").mode(SaveMode.Overwrite).save(path) + + // Obtain a clean Spark physical plan (Comet disabled) with the FileSourceScanExec, then apply + // CometScanRule directly. No execution -- we only check whether the rule claims the scan. + // Capture via a var inside the block: `SQLTestUtils.withSQLConf` returns Unit on Spark 3.5 + // but a value on Spark 4.x, so we can't return the plan out of it cross-version. + var sparkPlan: SparkPlan = null + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + sparkPlan = spark.read.parquet(path).queryExecution.executedPlan + } + + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + val transformed = CometScanRule(spark).apply(stripAQEPlan(sparkPlan)) + + val cometScans = transformed.collect { case s: CometScanExec => s } + val sparkScans = transformed.collect { case s: FileSourceScanExec => s } + assert( + cometScans.isEmpty, + s"`fake://` is not object_store-readable; the native scan must fall back to Spark, " + + s"but Comet claimed it:\n$transformed") + assert( + sparkScans.size == 1, + s"expected the scan to remain a Spark FileSourceScanExec:\n$transformed") + } + } + + test("native scan claims hdfs:// when libhdfs.schemes is unset (native-default lockstep)") { + // Native's `is_hdfs_scheme` treats `hdfs` as readable when `fs.comet.libhdfs.schemes` is unset, + // and `create_hdfs_object_store` is in the default build. The JVM gate must agree: with the + // config unset, an `hdfs://` scan must be CLAIMED by Comet, not declined to Spark. Guards the + // `case None => Set("hdfs")` default in CometScanRule against the silent-fallback regression + // Andy flagged in #4525. + val path = s"${FakeHdfsSchemeFileSystem.PREFIX}${fakeRootDir.getAbsolutePath}/hdfs-data" + spark.range(0, 10).toDF("id").write.format("parquet").mode(SaveMode.Overwrite).save(path) + + var sparkPlan: SparkPlan = null + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + sparkPlan = spark.read.parquet(path).queryExecution.executedPlan + } + + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + val transformed = CometScanRule(spark).apply(stripAQEPlan(sparkPlan)) + + val cometScans = transformed.collect { case s: CometScanExec => s } + val sparkScans = transformed.collect { case s: FileSourceScanExec => s } + assert( + cometScans.size == 1, + s"`hdfs://` is natively readable by default; Comet must claim the scan, " + + s"but it fell back to Spark:\n$transformed") + assert(sparkScans.isEmpty, s"expected no leftover Spark FileSourceScanExec:\n$transformed") + } + } +}