Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,62 @@ async fn test_read_schema_evolution_drop_column() {
);
}

/// Reads a table whose data files span several formats after `score` was dropped.
///
/// The fixture's older Parquet/ORC data files still carry the dropped column, while the
/// newer Avro files were written without it. The test checks three things: the scan plan
/// covers every format, no batch exposes `score`, and all six rows come back.
#[tokio::test]
async fn test_read_mixed_format_schema_evolution_drop_column() {
    let (plan, batches) =
        scan_and_read_with_fs_catalog("mixed_format_schema_evolution_drop_column", None).await;

    // Gather the distinct extensions (text after the last '.') of every planned data file.
    let mut formats: HashSet<&str> = HashSet::new();
    for split in plan.splits().iter() {
        for file in split.data_files() {
            if let Some((_, ext)) = file.file_name.rsplit_once('.') {
                formats.insert(ext);
            }
        }
    }
    let expected_formats = HashSet::from(["avro", "orc", "parquet"]);
    assert_eq!(
        formats, expected_formats,
        "mixed_format_schema_evolution_drop_column should scan all provisioned file formats"
    );

    // No batch may expose the dropped column, regardless of which file produced it.
    let score_is_visible = batches
        .iter()
        .any(|batch| batch.column_by_name("score").is_some());
    assert!(
        !score_is_visible,
        "Dropped column 'score' should not appear in output"
    );

    // Flatten every batch into (id, name) pairs.
    let mut rows: Vec<(i32, String)> = Vec::new();
    for batch in &batches {
        let ids = batch
            .column_by_name("id")
            .and_then(|col| col.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        let names = batch
            .column_by_name("name")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
            .expect("name");
        rows.extend(
            (0..batch.num_rows()).map(|row| (ids.value(row), names.value(row).to_string())),
        );
    }
    // Sort by id so the comparison does not depend on batch or row order.
    rows.sort_by_key(|row| row.0);

    let expected_rows: Vec<(i32, String)> = [
        (1, "parquet-alice"),
        (2, "parquet-bob"),
        (3, "orc-carol"),
        (4, "orc-dave"),
        (5, "avro-eve"),
        (6, "avro-frank"),
    ]
    .iter()
    .map(|&(id, name)| (id, name.to_string()))
    .collect();

    assert_eq!(
        rows, expected_rows,
        "Mixed-format DROP COLUMN should expose only remaining columns from all file formats"
    );
}

// ---------------------------------------------------------------------------
// Complex type integration tests
// ---------------------------------------------------------------------------
Expand Down
44 changes: 44 additions & 0 deletions dev/spark/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,50 @@ def main():
"""
)

# ===== Mixed-format Schema Evolution: Drop Column =====
# Old Parquet/ORC files have (id, name, score); after DROP COLUMN, Avro files
# have only (id, name). Reader should ignore the dropped column in old files.
#
# Step 1: create the table with all three columns and 'file.format' = 'parquet'.
spark.sql(
"""
CREATE TABLE IF NOT EXISTS mixed_format_schema_evolution_drop_column (
id INT,
name STRING,
score INT
) USING paimon
TBLPROPERTIES (
'file.format' = 'parquet'
)
"""
)
# Step 2: insert rows 1-2 while the format is parquet and `score` still exists.
spark.sql(
"""
INSERT INTO mixed_format_schema_evolution_drop_column VALUES
(1, 'parquet-alice', 100),
(2, 'parquet-bob', 200)
"""
)
# Step 3: switch 'file.format' to orc before the next insert.
spark.sql(
"ALTER TABLE mixed_format_schema_evolution_drop_column SET TBLPROPERTIES ('file.format' = 'orc')"
)
# Step 4: insert rows 3-4; these still include a `score` value.
spark.sql(
"""
INSERT INTO mixed_format_schema_evolution_drop_column VALUES
(3, 'orc-carol', 300),
(4, 'orc-dave', 400)
"""
)
# Step 5: drop `score`, then switch 'file.format' to avro for the final insert.
spark.sql("ALTER TABLE mixed_format_schema_evolution_drop_column DROP COLUMN score")
spark.sql(
"ALTER TABLE mixed_format_schema_evolution_drop_column SET TBLPROPERTIES ('file.format' = 'avro')"
)
# Step 6: insert rows 5-6 with only (id, name), matching the post-drop schema.
spark.sql(
"""
INSERT INTO mixed_format_schema_evolution_drop_column VALUES
(5, 'avro-eve'),
(6, 'avro-frank')
"""
)

# ===== Complex Types table: ARRAY, MAP, STRUCT =====
spark.sql(
"""
Expand Down
Loading