Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7a15e95
admin-adjustable max columns with a clear overflow message
Ma77Ball May 2, 2026
36e3657
ran lint for pr
Ma77Ball May 2, 2026
e75bd49
Merge branch 'main' into fix/adjustable_univocity
Ma77Ball May 2, 2026
325a63f
Merge branch 'main' into fix/adjustable_univocity
chenlica May 2, 2026
29b35f7
Merge branch 'apache:main' into fix/adjustable_univocity
Ma77Ball May 2, 2026
09e20af
fixed frontend error for ci test cases
Ma77Ball May 2, 2026
a426b35
added nztable module and working on fixing ci failures
Ma77Ball May 2, 2026
7fb01ba
Merge branch 'main' into fix/adjustable_univocity
chenlica May 2, 2026
f5dc500
Merge branch 'main' into fix/adjustable_univocity
chenlica May 3, 2026
386e1a3
removed the columns per result panel setting
Ma77Ball May 3, 2026
b3cbf54
removed result table columns from default config file)
Ma77Ball May 3, 2026
bfc87e3
Merge branch 'main' into fix/adjustable_univocity
chenlica May 3, 2026
740eab8
Merge branch 'main' of github.com:apache/texera into fix/adjustable_u…
Ma77Ball May 7, 2026
869a85b
Merge branch 'fix/adjustable_univocity' of github.com:Ma77Ball/texera…
Ma77Ball May 7, 2026
2d20ad7
Removed old config for column limit on result panel
Ma77Ball May 7, 2026
c77b7c6
removed unused csv block from default.conf
Ma77Ball May 7, 2026
1e658eb
changing forall to exists to cover column overflows
Ma77Ball May 7, 2026
7f1c6e5
Added test case for the parseOrDefault
Ma77Ball May 7, 2026
423ac90
Merge branch 'main' into fix/adjustable_univocity
aglinxinyuan May 7, 2026
cbc534e
added line back in default.conf
Ma77Ball May 7, 2026
dba71ed
Merge remote-tracking branch 'origin/fix/adjustable_univocity' into f…
Ma77Ball May 7, 2026
083a091
ArrayIndexOutOfBoundsException added test
Ma77Ball May 7, 2026
cb2aa5c
ran lint with yarn
Ma77Ball May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions common/dao/src/main/scala/org/apache/texera/dao/SiteSettings.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.texera.dao

import org.jooq.impl.DSL

import scala.util.Try

