Skip to content
184 changes: 94 additions & 90 deletions apps/sim/lib/webhooks/polling/google-sheets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { processPolledWebhookEvent } from '@/lib/webhooks/processor'

/**
 * Default cap on the number of new rows handled in a single poll. Used when
 * `config.maxRowsPerPoll` is unset (or 0, since the fallback uses `||`).
 */
const MAX_ROWS_PER_POLL = 100

/** Maximum number of leading rows to scan when auto-detecting the header row. */
const HEADER_SCAN_ROWS = 10

/**
 * Value sent as the `valueRenderOption` query parameter on Sheets values
 * requests. The polling handler falls back to 'FORMATTED_VALUE' when the
 * webhook config does not set one.
 */
type ValueRenderOption = 'FORMATTED_VALUE' | 'UNFORMATTED_VALUE' | 'FORMULA'
/**
 * Value sent as the `dateTimeRenderOption` query parameter on Sheets values
 * requests. The polling handler falls back to 'SERIAL_NUMBER' when the
 * webhook config does not set one.
 */
type DateTimeRenderOption = 'SERIAL_NUMBER' | 'FORMATTED_STRING'

Expand All @@ -20,7 +23,8 @@ interface GoogleSheetsWebhookConfig {
manualSheetName?: string
valueRenderOption?: ValueRenderOption
dateTimeRenderOption?: DateTimeRenderOption
lastKnownRowCount?: number
/** 1-indexed row number of the last row seeded or processed. */
lastIndexChecked?: number
lastModifiedTime?: string
lastCheckedTimestamp?: string
maxRowsPerPoll?: number
Expand Down Expand Up @@ -63,7 +67,6 @@ export const googleSheetsPollingHandler: PollingProviderHandler = {
return 'failure'
}

// Pre-check: use Drive API to see if the file was modified since last poll
const { unchanged: skipPoll, currentModifiedTime } = await isDriveFileUnchanged(
accessToken,
spreadsheetId,
Expand All @@ -83,44 +86,51 @@ export const googleSheetsPollingHandler: PollingProviderHandler = {
return 'success'
}

// Fetch current row count via column A
const currentRowCount = await getDataRowCount(
const valueRender = config.valueRenderOption || 'FORMATTED_VALUE'
const dateTimeRender = config.dateTimeRenderOption || 'SERIAL_NUMBER'

const {
rowCount: currentRowCount,
headers,
headerRowIndex,
} = await fetchSheetState(
accessToken,
spreadsheetId,
sheetName,
valueRender,
dateTimeRender,
requestId,
logger
)

// First poll: seed state, emit nothing
if (config.lastKnownRowCount === undefined) {
if (config.lastIndexChecked === undefined) {
await updateWebhookProviderConfig(
webhookId,
{
lastKnownRowCount: currentRowCount,
lastIndexChecked: currentRowCount,
lastModifiedTime: currentModifiedTime ?? config.lastModifiedTime,
lastCheckedTimestamp: now.toISOString(),
},
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] First poll for webhook ${webhookId}, seeded row count: ${currentRowCount}`
`[${requestId}] First poll for webhook ${webhookId}, seeded row index: ${currentRowCount}`
)
return 'success'
}

// Rows deleted or unchanged
if (currentRowCount <= config.lastKnownRowCount) {
if (currentRowCount < config.lastKnownRowCount) {
if (currentRowCount <= config.lastIndexChecked) {
if (currentRowCount < config.lastIndexChecked) {
logger.warn(
`[${requestId}] Row count decreased from ${config.lastKnownRowCount} to ${currentRowCount} for webhook ${webhookId}`
`[${requestId}] Row count decreased from ${config.lastIndexChecked} to ${currentRowCount} for webhook ${webhookId}`
)
}
await updateWebhookProviderConfig(
webhookId,
{
lastKnownRowCount: currentRowCount,
lastIndexChecked: currentRowCount,
lastModifiedTime: currentModifiedTime ?? config.lastModifiedTime,
lastCheckedTimestamp: now.toISOString(),
},
Expand All @@ -131,38 +141,47 @@ export const googleSheetsPollingHandler: PollingProviderHandler = {
return 'success'
}

// New rows detected
const newRowCount = currentRowCount - config.lastKnownRowCount
const newRowCount = currentRowCount - config.lastIndexChecked
const maxRows = config.maxRowsPerPoll || MAX_ROWS_PER_POLL
const rowsToFetch = Math.min(newRowCount, maxRows)
const startRow = config.lastKnownRowCount + 1
const endRow = config.lastKnownRowCount + rowsToFetch
const startRow = config.lastIndexChecked + 1
const endRow = config.lastIndexChecked + rowsToFetch

// Skip past the header row (and any blank rows above it) so it is never
// emitted as a data event.
const adjustedStartRow =
headerRowIndex > 0 ? Math.max(startRow, headerRowIndex + 1) : startRow

logger.info(
`[${requestId}] Found ${newRowCount} new rows for webhook ${webhookId}, processing rows ${startRow}-${endRow}`
`[${requestId}] Found ${newRowCount} new rows for webhook ${webhookId}, processing rows ${adjustedStartRow}-${endRow}`
)

// Resolve render options
const valueRender = config.valueRenderOption || 'FORMATTED_VALUE'
const dateTimeRender = config.dateTimeRenderOption || 'SERIAL_NUMBER'

const headers = await fetchHeaderRow(
accessToken,
spreadsheetId,
sheetName,
valueRender,
dateTimeRender,
requestId,
logger
)
// Entire batch is header/blank rows — advance pointer and skip fetch.
if (adjustedStartRow > endRow) {
const hasRemainingRows = rowsToFetch < newRowCount
await updateWebhookProviderConfig(
webhookId,
{
lastIndexChecked: config.lastIndexChecked + rowsToFetch,
lastModifiedTime: hasRemainingRows
? config.lastModifiedTime
: (currentModifiedTime ?? config.lastModifiedTime),
lastCheckedTimestamp: now.toISOString(),
},
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] Batch ${startRow}-${endRow} contained only header/blank rows for webhook ${webhookId}, advancing pointer`
)
return 'success'
}

// Fetch new rows — startRow/endRow are already 1-indexed sheet row numbers
// because lastKnownRowCount includes the header row
const newRows = await fetchRowRange(
accessToken,
spreadsheetId,
sheetName,
startRow,
adjustedStartRow,
endRow,
valueRender,
dateTimeRender,
Expand All @@ -173,23 +192,22 @@ export const googleSheetsPollingHandler: PollingProviderHandler = {
const { processedCount, failedCount } = await processRows(
newRows,
headers,
startRow,
adjustedStartRow,
spreadsheetId,
sheetName,
config,
webhookData,
workflowData,
requestId,
logger
)

const rowsAdvanced = failedCount > 0 ? 0 : rowsToFetch
const newLastKnownRowCount = config.lastKnownRowCount + rowsAdvanced
const newLastIndexChecked = config.lastIndexChecked + rowsAdvanced
const hasRemainingOrFailed = rowsAdvanced < newRowCount
await updateWebhookProviderConfig(
webhookId,
{
lastKnownRowCount: newLastKnownRowCount,
lastIndexChecked: newLastIndexChecked,
lastModifiedTime: hasRemainingOrFailed
? config.lastModifiedTime
: (currentModifiedTime ?? config.lastModifiedTime),
Expand Down Expand Up @@ -256,20 +274,32 @@ async function getDriveFileModifiedTime(
}
}

async function getDataRowCount(
/**
 * Fetches the sheet (columns A–Z only) in a single API call and returns the
 * row count, the auto-detected headers, and the 1-indexed header row number.
 *
 * The Sheets API omits trailing empty rows, so `rows.length` equals the
 * 1-indexed row number of the last non-empty row within columns A–Z. Header
 * detection scans the first {@link HEADER_SCAN_ROWS} rows and treats the first
 * row containing at least one non-empty cell as the header. Returns
 * `headerRowIndex = 0` (with empty `headers`) when no such row is found within
 * the scan window.
 */
async function fetchSheetState(
accessToken: string,
spreadsheetId: string,
sheetName: string,
valueRenderOption: ValueRenderOption,
dateTimeRenderOption: DateTimeRenderOption,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<number> {
): Promise<{ rowCount: number; headers: string[]; headerRowIndex: number }> {
const encodedSheet = encodeURIComponent(sheetName)
// Fetch all rows across columns A–Z with majorDimension=ROWS so the API
// returns one entry per row that has ANY non-empty cell. Rows where column A
// is empty but other columns have data are included, whereas the previous
// column-A-only approach silently missed them. The returned array length
// equals the 1-indexed row number of the last row with data.
const url = `https://sheets.googleapis.com/v4/spreadsheets/${spreadsheetId}/values/${encodedSheet}!A:Z?majorDimension=ROWS&fields=values`
const params = new URLSearchParams({
majorDimension: 'ROWS',
fields: 'values',
valueRenderOption,
dateTimeRenderOption,
})
const url = `https://sheets.googleapis.com/v4/spreadsheets/${spreadsheetId}/values/${encodedSheet}!A:Z?${params.toString()}`

const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` },
Expand All @@ -278,61 +308,32 @@ async function getDataRowCount(
if (!response.ok) {
const status = response.status
const errorData = await response.json().catch(() => ({}))

if (status === 403 || status === 429) {
throw new Error(
`Sheets API rate limit (${status}) — skipping to retry next poll cycle: ${JSON.stringify(errorData)}`
)
}

throw new Error(
`Failed to fetch row count: ${status} ${response.statusText} - ${JSON.stringify(errorData)}`
`Failed to fetch sheet state: ${status} ${response.statusText} - ${JSON.stringify(errorData)}`
)
}

const data = await response.json()
// values is [[row1col1, row1col2, ...], [row2col1, ...], ...] when majorDimension=ROWS.
// The Sheets API omits trailing empty rows, so the array length is the last
// non-empty row index (1-indexed), which is exactly what we need.
const rows = data.values as string[][] | undefined
return rows?.length ?? 0
}
const rows = (data.values as string[][] | undefined) ?? []
const rowCount = rows.length

async function fetchHeaderRow(
accessToken: string,
spreadsheetId: string,
sheetName: string,
valueRenderOption: ValueRenderOption,
dateTimeRenderOption: DateTimeRenderOption,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<string[]> {
const encodedSheet = encodeURIComponent(sheetName)
const params = new URLSearchParams({
fields: 'values',
valueRenderOption,
dateTimeRenderOption,
})
const url = `https://sheets.googleapis.com/v4/spreadsheets/${spreadsheetId}/values/${encodedSheet}!1:1?${params.toString()}`

const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` },
})

if (!response.ok) {
const status = response.status
if (status === 403 || status === 429) {
const errorData = await response.json().catch(() => ({}))
throw new Error(
`Sheets API rate limit (${status}) fetching header row — skipping to retry next poll cycle: ${JSON.stringify(errorData)}`
)
let headers: string[] = []
let headerRowIndex = 0
for (let i = 0; i < Math.min(rows.length, HEADER_SCAN_ROWS); i++) {
const row = rows[i]
if (row?.some((cell) => cell !== '')) {
headers = row
headerRowIndex = i + 1
break
}
logger.warn(`[${requestId}] Failed to fetch header row, proceeding without headers`)
return []
}

const data = await response.json()
return (data.values?.[0] as string[]) ?? []
return { rowCount, headers, headerRowIndex }
}

async function fetchRowRange(
Expand Down Expand Up @@ -361,13 +362,11 @@ async function fetchRowRange(
if (!response.ok) {
const status = response.status
const errorData = await response.json().catch(() => ({}))

if (status === 403 || status === 429) {
throw new Error(
`Sheets API rate limit (${status}) — skipping to retry next poll cycle: ${JSON.stringify(errorData)}`
)
}

throw new Error(
`Failed to fetch rows ${startRow}-${endRow}: ${status} ${response.statusText} - ${JSON.stringify(errorData)}`
)
Expand All @@ -383,7 +382,6 @@ async function processRows(
startRowIndex: number,
spreadsheetId: string,
sheetName: string,
config: GoogleSheetsWebhookConfig,
webhookData: PollWebhookContext['webhookData'],
workflowData: PollWebhookContext['workflowData'],
requestId: string,
Expand All @@ -394,7 +392,13 @@ async function processRows(

for (let i = 0; i < rows.length; i++) {
const row = rows[i]
const rowNumber = startRowIndex + i // startRowIndex is already the 1-indexed sheet row
const rowNumber = startRowIndex + i

// Skip empty rows — don't fire a workflow run with no data.
if (!row || row.length === 0) {
logger.info(`[${requestId}] Skipping empty row ${rowNumber} for webhook ${webhookData.id}`)
continue
}

try {
await pollingIdempotency.executeWithIdempotency(
Expand Down
Loading