Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils.wrapOuterReference
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -147,8 +148,20 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
// When strict DataFrame column resolution is disabled, we also allow name-based
// resolution as a fallback for tagged attributes.
val result = withPosition(u) {
resolveColumnByName(nameParts)
.orElse(LiteralFunctionResolution.resolve(nameParts))
// A parameterless built-in function takes precedence over a SQL UDF parameter
// that happens to share its name (per the documented SQL name resolution rules).
// Real columns from relations -- which don't carry the
// SQL_FUNCTION_PARAMETER_ALIAS_METADATA_KEY -- continue to win as before.
// Gated by a legacy kill-switch conf so the pre-fix behavior can be restored.
val column = resolveColumnByName(nameParts)
val resolved = column match {
case Some(c) if isSQLFunctionParameterAlias(c) && !conf.getConf(
SQLConf.LEGACY_ALLOW_UDF_PARAMETER_TO_SHADOW_PARAMETERLESS_FUNCTION) =>
LiteralFunctionResolution.resolve(nameParts).orElse(column)
case Some(_) => column
case None => LiteralFunctionResolution.resolve(nameParts)
}
resolved
.map {
// We trim unnecessary alias here. Note that, we cannot trim the alias at top-level,
// as we should resolve `UnresolvedAttribute` to a named expression. The caller side
Expand Down Expand Up @@ -696,4 +709,18 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
r.expressions.forall(_.references.subsetOf(outputSet))
}
}