/**
* Read-side accessor for the `site_settings` key/value table that admin pages
* write through. Centralises the "look up by key, parse, fall back on any
* failure" pattern that previously lived inline in ConfigResource,
* CSVScanSourceOpExec, and DatasetResource.
*
* Failures swallowed by the outer Try include: SqlServer not initialised
* (e.g. on workers in distributed mode), no row for the key, and value that
* can't be parsed. In all of these cases the caller's default takes over.
Comment thread
Ma77Ball marked this conversation as resolved.
*/
object SiteSettings {

def getInt(key: String, default: => Int): Int =
readAndParse(key, default)(_.toInt)

def getLong(key: String, default: => Long): Long =
readAndParse(key, default)(_.toLong)

private[dao] def parseOrDefault[T](raw: Option[String], default: T)(parse: String => T): T =
raw.flatMap(s => Try(parse(s.trim)).toOption).getOrElse(default)

private def readAndParse[T](key: String, default: => T)(parse: String => T): T =
Try {
val raw = SqlServer
.getInstance()
.createDSLContext()
.select(DSL.field("value", classOf[String]))
.from(DSL.table(DSL.name("texera_db", "site_settings")))
.where(DSL.field("key", classOf[String]).eq(key))
.fetchOneInto(classOf[String])
parseOrDefault(Option(raw), default)(parse)
}.getOrElse(default)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.texera.dao

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class SiteSettingsSpec extends AnyFlatSpec with Matchers {

"parseOrDefault" should "return the parsed value when the raw string is present and valid" in {
SiteSettings.parseOrDefault(Some("42"), 0)(_.toInt) shouldBe 42
}

it should "return the default when the Option is None" in {
SiteSettings.parseOrDefault(None, 99)(_.toInt) shouldBe 99
}

it should "return the default when the string cannot be parsed" in {
SiteSettings.parseOrDefault(Some("not-a-number"), 7)(_.toInt) shouldBe 7
}

it should "trim whitespace before parsing" in {
SiteSettings.parseOrDefault(Some(" 100 "), 0)(_.toInt) shouldBe 100
}

it should "work for Long values" in {
SiteSettings.parseOrDefault(Some("9999999999"), 0L)(_.toLong) shouldBe 9999999999L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import org.apache.texera.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc}
import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpExec
import org.apache.texera.amber.util.JSONUtils.objectMapper

import java.io.{IOException, InputStreamReader}
Expand Down Expand Up @@ -89,6 +90,8 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc {
csvFormat.setLineSeparator("\n")
val csvSetting = new CsvParserSettings()
csvSetting.setMaxCharsPerColumn(-1)
val maxColumns = CSVScanSourceOpExec.getMaxColumns
csvSetting.setMaxColumns(maxColumns)
csvSetting.setFormat(csvFormat)
csvSetting.setHeaderExtractionEnabled(hasHeader)
csvSetting.setNullValue("")
Expand All @@ -97,8 +100,8 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc {

var data: Array[Array[String]] = Array()
val readLimit = limit.getOrElse(INFER_READ_LIMIT).min(INFER_READ_LIMIT)
for (i <- 0 until readLimit) {
val row = parser.parseNext()
for (_ <- 0 until readLimit) {
val row = CSVScanSourceOpExec.parseNextRow(parser, maxColumns)
if (row != null) {
data = data :+ row
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,26 @@

package org.apache.texera.amber.operator.source.scan.csv

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.apache.texera.dao.SiteSettings

import java.io.InputStreamReader
import java.net.URI
import scala.collection.immutable.ArraySeq
import scala.util.Try

class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperatorExecutor {
val desc: CSVScanSourceOpDesc = objectMapper.readValue(descString, classOf[CSVScanSourceOpDesc])
var inputReader: InputStreamReader = _
var parser: CsvParser = _
var nextRow: Array[String] = _
var numRowGenerated = 0
private var maxColumns: Int = CSVScanSourceOpExec.DEFAULT_MAX_COLUMNS
private val schema: Schema = desc.sourceSchema()

override def produceTuple(): Iterator[TupleLike] = {
Expand All @@ -44,7 +48,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
if (nextRow != null) {
return true
}
nextRow = parser.parseNext()
nextRow = CSVScanSourceOpExec.parseNextRow(parser, maxColumns)
nextRow != null
}

Expand Down Expand Up @@ -90,6 +94,8 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
) // disable skipping lines starting with # (default comment character)
val csvSetting = new CsvParserSettings()
csvSetting.setMaxCharsPerColumn(-1)
maxColumns = CSVScanSourceOpExec.getMaxColumns
csvSetting.setMaxColumns(maxColumns)
csvSetting.setFormat(csvFormat)
csvSetting.setHeaderExtractionEnabled(desc.hasHeader)

Expand All @@ -106,3 +112,41 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
}
}
}

object CSVScanSourceOpExec {
val DEFAULT_MAX_COLUMNS = 512

def getMaxColumns: Int =
SiteSettings.getInt("csv_parser_max_columns", DEFAULT_MAX_COLUMNS)

/**
* Wraps `parser.parseNext()` so a column-count overflow is reported to the user
* as a clear instruction rather than a deep Univocity stack trace. Other parser
* failures are rethrown unchanged.
*
* The thrown RuntimeException's message bubbles up through DataProcessor.handleExecutorException
* and becomes the title of the console message that drives the top-of-page toast.
*/
def parseNextRow(parser: CsvParser, maxColumns: Int): Array[String] = {
try parser.parseNext()
catch {
case e: TextParsingException if isColumnOverflow(e, maxColumns) =>
throw new RuntimeException(columnOverflowMessage(maxColumns), e)
}
}

private[csv] def isColumnOverflow(e: TextParsingException, maxColumns: Int): Boolean =
Option(e.getCause)
.collect { case aioobe: ArrayIndexOutOfBoundsException => aioobe }
.exists(aioobe => aioobeIndex(aioobe).exists(_ == maxColumns))

private def aioobeIndex(aioobe: ArrayIndexOutOfBoundsException): Option[Int] = {
val msg = Option(aioobe.getMessage).getOrElse("")
Try(msg.trim.toInt).toOption.orElse {
raw"Index (\d+) out of bounds".r.findFirstMatchIn(msg).map(_.group(1).toInt)
}
}

private[csv] def columnOverflowMessage(maxColumns: Int): String =
s"Max columns of $maxColumns exceeded."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.source.scan.csv

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import org.scalatest.flatspec.AnyFlatSpec

import java.io.StringReader

/**
* Verifies the column-overflow translation in [[CSVScanSourceOpExec.parseNextRow]]
* — the path that turns a deep Univocity stack trace into a single-sentence message
* the workflow user can act on.
*/
class CSVScanSourceOpExecSpec extends AnyFlatSpec {

private def parserWithMaxColumns(max: Int): CsvParser = {
val settings = new CsvParserSettings()
settings.setMaxColumns(max)
settings.setMaxCharsPerColumn(-1)
new CsvParser(settings)
}

"parseNextRow" should "return the parsed row when the input is within the column limit" in {
val parser = parserWithMaxColumns(10)
parser.beginParsing(new StringReader("a,b,c\n"))

val row = CSVScanSourceOpExec.parseNextRow(parser, 10)

assert(row.toSeq == Seq("a", "b", "c"))
}

it should "return null at end of input (so the iterator can terminate cleanly)" in {
val parser = parserWithMaxColumns(10)
parser.beginParsing(new StringReader(""))

assert(CSVScanSourceOpExec.parseNextRow(parser, 10) == null)
}

it should "translate a column-overflow TextParsingException into a clear user message" in {
val maxColumns = 2
val parser = parserWithMaxColumns(maxColumns)
parser.beginParsing(new StringReader("a,b,c,d,e\n"))

val ex = intercept[RuntimeException] {
CSVScanSourceOpExec.parseNextRow(parser, maxColumns)
}

// The message must mention the configured limit so the user knows what was hit.
assert(ex.getMessage.contains(maxColumns.toString))
assert(ex.getMessage.toLowerCase.contains("max columns"))
assert(ex.getMessage.toLowerCase.contains("exceeded"))
// The original Univocity exception is preserved as the cause so developers
// can still inspect the underlying parser state if needed.
assert(ex.getCause.isInstanceOf[TextParsingException])
}

"isColumnOverflow" should "detect AIOOBE causes from Java 8's plain-integer message" in {
val cause = new ArrayIndexOutOfBoundsException("5")
val ex = new TextParsingException(null, "wrapper", cause)
assert(CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 5))
assert(!CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 6))
}

it should "detect AIOOBE causes from Java 9+'s 'Index N out of bounds for length M' message" in {
val cause = new ArrayIndexOutOfBoundsException("Index 5 out of bounds for length 5")
val ex = new TextParsingException(null, "wrapper", cause)
assert(CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 5))
assert(!CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 6))
}

it should "ignore TextParsingExceptions whose cause is unrelated" in {
val unrelated = new TextParsingException(null, "Some other parsing problem")
val withDifferentCause =
new TextParsingException(null, "wrapper", new IllegalStateException("nope"))
assert(!CSVScanSourceOpExec.isColumnOverflow(unrelated, maxColumns = 5))
assert(!CSVScanSourceOpExec.isColumnOverflow(withDifferentCause, maxColumns = 5))
Comment thread
Ma77Ball marked this conversation as resolved.
}

it should "ignore an AIOOBE whose message cannot be parsed as an index" in {
val unparseable = new ArrayIndexOutOfBoundsException("something went wrong")
val ex = new TextParsingException(null, "wrapper", unparseable)
assert(!CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 5))
}

"columnOverflowMessage" should "include the configured maximum so the user knows the current limit" in {
val msg = CSVScanSourceOpExec.columnOverflowMessage(750)
assert(msg.contains("750"))
assert(msg.toLowerCase.contains("max columns"))
assert(msg.toLowerCase.contains("exceeded"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.texera.amber.core.storage.model.OnDataset
import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
import org.apache.texera.amber.core.storage.{DocumentFactory, FileResolver}
import org.apache.texera.auth.SessionUser
import org.apache.texera.dao.SiteSettings
import org.apache.texera.dao.SqlServer
import org.apache.texera.dao.SqlServer.withTransaction
import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum
Expand Down Expand Up @@ -87,15 +88,8 @@ object DatasetResource {
.getInstance()
.createDSLContext()

private def singleFileUploadMaxBytes(ctx: DSLContext, defaultMiB: Long = 20L): Long = {
val limit = ctx
.select(DSL.field("value", classOf[String]))
.from(DSL.table(DSL.name("texera_db", "site_settings")))
.where(DSL.field("key", classOf[String]).eq("single_file_upload_max_size_mib"))
.fetchOneInto(classOf[String])
Try(Option(limit).getOrElse(defaultMiB.toString).trim.toLong)
.getOrElse(defaultMiB) * 1024L * 1024L
}
private def singleFileUploadMaxBytes(defaultMiB: Long = 20L): Long =
SiteSettings.getLong("single_file_upload_max_size_mib", defaultMiB) * 1024L * 1024L

/**
* Helper function to get the dataset from DB using did
Expand Down Expand Up @@ -1577,7 +1571,7 @@ class DatasetResource {
if (fileSizeBytesValue <= 0L) throw new BadRequestException("fileSizeBytes must be > 0")
if (partSizeBytesValue <= 0L) throw new BadRequestException("partSizeBytes must be > 0")

val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx)
val totalMaxBytes: Long = singleFileUploadMaxBytes()
if (totalMaxBytes <= 0L) {
throw new WebApplicationException(
"singleFileUploadMaxBytes must be > 0",
Expand Down Expand Up @@ -1969,7 +1963,7 @@ class DatasetResource {
)
}

val maxBytes = singleFileUploadMaxBytes(ctx)
val maxBytes = singleFileUploadMaxBytes()
val tooLarge = actualSizeBytes > maxBytes

if (tooLarge) {
Expand Down
Loading
Loading