diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java index e0834f357..1fd28bb2b 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java @@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.cloud.bigquery.RangePartitioning; import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableDefinition.Type; import com.google.cloud.bigquery.TimePartitioning; import com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat; @@ -146,7 +147,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc readTimeout); Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType(); Boolean isPartitionFilterRequired = bigQueryTable.getRequirePartitionFilter(); - StandardTableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition(); + TableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition(); String query; if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) { @@ -180,7 +181,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc @VisibleForTesting String generateQuery(String partitionFromDate, String partitionToDate, String filter, String datasetProject, String dataset, String table, String limit, String orderBy, - Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) { + Boolean isPartitionFilterRequired, TableDefinition tableDef) { if (Strings.isNullOrEmpty(filter) && Strings.isNullOrEmpty(orderBy) && Strings.isNullOrEmpty( limit) @@ -188,6 +189,13 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi return null; } + if (!(tableDef instanceof StandardTableDefinition)) { + throw new IllegalArgumentException( + String.format("Unsupported BigQuery table type for filtering/partitioning: %s. " + + "Cannot apply filters, limits, or ordering.", tableDef.getType())); + } + + StandardTableDefinition tableDefinition = (StandardTableDefinition) tableDef; RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning(); TimePartitioning timePartitioning = tableDefinition.getTimePartitioning(); StringBuilder condition = new StringBuilder();