/**
* True if `e` originates from a SQL UDF input parameter alias, as marked by
* `SessionCatalog.SQL_FUNCTION_PARAMETER_ALIAS_METADATA_KEY` at parameter-alias
* construction sites. Unwraps `OuterReference` so callers that pass post-outer-resolution
* expressions still get a correct answer; the metadata lives on the underlying named
* expression.
*/
private def isSQLFunctionParameterAlias(e: Expression): Boolean = e match {
case OuterReference(inner) => isSQLFunctionParameterAlias(inner)
case n: NamedExpression =>
n.metadata.contains(SessionCatalog.SQL_FUNCTION_PARAMETER_ALIAS_METADATA_KEY)
case _ => false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ import org.apache.spark.util.Utils
object SessionCatalog {
val DEFAULT_DATABASE = "default"

/**
* Metadata key marking an Alias / Attribute as originating from a SQL UDF input parameter.
* Consumed by name resolution: a parameterless built-in function takes precedence over a
* tagged alias of the same name.
*/
val SQL_FUNCTION_PARAMETER_ALIAS_METADATA_KEY: String = "__funcInputAlias"

/**
* Kind of session-scoped function namespace for lookup/resolve.
* Used by the kind-based API to avoid separate methods per
Expand Down Expand Up @@ -1880,11 +1887,6 @@ class SessionCatalog(
name: String,
function: SQLFunction,
input: Seq[Expression]): LogicalPlan = {
def metaForFuncInputAlias = {
new MetadataBuilder()
.putString("__funcInputAlias", "true")
.build()
}
assert(!function.isTableFunc,
"Function '" + function.name + "' is a table function. " +
"Use makeSQLTableFunctionPlan() instead of makeSQLFunctionPlan().")
Expand Down Expand Up @@ -1941,6 +1943,9 @@ class SessionCatalog(
}
}

val funcInputMetadata = new MetadataBuilder()
.putBoolean(SessionCatalog.SQL_FUNCTION_PARAMETER_ALIAS_METADATA_KEY, true)
.build()
paddedInput.zip(param.fields).map {
case (expr, param) =>
// Add outer references to all resolved attributes and outer references in the function
Expand All @@ -1950,10 +1955,11 @@ class SessionCatalog(
case a: Attribute if a.resolved => OuterReference(a)
case o: OuterReference => OuterReference(o)
}
// Mark the alias as function input so name resolution can give a parameterless
// built-in function precedence over a same-named UDF parameter.
Alias(Cast(outer, param.dataType), param.name)(
qualifier = qualifier,
// mark the alias as function input
explicitMetadata = Some(metaForFuncInputAlias))
explicitMetadata = Some(funcInputMetadata))
}
}.getOrElse(Nil)

Expand Down Expand Up @@ -2046,6 +2052,10 @@ class SessionCatalog(
val outer = expr.transform {
case a: Attribute => OuterReference(a)
}
// No SQL_FUNCTION_PARAMETER_ALIAS_METADATA_KEY marker here: a table UDF body references
// its parameter as an outer reference (the param lives in the lateral join's left
// child), so resolveColumnByName returns None and a parameterless built-in function
// already wins via the pre-existing "function beats outer reference" precedence.
Alias(Cast(outer, param.dataType), param.name)(qualifier = qualifier)
}
val inputPlan = Project(inputCast, OneRowRelation())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3651,6 +3651,19 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_ALLOW_UDF_PARAMETER_TO_SHADOW_PARAMETERLESS_FUNCTION =
buildConf("spark.sql.legacy.allowUdfParameterToShadowParameterlessFunction")
.internal()
.doc("When true (legacy behavior), a SQL UDF parameter alias shadows a parameterless " +
"built-in function (current_user, current_date, current_time, current_timestamp, " +
"user, session_user, grouping__id) of the same name. When false (the default), the " +
"parameterless built-in function takes precedence, matching the documented name " +
"resolution rules.")
.version("4.2.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(false)

val ALLOW_NON_EMPTY_LOCATION_IN_CTAS =
buildConf("spark.sql.legacy.allowNonEmptyLocationInCTAS")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.CreateUserDefinedFunctionCommand._
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StructField, StructType}

/**
* The DDL command that creates a SQL function.
Expand Down Expand Up @@ -109,6 +109,18 @@ case class CreateSQLFunctionCommand(
// Qualify the input parameters with the function name so that attributes referencing
// the function input parameters can be resolved correctly.
val qualifier = Seq(name.funcName)
// Mark scalar UDF parameter aliases as function input so name resolution can give a
// parameterless built-in function precedence over a same-named UDF parameter. Table UDF
// bodies reference parameters as outer references, where a parameterless function already
// wins via the pre-existing "function beats outer reference" precedence, so the marker is
// not applied (and would not be consumed) there.
val funcInputMetadata = if (isTableFunc) {
None
} else {
Some(new MetadataBuilder()
.putBoolean(SessionCatalog.SQL_FUNCTION_PARAMETER_ALIAS_METADATA_KEY, true)
.build())
}
val input = param.map(p => Alias(
{
val defaultExpr = p.getDefault()
Expand All @@ -131,7 +143,7 @@ case class CreateSQLFunctionCommand(
}
Cast(defaultPlan, p.dataType)
}
}, p.name)(qualifier = qualifier))
}, p.name)(qualifier = qualifier, explicitMetadata = funcInputMetadata))
Project(input, OneRowRelation())
} else {
OneRowRelation()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
CREATE OR REPLACE TEMPORARY VIEW v_user AS SELECT 'admin.admin' AS current_user
-- !query analysis
CreateViewCommand `v_user`, SELECT 'admin.admin' AS current_user, false, true, LocalTempView, UNSUPPORTED, true
+- Project [admin.admin AS current_user#x]
+- OneRowRelation


-- !query
CREATE OR REPLACE TEMPORARY VIEW v_time AS SELECT CAST(0 AS INT) AS current_time
-- !query analysis
CreateViewCommand `v_time`, SELECT CAST(0 AS INT) AS current_time, false, true, LocalTempView, UNSUPPORTED, true
+- Project [cast(0 as int) AS current_time#x]
+- OneRowRelation


-- !query
SELECT current_user FROM v_user
-- !query analysis
Project [current_user#x]
+- SubqueryAlias v_user
+- View (`v_user`, [current_user#x])
+- Project [cast(current_user#x as string) AS current_user#x]
+- Project [admin.admin AS current_user#x]
+- OneRowRelation


-- !query
SELECT current_time FROM v_time
-- !query analysis
Project [current_time#x]
+- SubqueryAlias v_time
+- View (`v_time`, [current_time#x])
+- Project [cast(current_time#x as int) AS current_time#x]
+- Project [cast(0 as int) AS current_time#x]
+- OneRowRelation


-- !query
SELECT 'abc' AS current_user, current_user = current_user() AS function_won
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT (SELECT current_user) = current_user() AS function_won FROM v_user
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
DECLARE current_user = 'abc'
-- !query analysis
CreateVariable default(abc, sql=''abc''), false
+- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.current_user


-- !query
SELECT current_user, current_user FROM v_user
-- !query analysis
Project [current_user#x, current_user#x]
+- SubqueryAlias v_user
+- View (`v_user`, [current_user#x])
+- Project [cast(current_user#x as string) AS current_user#x]
+- Project [admin.admin AS current_user#x]
+- OneRowRelation


-- !query
DROP TEMPORARY VARIABLE current_user
-- !query analysis
DropVariable false
+- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.current_user


-- !query
WITH t1 AS (SELECT 1 AS current_date)
SELECT typeof((SELECT current_date)) FROM t1
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
WITH t1 AS (SELECT 1 AS current_timestamp)
SELECT typeof((SELECT current_timestamp)) FROM t1
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
WITH t1 AS (SELECT 1 AS user)
SELECT (SELECT user) = current_user() AS function_won FROM t1
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
WITH t1 AS (SELECT 1 AS session_user)
SELECT (SELECT session_user) = current_user() AS function_won FROM t1
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT typeof(grouping__id) FROM v_user GROUP BY current_user GROUPING SETS ((current_user))
-- !query analysis
Aggregate [current_user#x, spark_grouping_id#xL], [typeof(spark_grouping_id#xL) AS typeof(grouping_id())#x]
+- Expand [[current_user#x, current_user#x, 0]], [current_user#x, current_user#x, spark_grouping_id#xL]
+- Project [current_user#x, current_user#x AS current_user#x]
+- SubqueryAlias v_user
+- View (`v_user`, [current_user#x])
+- Project [cast(current_user#x as string) AS current_user#x]
+- Project [admin.admin AS current_user#x]
+- OneRowRelation


-- !query
DROP VIEW v_user
-- !query analysis
DropTempViewCommand v_user, false


-- !query
DROP VIEW v_time
-- !query analysis
DropTempViewCommand v_time, false
Loading