From bad57e542daf0e9572d350b66eaef7658db748e1 Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sat, 30 May 2026 07:22:27 -0400 Subject: [PATCH 1/4] feat: surface native parquet read failures as FAILED_READ_FILE When Comet's native DataFusion scan hits a corrupt footer, corrupt page/column data, a truncated/empty file, or a deleted file, it rethrew the raw native message instead of Spark's FAILED_READ_FILE. The native path does not go through Spark's FileScanRDD, so the offending path was usually missing too. Classify these failures by TYPED DataFusionError variant in the native error path (ParquetError / ObjectStore / ArrowError-wrapping-ParquetError / IoError, unwrapping Context/Shared) rather than by matching error-message prose -- the strings come from three upstream crates (DataFusion, arrow-rs, object_store) and drift across version bumps with no compile-time signal. The match arms are checked by the compiler. - native: new SparkError::CannotReadFile { file_path, message } variant; a typed try_classify_file_read_error in the JNI bridge converts a file-read DataFusionError into it, replacing the previous "not found"/"No such file" string match. file_path is taken from object_store::Error::NotFound when available. Deliberately does NOT match object_store Generic errors (also used for non-file config errors that must surface as-is). - JVM: the structured error crosses JNI as the existing CometQueryExecutionException JSON payload; SparkErrorConverter decodes "CannotReadFile" and, when the native error carried no path, fills it from the per-task file list threaded from CometNativeScanExec via CometExecRDD. The shims wrap it via QueryExecutionErrors.cannotReadFilesError. No JVM-side message matching. 
Closes #4529 Co-Authored-By: Claude Opus 4.7 --- native/common/src/error.rs | 24 +++ native/jni-bridge/src/errors.rs | 189 ++++++++++++++++-- .../org/apache/comet/CometExecIterator.scala | 22 +- .../apache/comet/SparkErrorConverter.scala | 17 +- .../apache/spark/sql/comet/CometExecRDD.scala | 19 +- .../spark/sql/comet/CometNativeScanExec.scala | 15 +- .../comet/shims/ShimSparkErrorConverter.scala | 10 + .../comet/shims/ShimSparkErrorConverter.scala | 10 + .../comet/SparkErrorConverterSuite.scala | 41 ++++ .../apache/comet/exec/CometExecSuite.scala | 36 ++++ 10 files changed, 332 insertions(+), 51 deletions(-) diff --git a/native/common/src/error.rs b/native/common/src/error.rs index 1e2d7db9c4..4453d76d8c 100644 --- a/native/common/src/error.rs +++ b/native/common/src/error.rs @@ -215,6 +215,15 @@ pub enum SparkError { spark_type: String, }, + /// A per-file read failure (corrupt footer/page, truncated/empty file, deleted file) raised by + /// the native parquet reader / object_store. Classified by typed `DataFusionError` variant (no + /// message matching) and translated by the JVM shim into Spark's `FAILED_READ_FILE` + /// (`QueryExecutionErrors.cannotReadFilesError`). `file_path` may be empty when the underlying + /// error doesn't carry it (only `object_store::Error::NotFound` does); the JVM side then fills + /// it from the per-task file list. + #[error("Encountered error while reading file {file_path}: {message}")] + CannotReadFile { file_path: String, message: String }, + #[error("ArrowError: {0}.")] Arrow(Arc), @@ -291,6 +300,7 @@ impl SparkError { SparkError::DuplicateFieldByFieldId { .. } => "DuplicateFieldByFieldId", SparkError::ParquetMissingFieldIds => "ParquetMissingFieldIds", SparkError::ParquetSchemaConvert { .. } => "ParquetSchemaConvert", + SparkError::CannotReadFile { .. 
} => "CannotReadFile", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", } @@ -528,6 +538,12 @@ impl SparkError { "sparkType": spark_type, }) } + SparkError::CannotReadFile { file_path, message } => { + serde_json::json!({ + "filePath": file_path, + "message": message, + }) + } SparkError::Arrow(e) => { serde_json::json!({ "message": e.to_string(), @@ -617,6 +633,10 @@ impl SparkError { "org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException" } + // CannotReadFile - converted to a FAILED_READ_FILE SparkException by the shim + // (QueryExecutionErrors.cannotReadFilesError). + SparkError::CannotReadFile { .. } => "org/apache/spark/SparkException", + // Generic errors SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException", } @@ -707,6 +727,10 @@ impl SparkError { // SparkException error class, so no error class is exposed here. SparkError::ParquetSchemaConvert { .. } => None, + // CannotReadFile — the JVM shim wraps it via cannotReadFilesError, which supplies the + // FAILED_READ_FILE error class, so none is exposed here. + SparkError::CannotReadFile { .. 
} => None, + // Generic errors (no error class) SparkError::Arrow(_) | SparkError::Internal(_) => None, } diff --git a/native/jni-bridge/src/errors.rs b/native/jni-bridge/src/errors.rs index 7bf4073c8d..bb7025aeb7 100644 --- a/native/jni-bridge/src/errors.rs +++ b/native/jni-bridge/src/errors.rs @@ -490,7 +490,7 @@ fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option) // Handle DataFusion errors containing SparkError or SparkErrorWithContext CometError::DataFusion { msg: _, - source: DataFusionError::External(e), + source: df_error @ DataFusionError::External(e), } => { if let Some(spark_error_with_ctx) = e.downcast_ref::() { let json_message = spark_error_with_ctx.to_json(); @@ -504,32 +504,34 @@ fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option) jni::jni_str!("org/apache/comet/exceptions/CometQueryExecutionException"), JNIString::new(json_message), ) + } else if let Some(spark_error) = try_classify_file_read_error(df_error) { + throw_spark_error_as_json(env, &spark_error) } else { - // Check for file-not-found errors from object store - let error_msg = e.to_string(); - if error_msg.contains("not found") - && error_msg.contains("No such file or directory") - { - let spark_error = SparkError::FileNotFound { message: error_msg }; - throw_spark_error_as_json(env, &spark_error) - } else { - // Not a SparkError, use generic exception - let exception = error.to_exception(); - match backtrace { - Some(backtrace_string) => env.throw_new( - JNIString::new(exception.class), - JNIString::new( - to_stacktrace_string(exception.msg, backtrace_string).unwrap(), - ), - ), - _ => env.throw_new( - JNIString::new(exception.class), - JNIString::new(exception.msg), + // Not a SparkError, use generic exception + let exception = error.to_exception(); + match backtrace { + Some(backtrace_string) => env.throw_new( + JNIString::new(exception.class), + JNIString::new( + to_stacktrace_string(exception.msg, backtrace_string).unwrap(), ), - } + ), + _ 
=> env.throw_new( + JNIString::new(exception.class), + JNIString::new(exception.msg), + ), } } } + // Typed file-read errors (corrupt/truncated/deleted parquet, object_store, IO) raised + // by the native scan. Classified by DataFusionError variant -- not message text -- and + // surfaced as FAILED_READ_FILE via the structured SparkError channel. + CometError::DataFusion { msg: _, source } + if try_classify_file_read_error(source).is_some() => + { + let spark_error = try_classify_file_read_error(source).unwrap(); + throw_spark_error_as_json(env, &spark_error) + } // Handle direct SparkError - serialize to JSON CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error), _ => { @@ -574,6 +576,55 @@ fn throw_spark_error_as_json(env: &mut Env, spark_error: &SparkError) -> jni::er ) } +/// Classify a `DataFusionError` as a per-file read failure by TYPED variant (not message text), +/// returning `SparkError::CannotReadFile` if so. This is the structured replacement for the +/// previous JVM-side substring matching on error prose. +/// +/// A file-read failure is any of: +/// - `ParquetError` (corrupt footer/page, EOF, "failed to fill whole buffer", etc.) +/// - `ObjectStore` (truncated/empty/deleted file, range errors) -- `NotFound` carries the path +/// - `ArrowError`, when it wraps a `ParquetError` (the parquet reader surfaces some failures as +/// `ArrowError::ParquetError`) +/// - `IoError` (filesystem read failures) +/// +/// `Context`/`Shared` wrappers are unwrapped recursively. Note we do NOT match `Execution`/ +/// `Internal`/`External`-string or `object_store::Error::Generic`: those also carry non-file +/// errors (e.g. "Hdfs support is not enabled in this build") that must surface as-is. +/// +/// `file_path` is populated from `object_store::Error::NotFound { path, .. }` when available; +/// otherwise it is left empty and the JVM side fills it from the per-task file list. 
+fn try_classify_file_read_error(error: &DataFusionError) -> Option { + use datafusion::common::DataFusionError as DFE; + match error { + DFE::ParquetError(_) | DFE::IoError(_) => Some(SparkError::CannotReadFile { + file_path: String::new(), + message: error.to_string(), + }), + DFE::ObjectStore(e) => { + let file_path = match e.as_ref() { + datafusion::object_store::Error::NotFound { path, .. } => path.clone(), + _ => String::new(), + }; + Some(SparkError::CannotReadFile { + file_path, + message: error.to_string(), + }) + } + // The parquet reader sometimes surfaces a failure as ArrowError::ParquetError. + DFE::ArrowError(e, _) => match e.as_ref() { + ArrowError::ParquetError(_) => Some(SparkError::CannotReadFile { + file_path: String::new(), + message: error.to_string(), + }), + _ => None, + }, + // Unwrap context/shared wrappers and re-classify the inner error. + DFE::Context(_, inner) => try_classify_file_read_error(inner), + DFE::Shared(inner) => try_classify_file_read_error(inner), + _ => None, + } +} + /// Try to convert a DataFusion "Unable to get field named" error into a SparkError. /// DataFusion produces this error when reading Parquet files with duplicate field names /// in case-insensitive mode. For example, if a Parquet file has columns "B" and "b", @@ -1101,4 +1152,98 @@ mod tests { // first line. assert_starts_with!(msg_rust, expected_message); } + + // --- try_classify_file_read_error: typed classification of file-read DataFusionErrors --- + // These guard the variant matching that replaced JVM-side error-message string matching. They + // need no JVM (pure DataFusionError -> Option), so they also run under miri. + + use datafusion::common::DataFusionError; + + fn file_path_of(err: &SparkError) -> &str { + match err { + SparkError::CannotReadFile { file_path, .. 
} => file_path, + other => panic!("expected CannotReadFile, got {other:?}"), + } + } + + #[test] + fn classify_parquet_error_is_file_read() { + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::General( + "corrupt footer".to_string(), + ))); + let classified = try_classify_file_read_error(&e); + assert!( + classified.is_some(), + "ParquetError should classify as file-read" + ); + // No path available from a bare ParquetError; JVM fills it from the per-task list. + assert_eq!(file_path_of(&classified.unwrap()), ""); + } + + #[test] + fn classify_arrow_parquet_error_is_file_read() { + // arrow-rs surfaces some parquet read failures as ArrowError::ParquetError. + let e = DataFusionError::ArrowError( + Box::new(ArrowError::ParquetError( + "failed to fill whole buffer".to_string(), + )), + None, + ); + assert!( + try_classify_file_read_error(&e).is_some(), + "ArrowError(ParquetError) should classify as file-read" + ); + } + + #[test] + fn classify_object_store_not_found_carries_path() { + let e = DataFusionError::ObjectStore(Box::new(datafusion::object_store::Error::NotFound { + path: "file:/tmp/data/part-3.parquet".to_string(), + source: "missing".into(), + })); + let classified = + try_classify_file_read_error(&e).expect("NotFound should classify as file-read"); + assert_eq!(file_path_of(&classified), "file:/tmp/data/part-3.parquet"); + } + + #[test] + fn classify_io_error_is_file_read() { + let e = DataFusionError::IoError(io::Error::new(io::ErrorKind::UnexpectedEof, "eof")); + assert!(try_classify_file_read_error(&e).is_some()); + } + + #[test] + fn classify_unwraps_context_and_shared() { + let inner = DataFusionError::ParquetError(Box::new( + parquet::errors::ParquetError::General("corrupt".to_string()), + )); + let ctx = DataFusionError::Context("reading file".to_string(), Box::new(inner)); + assert!( + try_classify_file_read_error(&ctx).is_some(), + "Context-wrapped ParquetError should classify" + ); + let shared = 
DataFusionError::Shared(Arc::new(DataFusionError::ObjectStore(Box::new( + datafusion::object_store::Error::NotFound { + path: "p".to_string(), + source: "x".into(), + }, + )))); + assert!( + try_classify_file_read_error(&shared).is_some(), + "Shared-wrapped ObjectStore error should classify" + ); + } + + #[test] + fn classify_non_file_errors_are_not_file_read() { + // Execution / Internal errors (and object_store Generic config errors, which arrive as + // Execution strings) must NOT be masked as file-read failures. + assert!(try_classify_file_read_error(&DataFusionError::Execution( + "Hdfs support is not enabled in this build".to_string() + )) + .is_none()); + assert!( + try_classify_file_read_error(&DataFusionError::Internal("bug".to_string())).is_none() + ); + } } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 6140eca553..dd37b679d1 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -68,7 +68,8 @@ class CometExecIterator( partitionIndex: Int, broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None, encryptedFilePaths: Seq[String] = Seq.empty, - shuffleBlockIterators: Map[Int, CometShuffleBlockIterator] = Map.empty) + shuffleBlockIterators: Map[Int, CometShuffleBlockIterator] = Map.empty, + taskFilePaths: Seq[String] = Seq.empty) extends Iterator[ColumnarBatch] with Logging { @@ -169,29 +170,14 @@ class CometExecIterator( // Handle CometQueryExecutionException with JSON payload first case e: CometQueryExecutionException => logError(s"Native execution for task $taskAttemptId failed", e) - throw SparkErrorConverter.convertToSparkException(e) + throw SparkErrorConverter.convertToSparkException(e, taskFilePaths) case e: CometNativeException => // it is generally considered bad practice to log and then rethrow an // exception, but it really helps 
debugging to be able to see which task // threw the exception, so we log the exception with taskAttemptId here logError(s"Native execution for task $taskAttemptId failed", e) - - val parquetError: scala.util.matching.Regex = - """^Parquet error: (?:.*)$""".r - e.getMessage match { - case parquetError() => - // See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError - // See org.apache.parquet.hadoop.ParquetFileReader for error message. - // _LEGACY_ERROR_TEMP_2254 has no message placeholders; Spark 4 strict-checks - // parameters and raises INTERNAL_ERROR if any are passed. - throw new SparkException( - errorClass = "_LEGACY_ERROR_TEMP_2254", - messageParameters = Map.empty, - cause = new SparkException("File is not a Parquet file.", e)) - case _ => - throw e - } + throw e case e: Throwable => throw e } diff --git a/spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala b/spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala index 36059684c9..a6bc21aca7 100644 --- a/spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala +++ b/spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala @@ -69,7 +69,9 @@ object SparkErrorConverter extends ShimSparkErrorConverter { * @return * the corresponding Spark exception, or the original exception if parsing fails */ - def convertToSparkException(e: CometQueryExecutionException): Throwable = { + def convertToSparkException( + e: CometQueryExecutionException, + taskFilePaths: Seq[String] = Seq.empty): Throwable = { try { if (!e.isJsonMessage()) { // Not JSON, return original exception @@ -83,7 +85,18 @@ object SparkErrorConverter extends ShimSparkErrorConverter { val json = parse(e.getMessage) val errorJson = json.extract[ErrorJson] - val params = errorJson.params.getOrElse(Map.empty) + val rawParams = errorJson.params.getOrElse(Map.empty) + // CannotReadFile carries the offending file path natively only for the object_store NotFound + // case; for corrupt/truncated parquet the 
native error has no path, so fall back to the + // per-task file list threaded in from CometExecIterator. + val params = + if (errorJson.errorType == "CannotReadFile" + && rawParams.get("filePath").forall(p => p == null || p.toString.isEmpty) + && taskFilePaths.nonEmpty) { + rawParams + ("filePath" -> taskFilePaths.mkString(",")) + } else { + rawParams + } val errorClass = errorJson.errorClass.map(_.trim).filter(_.nonEmpty).getOrElse(UNKNOWN_ERROR) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala index 47eda98a11..1b728aa3a0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala @@ -36,7 +36,8 @@ import org.apache.comet.serde.OperatorOuterClass private[spark] class CometExecPartition( override val index: Int, val inputPartitions: Array[Partition], - val planDataByKey: Map[String, Array[Byte]]) + val planDataByKey: Map[String, Array[Byte]], + val filePaths: Seq[String] = Seq.empty) extends Partition /** @@ -66,7 +67,8 @@ private[spark] class CometExecRDD( subqueries: Seq[ScalarSubquery], broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None, encryptedFilePaths: Seq[String] = Seq.empty, - shuffleScanIndices: Set[Int] = Set.empty) + shuffleScanIndices: Set[Int] = Set.empty, + @transient perPartitionFilePaths: Array[Seq[String]] = Array.empty) extends RDD[ColumnarBatch](sc, inputRDDs.map(rdd => new OneToOneDependency(rdd))) { // Determine partition count: from inputs if available, otherwise from parameter @@ -90,7 +92,9 @@ private[spark] class CometExecRDD( (0 until numPartitions).map { idx => val inputParts = inputRDDs.map(_.partitions(idx)).toArray val planData = perPartitionByKey.map { case (key, arr) => key -> arr(idx) } - new CometExecPartition(idx, inputParts, planData) + val fp = + if (perPartitionFilePaths.length > idx) 
perPartitionFilePaths(idx) else Seq.empty[String] + new CometExecPartition(idx, inputParts, planData, fp) }.toArray } @@ -130,7 +134,8 @@ private[spark] class CometExecRDD( partition.index, broadcastedHadoopConfForEncryption, encryptedFilePaths, - shuffleBlockIters) + shuffleBlockIters, + taskFilePaths = partition.filePaths) // Register ScalarSubqueries so native code can look them up subqueries.foreach(sub => CometScalarSubquery.setSubquery(it.id, sub)) @@ -179,7 +184,8 @@ object CometExecRDD { subqueries: Seq[ScalarSubquery], broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None, encryptedFilePaths: Seq[String] = Seq.empty, - shuffleScanIndices: Set[Int] = Set.empty): CometExecRDD = { + shuffleScanIndices: Set[Int] = Set.empty, + perPartitionFilePaths: Array[Seq[String]] = Array.empty): CometExecRDD = { // scalastyle:on new CometExecRDD( @@ -194,6 +200,7 @@ object CometExecRDD { subqueries, broadcastedHadoopConfForEncryption, encryptedFilePaths, - shuffleScanIndices) + shuffleScanIndices, + perPartitionFilePaths) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index b9fc47c5c8..0ce8547563 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -154,7 +154,8 @@ case class CometNativeScanExec( * all files for all partitions in the driver, we serialize only common metadata (once) and each * partition's files (lazily, as tasks are scheduled). 
*/ - @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = { + @transient private lazy val serializedPartitionData + : (Array[Byte], Array[Array[Byte]], Array[Seq[String]]) = { // Outer partitionFilters (wrapper) DPP is resolved by Spark's standard // prepare -> waitForSubqueries lifecycle, triggered explicitly via // CometLeafExec.ensureSubqueriesResolved called from @@ -225,13 +226,20 @@ case class CometNativeScanExec( partitionNativeScan.toByteArray }.toArray - (commonBytes, perPartitionBytes) + // File paths per partition -- threaded through CometExecRDD to CometExecIterator so a native + // CannotReadFile error that lacks a path (corrupt/truncated parquet) can be surfaced as + // FAILED_READ_FILE naming the actual file (see SparkErrorConverter.convertToSparkException). + val perPartitionPaths = filePartitions.map(_.files.map(_.filePath.toString).toSeq).toArray + + (commonBytes, perPartitionBytes, perPartitionPaths) } def commonData: Array[Byte] = serializedPartitionData._1 def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2 + def perPartitionFilePaths: Array[Seq[String]] = serializedPartitionData._3 + override def doExecuteColumnar(): RDD[ColumnarBatch] = { val nativeMetrics = CometMetricNode.fromCometPlan(this) val serializedPlan = CometExec.serializeNativePlan(nativeOp) @@ -259,7 +267,8 @@ case class CometNativeScanExec( nativeMetrics, Seq.empty, broadcastedHadoopConfForEncryption, - encryptedFilePaths) { + encryptedFilePaths, + perPartitionFilePaths = perPartitionFilePaths) { override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { val res = super.compute(split, context) diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 2c803cab6d..c502e4d55d 100644 --- 
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -336,6 +336,16 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.readCurrentFileNotFoundError( new FileNotFoundException(s"File $path does not exist"))) + case "CannotReadFile" => + // A per-file read failure (corrupt/truncated/deleted parquet, object_store, IO) classified + // by typed DataFusionError variant on the native side. Wrap in the FAILED_READ_FILE + // SparkException Spark itself produces when its own parquet reader fails. `filePath` is + // supplied by the native object_store NotFound error or, when empty, filled by + // SparkErrorConverter from the per-task file list. + val message = params.get("message").map(_.toString).getOrElse("") + val filePath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath)) + case _ => None } diff --git a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index ad5481c377..874a6af97c 100644 --- a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -351,6 +351,16 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .fileNotExistError(path, new FileNotFoundException(s"File $path does not exist"))) + case "CannotReadFile" => + // A per-file read failure (corrupt/truncated/deleted parquet, object_store, IO) classified + // by typed DataFusionError variant on the native side. Wrap in the FAILED_READ_FILE + // SparkException Spark itself produces when its own parquet reader fails. 
`filePath` is + // supplied by the native object_store NotFound error or, when empty, filled by + // SparkErrorConverter from the per-task file list. + val message = params.get("message").map(_.toString).getOrElse("") + val filePath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath)) + case _ => // Unknown error type - return None to trigger fallback None diff --git a/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala b/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala index d3e2c2c64b..631d18e141 100644 --- a/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala @@ -22,6 +22,47 @@ package org.apache.comet import org.scalatest.funsuite.AnyFunSuite class SparkErrorConverterSuite extends AnyFunSuite { + + test("CannotReadFile converts to a FAILED_READ_FILE SparkException naming the file") { + val ex = SparkErrorConverter + .convertErrorType( + "CannotReadFile", + "", + Map( + "filePath" -> "file:/tmp/data/part-0.parquet", + "message" -> "Parquet error: bad footer"), + Array.empty, + null) + .getOrElse(fail("Expected CannotReadFile to be converted to a Spark exception")) + assert(ex.getMessage.contains("FAILED_READ_FILE")) + assert(ex.getMessage.contains("part-0.parquet")) + } + + test("CannotReadFile with empty native path falls back to the per-task file list") { + // The native error (e.g. corrupt parquet) carries no path; convertToSparkException must fill + // it from the per-task file list threaded in from CometExecIterator. 
+ val json = + """{"errorType":"CannotReadFile","errorClass":"",""" + + """"params":{"filePath":"","message":"Parquet error: bad footer"}}""" + val ex = SparkErrorConverter.convertToSparkException( + new org.apache.comet.exceptions.CometQueryExecutionException(json), + taskFilePaths = Seq("file:/tmp/data/part-7.parquet")) + assert(ex.getMessage.contains("FAILED_READ_FILE")) + assert(ex.getMessage.contains("part-7.parquet")) + } + + test("CannotReadFile prefers the native path over the per-task file list") { + // When object_store supplied the path (NotFound), keep it rather than the fallback list. + val json = + """{"errorType":"CannotReadFile","errorClass":"",""" + + """"params":{"filePath":"file:/tmp/data/native.parquet","message":"Object at location ... not found"}}""" + val ex = SparkErrorConverter.convertToSparkException( + new org.apache.comet.exceptions.CometQueryExecutionException(json), + taskFilePaths = Seq("file:/tmp/data/fallback.parquet")) + assert(ex.getMessage.contains("native.parquet")) + assert(!ex.getMessage.contains("fallback.parquet")) + } + private def castOverflowError(fromType: String, value: String): Throwable = { SparkErrorConverter .convertErrorType( diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index a1460427c0..f84bab0fb5 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3974,6 +3974,42 @@ class CometExecSuite extends CometTestBase { } } + test("native parquet read failure surfaces as FAILED_READ_FILE with the file path") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "corrupt.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 1000) + // Corrupt column/page data in the middle of the file while leaving the footer intact, so + // Spark's JVM-side footer pre-check passes during planning and the 
native DataFusion reader + // fails during execution -- the path CometExecIterator must wrap as FAILED_READ_FILE. + val f = new java.io.File(new java.net.URI(path.toString)) + val raf = new java.io.RandomAccessFile(f, "rw") + val len = raf.length() + raf.seek(8) // after the "PAR1" magic header, before the footer + raf.write(Array.fill[Byte](math.min(2048, (len / 2).toInt))(0xff.toByte)) + raf.close() + + withSQLConf(CometConf.COMET_ENABLED.key -> "true") { + val e = intercept[Throwable] { + spark.read.parquet(path.toString).collect() + } + // Spark reports its own per-file read failures as FAILED_READ_FILE carrying the path. + // Comet's native scan must do the same instead of leaking a raw CometNativeException. + val messages = Iterator + .iterate(e: Throwable)(_.getCause) + .takeWhile(_ != null) + .map(t => s"${t.getClass.getName}: ${t.getMessage}") + .toList + val chain = messages.mkString("\n ") + assert( + messages.exists(m => m.contains("FAILED_READ_FILE")), + s"Expected a FAILED_READ_FILE exception in the cause chain, but got:\n $chain") + assert( + messages.exists(m => m.contains("corrupt.parquet")), + s"Expected the offending file path in the cause chain, but got:\n $chain") + } + } + } + } case class BucketedTableTestSpec( From 9a5c64ec4a18473e78482743eac00d4ce9377b0e Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sun, 31 May 2026 10:22:48 -0400 Subject: [PATCH 2/4] fix: classify a missing file as FileNotFound (readCurrentFileNotFoundError), not CannotReadFile A genuinely-missing file (object_store NotFound) is distinct from a corrupt/truncated one: Spark surfaces it via `readCurrentFileNotFoundError` ("It is possible the underlying files have been updated."), not `cannotReadFilesError` (FAILED_READ_FILE). `try_classify_file_read_error` mapped every per-file read failure -- including NotFound -- to `SparkError::CannotReadFile`, so a file removed between planning and execution produced the wrong Spark error. 
Classify object_store NotFound as `SparkError::FileNotFound` instead. The NotFound may arrive directly (`DataFusionError::ObjectStore`) or wrapped by the parquet reader as `ParquetError::External(..)` / `ArrowError::ParquetError`, so a `source_chain_has_object_store_not_found` helper walks the typed source chain (never message text). Corrupt/truncated reads stay CannotReadFile -> FAILED_READ_FILE. The JVM shim already maps the `FileNotFound` errorType to `readCurrentFileNotFoundError`, so no shim change is needed. Surfaced by Delta's CDC-after-VACUUM read: `DeltaVacuumSuite` "vacuum for cdc - update/merge" and "... - delete tombstones" vacuum the `_change_data` files and assert the subsequent read throws `readCurrentFileNotFoundError`; with the native scan these failed because Comet returned the cannotReadFilesError message. Both pass with this fix (verified locally). Tests: - Rust unit tests for the classifier: object_store NotFound (direct and ParquetError::External-wrapped) -> FileNotFound; corrupt ParquetError stays CannotReadFile. - Spark `CometExecSuite` "native parquet read of a missing file surfaces readCurrentFileNotFoundError" (red before, green after): reads a file deleted between planning and execution. - Made the existing FAILED_READ_FILE corrupt-file assertion spark-version-stable (assert "Encountered error while reading file" -- present on both 3.5 and 4.x; only 4.x prepends the [FAILED_READ_FILE.NO_HINT] class tag), so the test passes under -Pspark-3.5 as well. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- native/jni-bridge/src/errors.rs | 97 ++++++++++++++++--- .../apache/comet/exec/CometExecSuite.scala | 40 +++++++- 2 files changed, 122 insertions(+), 15 deletions(-) diff --git a/native/jni-bridge/src/errors.rs b/native/jni-bridge/src/errors.rs index bb7025aeb7..b10ce70e25 100644 --- a/native/jni-bridge/src/errors.rs +++ b/native/jni-bridge/src/errors.rs @@ -596,22 +596,36 @@ fn throw_spark_error_as_json(env: &mut Env, spark_error: &SparkError) -> jni::er fn try_classify_file_read_error(error: &DataFusionError) -> Option { use datafusion::common::DataFusionError as DFE; match error { + // A genuinely-missing file (object_store NotFound) is distinct from a corrupt/truncated + // one: Spark surfaces it as `readCurrentFileNotFoundError` ("It is possible the underlying + // files have been updated."), not `cannotReadFilesError`. The NotFound may arrive directly + // (`DFE::ObjectStore`) or wrapped by the parquet reader as `ParquetError::External(..)`, so + // inspect the source chain. Delta's CDC-after-VACUUM read depends on this distinction. + DFE::ParquetError(pe) if source_chain_has_object_store_not_found(pe.as_ref()) => { + Some(SparkError::FileNotFound { + message: error.to_string(), + }) + } DFE::ParquetError(_) | DFE::IoError(_) => Some(SparkError::CannotReadFile { file_path: String::new(), message: error.to_string(), }), - DFE::ObjectStore(e) => { - let file_path = match e.as_ref() { - datafusion::object_store::Error::NotFound { path, .. } => path.clone(), - _ => String::new(), - }; - Some(SparkError::CannotReadFile { - file_path, + DFE::ObjectStore(e) => match e.as_ref() { + datafusion::object_store::Error::NotFound { .. } => Some(SparkError::FileNotFound { message: error.to_string(), - }) - } + }), + _ => Some(SparkError::CannotReadFile { + file_path: String::new(), + message: error.to_string(), + }), + }, // The parquet reader sometimes surfaces a failure as ArrowError::ParquetError. 
DFE::ArrowError(e, _) => match e.as_ref() { + ArrowError::ParquetError(_) if source_chain_has_object_store_not_found(e.as_ref()) => { + Some(SparkError::FileNotFound { + message: error.to_string(), + }) + } ArrowError::ParquetError(_) => Some(SparkError::CannotReadFile { file_path: String::new(), message: error.to_string(), @@ -625,6 +639,24 @@ fn try_classify_file_read_error(error: &DataFusionError) -> Option { } } +/// True if `err` or any error in its `source()` chain is an `object_store` `NotFound` -- i.e. a +/// genuinely-missing file. Used to tell a missing file apart from a corrupt/truncated one: the +/// parquet reader wraps the object_store error as `ParquetError::External(..)`, so the typed +/// `NotFound` is only reachable by walking the source chain (we match the typed variant, never the +/// message text). +fn source_chain_has_object_store_not_found(err: &(dyn std::error::Error + 'static)) -> bool { + let mut current: Option<&(dyn std::error::Error + 'static)> = Some(err); + while let Some(e) = current { + if let Some(os) = e.downcast_ref::() { + if matches!(os, datafusion::object_store::Error::NotFound { .. }) { + return true; + } + } + current = e.source(); + } + false +} + /// Try to convert a DataFusion "Unable to get field named" error into a SparkError. /// DataFusion produces this error when reading Parquet files with duplicate field names /// in case-insensitive mode. For example, if a Parquet file has columns "B" and "b", @@ -1196,14 +1228,53 @@ mod tests { } #[test] - fn classify_object_store_not_found_carries_path() { + fn classify_object_store_not_found_is_file_not_found() { + // A genuinely-missing file must classify as FileNotFound (-> readCurrentFileNotFoundError + // on the JVM side), NOT CannotReadFile (-> cannotReadFilesError). The path is carried in + // the message; the JVM shim extracts it. 
let e = DataFusionError::ObjectStore(Box::new(datafusion::object_store::Error::NotFound { path: "file:/tmp/data/part-3.parquet".to_string(), source: "missing".into(), })); - let classified = - try_classify_file_read_error(&e).expect("NotFound should classify as file-read"); - assert_eq!(file_path_of(&classified), "file:/tmp/data/part-3.parquet"); + match try_classify_file_read_error(&e) { + Some(SparkError::FileNotFound { message }) => { + assert!(message.contains("part-3.parquet"), "message was: {message}") + } + other => panic!("expected FileNotFound, got {other:?}"), + } + } + + #[test] + fn classify_parquet_error_wrapping_not_found_is_file_not_found() { + // The parquet reader surfaces a missing data file as ParquetError::External wrapping an + // object_store NotFound (e.g. a Delta CDC file removed by VACUUM). The NotFound is only + // reachable through the source chain, but it must still classify as FileNotFound. + let os = datafusion::object_store::Error::NotFound { + path: "file:/tmp/t/_change_data/cdc.parquet".to_string(), + source: "missing".into(), + }; + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::External( + Box::new(os), + ))); + match try_classify_file_read_error(&e) { + Some(SparkError::FileNotFound { .. }) => {} + other => { + panic!("expected FileNotFound for a NotFound-wrapping ParquetError, got {other:?}") + } + } + } + + #[test] + fn classify_corrupt_parquet_error_stays_cannot_read_file() { + // A corrupt/truncated file (no NotFound in the chain) must remain CannotReadFile + // (-> FAILED_READ_FILE), unchanged by the FileNotFound carve-out. + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::General( + "corrupt footer".to_string(), + ))); + match try_classify_file_read_error(&e) { + Some(SparkError::CannotReadFile { .. 
}) => {} + other => panic!("expected CannotReadFile for a corrupt parquet error, got {other:?}"), + } } #[test] diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index f84bab0fb5..9499577e47 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -4000,9 +4000,12 @@ class CometExecSuite extends CometTestBase { .map(t => s"${t.getClass.getName}: ${t.getMessage}") .toList val chain = messages.mkString("\n ") + // `cannotReadFilesError` is the FAILED_READ_FILE path. Its message is version-stable + // ("Encountered error while reading file ..."); only Spark 4.x prepends the + // `[FAILED_READ_FILE.NO_HINT]` error-class tag, so assert on the stable substring. assert( - messages.exists(m => m.contains("FAILED_READ_FILE")), - s"Expected a FAILED_READ_FILE exception in the cause chain, but got:\n $chain") + messages.exists(m => m.contains("Encountered error while reading file")), + s"Expected a FAILED_READ_FILE (cannotReadFilesError) in the cause chain, but got:\n $chain") assert( messages.exists(m => m.contains("corrupt.parquet")), s"Expected the offending file path in the cause chain, but got:\n $chain") @@ -4010,6 +4013,39 @@ class CometExecSuite extends CometTestBase { } } + test("native parquet read of a missing file surfaces readCurrentFileNotFoundError") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "missing.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 1000) + val f = new java.io.File(new java.net.URI(path.toString)) + + withSQLConf(CometConf.COMET_ENABLED.key -> "true") { + // Read the schema (footer) while the file exists, then delete it so it is MISSING at + // execution time -- mirroring a file vacuumed/removed between planning and the scan + // (e.g. Delta's CDC-after-VACUUM read). 
A missing file is distinct from a corrupt one: + // Spark surfaces it as `readCurrentFileNotFoundError` ("It is possible the underlying + // files have been updated."), NOT `cannotReadFilesError`/`FAILED_READ_FILE`. Comet's + // native scan must classify the object_store NotFound the same way. + val df = spark.read.parquet(path.toString) + df.queryExecution.executedPlan // force planning (footer read) before deletion + assert(f.delete(), s"failed to delete $f") + + val e = intercept[Throwable] { + df.collect() + } + val messages = Iterator + .iterate(e: Throwable)(_.getCause) + .takeWhile(_ != null) + .map(t => s"${t.getClass.getName}: ${t.getMessage}") + .toList + val chain = messages.mkString("\n ") + assert( + messages.exists(m => m.contains("It is possible the underlying files have been updated")), + s"Expected readCurrentFileNotFoundError for a missing file, but got:\n $chain") + } + } + } + } case class BucketedTableTestSpec( From 571fce1db53c85ec8c97019854bc5708ba0ca4ee Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sun, 31 May 2026 10:35:58 -0400 Subject: [PATCH 3/4] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20spark-3.4=20shim,=20version-stable=20assertions,=20?= =?UTF-8?q?IoError=20scoping?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses @andygrove's review on #4536: - spark-3.4 shim: add the `CannotReadFile` case (it only existed in the 3.5 and 4.x shims), so a corrupt/truncated read is wrapped via `cannotReadFilesError` (FAILED_READ_FILE) on Spark 3.4 too. (The `FileNotFound` case was already present on 3.4.) - SparkErrorConverterSuite: assert on the version-stable message ("Encountered error while reading file ...") instead of the `FAILED_READ_FILE` literal, which only Spark 4.x prepends to getMessage as the error-class tag (3.4/3.5 render only the message). 
Fixes the two failing tests on 3.4/3.5; same version-stable style already applied to the CometExecSuite e2e test. - native classifier: stop treating a bare `DataFusionError::IoError` as a file read. Scans surface read failures as a typed ParquetError/ObjectStore error; a bare IoError can also come from non-scan paths (spill, shuffle), which must not be relabelled FAILED_READ_FILE with a per-task path attached. Test updated accordingly. Co-Authored-By: Claude Opus 4.8 (1M context) --- native/jni-bridge/src/errors.rs | 12 +++++++++--- .../sql/comet/shims/ShimSparkErrorConverter.scala | 10 ++++++++++ .../org/apache/comet/SparkErrorConverterSuite.scala | 12 ++++++++---- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/native/jni-bridge/src/errors.rs b/native/jni-bridge/src/errors.rs index b10ce70e25..2a70349383 100644 --- a/native/jni-bridge/src/errors.rs +++ b/native/jni-bridge/src/errors.rs @@ -606,7 +606,11 @@ fn try_classify_file_read_error(error: &DataFusionError) -> Option { message: error.to_string(), }) } - DFE::ParquetError(_) | DFE::IoError(_) => Some(SparkError::CannotReadFile { + // NB: only ParquetError / ObjectStore / ArrowError(ParquetError) are treated as file reads. + // A bare `IoError` is intentionally NOT classified here: a scan surfaces read failures as a + // typed ParquetError or ObjectStore error, whereas an `IoError` can also originate from + // non-scan paths (spill, shuffle), which must not be relabelled FAILED_READ_FILE. + DFE::ParquetError(_) => Some(SparkError::CannotReadFile { file_path: String::new(), message: error.to_string(), }), @@ -1278,9 +1282,11 @@ mod tests { } #[test] - fn classify_io_error_is_file_read() { + fn classify_bare_io_error_is_not_file_read() { + // A bare IoError is not a scan read failure (scans surface ParquetError/ObjectStore); it + // can come from spill/shuffle, so it must NOT be classified as FAILED_READ_FILE. 
let e = DataFusionError::IoError(io::Error::new(io::ErrorKind::UnexpectedEof, "eof")); - assert!(try_classify_file_read_error(&e).is_some()); + assert!(try_classify_file_read_error(&e).is_none()); } #[test] diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 5d261493fb..09ac063cd2 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -341,6 +341,16 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.readCurrentFileNotFoundError( new FileNotFoundException(s"File $path does not exist"))) + case "CannotReadFile" => + // A per-file read failure of a readable-but-broken file (corrupt/truncated parquet, + // object_store, IO) classified by typed DataFusionError variant on the native side. Wrap + // in the FAILED_READ_FILE SparkException Spark itself produces when its own parquet reader + // fails. (A genuinely-missing file is "FileNotFound" above.) `filePath` is filled by + // SparkErrorConverter from the per-task file list when the native error carried none. 
+ val message = params.get("message").map(_.toString).getOrElse("") + val filePath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath)) + case _ => None } diff --git a/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala b/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala index 631d18e141..3b81a76ead 100644 --- a/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/SparkErrorConverterSuite.scala @@ -34,7 +34,10 @@ class SparkErrorConverterSuite extends AnyFunSuite { Array.empty, null) .getOrElse(fail("Expected CannotReadFile to be converted to a Spark exception")) - assert(ex.getMessage.contains("FAILED_READ_FILE")) + // `cannotReadFilesError` IS the FAILED_READ_FILE path. Assert on the version-stable message + // ("Encountered error while reading file ...") rather than the `FAILED_READ_FILE` literal, + // which only Spark 4.x prepends to getMessage as the error-class tag (3.4/3.5 do not). + assert(ex.getMessage.contains("Encountered error while reading file")) assert(ex.getMessage.contains("part-0.parquet")) } @@ -47,15 +50,16 @@ class SparkErrorConverterSuite extends AnyFunSuite { val ex = SparkErrorConverter.convertToSparkException( new org.apache.comet.exceptions.CometQueryExecutionException(json), taskFilePaths = Seq("file:/tmp/data/part-7.parquet")) - assert(ex.getMessage.contains("FAILED_READ_FILE")) + // Version-stable assertion (see above): only Spark 4.x renders the FAILED_READ_FILE class tag. + assert(ex.getMessage.contains("Encountered error while reading file")) assert(ex.getMessage.contains("part-7.parquet")) } test("CannotReadFile prefers the native path over the per-task file list") { - // When object_store supplied the path (NotFound), keep it rather than the fallback list. + // When the native error supplied a path, keep it rather than the fallback list. 
val json = """{"errorType":"CannotReadFile","errorClass":"",""" + - """"params":{"filePath":"file:/tmp/data/native.parquet","message":"Object at location ... not found"}}""" + """"params":{"filePath":"file:/tmp/data/native.parquet","message":"Parquet error: corrupt footer"}}""" val ex = SparkErrorConverter.convertToSparkException( new org.apache.comet.exceptions.CometQueryExecutionException(json), taskFilePaths = Seq("file:/tmp/data/fallback.parquet")) From 9d2154c9dddeb51dcb7ada5ee934dfade8728553 Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sun, 31 May 2026 12:41:15 -0400 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20corru?= =?UTF-8?q?pt-footer=20wording=20+=20tidy=20the=20file-read=20throw=20path?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to @andygrove's review on #4536: - (point 3, wording) parquet-rs reports a bad magic / unreadable footer as "Invalid Parquet file. Corrupt footer", whereas Spark's reader -- and Spark's `ParquetQuerySuite` ("ignoreCorruptFiles", "ignoreMissingFiles using parquet") -- phrase it as "<file> is not a Parquet file". `cannot_read_file_message` now appends Spark's phrasing for the magic/footer case so the FAILED_READ_FILE cause carries it. The outer `cannotReadFilesError` wrapper ("Encountered error while reading file …") is unchanged, so this composes with Spark's tests and does not disturb the Delta shims that match Comet's outer message. Other read failures keep their native message. (On behavior: the native scan already declines and falls back to Spark when `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` is enabled -- CometNativeScan.scala -- so the skip semantics are preserved; no behavior gap.) - (point 5, tidy) `try_classify_file_read_error` is no longer evaluated twice (`.is_some()` guard + `.unwrap()`): the DataFusion arm is a single `if let Some(..)`, and the generic fallback is extracted to `throw_generic_exception`.
Tests: classifier unit tests for the magic/footer wording (added) vs other parquet errors (unchanged native message). Co-Authored-By: Claude Opus 4.8 (1M context) --- native/jni-bridge/src/errors.rs | 125 +++++++++++++++++++++++--------- 1 file changed, 89 insertions(+), 36 deletions(-) diff --git a/native/jni-bridge/src/errors.rs b/native/jni-bridge/src/errors.rs index 2a70349383..1971d79c41 100644 --- a/native/jni-bridge/src/errors.rs +++ b/native/jni-bridge/src/errors.rs @@ -523,47 +523,52 @@ fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option) } } } - // Typed file-read errors (corrupt/truncated/deleted parquet, object_store, IO) raised - // by the native scan. Classified by DataFusionError variant -- not message text -- and - // surfaced as FAILED_READ_FILE via the structured SparkError channel. - CometError::DataFusion { msg: _, source } - if try_classify_file_read_error(source).is_some() => - { - let spark_error = try_classify_file_read_error(source).unwrap(); - throw_spark_error_as_json(env, &spark_error) - } - // Handle direct SparkError - serialize to JSON - CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error), - _ => { - let error_msg = error.to_string(); - // Check for file-not-found errors that may arrive through other wrapping paths - if error_msg.contains("not found") - && error_msg.contains("No such file or directory") - { - let spark_error = SparkError::FileNotFound { message: error_msg }; - throw_spark_error_as_json(env, &spark_error) - } else if let Some(spark_error) = try_convert_duplicate_field_error(&error_msg) { + // Typed file-read errors (corrupt/truncated parquet, object_store) raised by the native + // scan -- classified by DataFusionError variant, not message text -- surfaced as + // FAILED_READ_FILE / FileNotFound via the structured SparkError channel. Anything else + // falls back to generic handling. 
+ CometError::DataFusion { msg: _, source } => { + if let Some(spark_error) = try_classify_file_read_error(source) { throw_spark_error_as_json(env, &spark_error) } else { - let exception = error.to_exception(); - match backtrace { - Some(backtrace_string) => env.throw_new( - JNIString::new(exception.class), - JNIString::new( - to_stacktrace_string(exception.msg, backtrace_string).unwrap(), - ), - ), - _ => env.throw_new( - JNIString::new(exception.class), - JNIString::new(exception.msg), - ), - } + throw_generic_exception(env, error, backtrace) } } + // Handle direct SparkError - serialize to JSON + CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error), + _ => throw_generic_exception(env, error, backtrace), }; } } +/// Generic fallback throw for an error that isn't a structured `SparkError`. Recognises a +/// file-not-found arriving through non-typed wrapping paths and duplicate-field errors; otherwise +/// throws the error's natural JVM exception (with the captured backtrace when available). +fn throw_generic_exception( + env: &mut Env, + error: &CometError, + backtrace: Option<String>, +) -> jni::errors::Result<()> { + let error_msg = error.to_string(); + // A file-not-found that arrived through a non-typed wrapping path (the typed classification + // is handled by `try_classify_file_read_error`).
+ if error_msg.contains("not found") && error_msg.contains("No such file or directory") { + let spark_error = SparkError::FileNotFound { message: error_msg }; + throw_spark_error_as_json(env, &spark_error) + } else if let Some(spark_error) = try_convert_duplicate_field_error(&error_msg) { + throw_spark_error_as_json(env, &spark_error) + } else { + let exception = error.to_exception(); + match backtrace { + Some(backtrace_string) => env.throw_new( + JNIString::new(exception.class), + JNIString::new(to_stacktrace_string(exception.msg, backtrace_string).unwrap()), + ), + _ => env.throw_new(JNIString::new(exception.class), JNIString::new(exception.msg)), + } + } +} + /// Throws a CometQueryExecutionException with JSON-encoded SparkError fn throw_spark_error_as_json(env: &mut Env, spark_error: &SparkError) -> jni::errors::Result<()> { // Serialize error to JSON @@ -612,7 +617,7 @@ fn try_classify_file_read_error(error: &DataFusionError) -> Option { // non-scan paths (spill, shuffle), which must not be relabelled FAILED_READ_FILE. DFE::ParquetError(_) => Some(SparkError::CannotReadFile { file_path: String::new(), - message: error.to_string(), + message: cannot_read_file_message(error), }), DFE::ObjectStore(e) => match e.as_ref() { datafusion::object_store::Error::NotFound { .. } => Some(SparkError::FileNotFound { @@ -620,7 +625,7 @@ fn try_classify_file_read_error(error: &DataFusionError) -> Option { }), _ => Some(SparkError::CannotReadFile { file_path: String::new(), - message: error.to_string(), + message: cannot_read_file_message(error), }), }, // The parquet reader sometimes surfaces a failure as ArrowError::ParquetError. 
@@ -632,7 +637,7 @@ fn try_classify_file_read_error(error: &DataFusionError) -> Option<SparkError> { } ArrowError::ParquetError(_) => Some(SparkError::CannotReadFile { file_path: String::new(), - message: error.to_string(), + message: cannot_read_file_message(error), }), _ => None, }, @@ -661,6 +666,21 @@ fn source_chain_has_object_store_not_found(err: &(dyn std::error::Error + 'stati false } +/// Build the message for a `CannotReadFile` error. parquet-rs reports a bad magic / unreadable +/// footer as `"Invalid Parquet file. Corrupt footer"`, whereas Spark's own reader (and Spark's +/// `ParquetQuerySuite`) phrase it as `"<file> is not a Parquet file"`. Append Spark's phrasing so +/// the cause carries it; the outer `cannotReadFilesError` wrapper ("Encountered error while reading +/// file …") is unchanged, so this composes with Spark's tests without changing the FAILED_READ_FILE +/// wrapping. Other read failures (corrupt pages, EOF, IO) keep their native message verbatim. +fn cannot_read_file_message(error: &DataFusionError) -> String { + let msg = error.to_string(); + if msg.contains("Invalid Parquet file") && !msg.contains("is not a Parquet file") { + format!("{msg} (file is not a Parquet file)") + } else { + msg + } +} + /// Try to convert a DataFusion "Unable to get field named" error into a SparkError. /// DataFusion produces this error when reading Parquet files with duplicate field names /// in case-insensitive mode. For example, if a Parquet file has columns "B" and "b", @@ -1281,6 +1301,39 @@ mod tests { } } + #[test] + fn classify_invalid_parquet_file_carries_spark_wording() { + // parquet-rs reports a bad magic / unreadable footer as "Invalid Parquet file. Corrupt + // footer"; Spark's ParquetQuerySuite asserts the cause says "is not a Parquet file". The + // CannotReadFile message must carry that phrasing. + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::General( + "Invalid Parquet file.
Corrupt footer".to_string(), + ))); + match try_classify_file_read_error(&e) { + Some(SparkError::CannotReadFile { message, .. }) => assert!( + message.contains("is not a Parquet file"), + "message was: {message}" + ), + other => panic!("expected CannotReadFile, got {other:?}"), + } + } + + #[test] + fn classify_other_parquet_error_keeps_native_message() { + // A non-magic parquet failure (e.g. corrupt page data) keeps its native message verbatim + // -- the "is not a Parquet file" phrasing is only added for the magic/footer case. + let e = DataFusionError::ParquetError(Box::new(parquet::errors::ParquetError::General( + "could not decode page header".to_string(), + ))); + match try_classify_file_read_error(&e) { + Some(SparkError::CannotReadFile { message, .. }) => assert!( + !message.contains("is not a Parquet file"), + "message should not be augmented: {message}" + ), + other => panic!("expected CannotReadFile, got {other:?}"), + } + } + #[test] fn classify_bare_io_error_is_not_file_read() { // A bare IoError is not a scan read failure (scans surface ParquetError/ObjectStore); it