Skip to content

feat(tables): background import for large CSVs with live progress#4861

Open
TheodoreSpeaks wants to merge 5 commits into
stagingfrom
feat/1-million-table
Open

feat(tables): background import for large CSVs with live progress#4861
TheodoreSpeaks wants to merge 5 commits into
stagingfrom
feat/1-million-table

Conversation

@TheodoreSpeaks
Copy link
Copy Markdown
Collaborator

Summary

  • Add async background CSV/TSV import for large files (~1M rows): client uploads direct-to-storage, a detached worker streams the file, infers/maps the schema, and bulk-inserts in committed batches — no request/ALB timeout.
  • Replace per-row user_table_rows count triggers with statement-level triggers (transition tables) so bulk insert/delete no longer serialize per row (migration 0222).
  • Fix missing final boundary on CSV upload by streaming multipart with busboy instead of request.formData().
  • Surface import progress via a reusable ProgressItem emcn component + a header indicator, driven by SSE events; uploading → processing → done/failed stages defined programmatically.
  • Emit import state from the GET table routes so the indicator survives refresh; auto-uniquify the imported table name on collision.
  • Implement copilot materialize_file operation: 'table' + fail-fast guard for unimplemented ops.

Type of Change

  • New feature

Testing

Tested manually: imported 10MB and ~1M-row CSVs from the list and in-table; verified upload→processing→done progress, refresh persistence, and failure handling. bun run lint clean; bun run check:api-validation:strict passes.

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@vercel
Copy link
Copy Markdown

vercel Bot commented Jun 3, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
docs Skipped Skipped Jun 3, 2026 5:17pm

Request Review

@cursor
Copy link
Copy Markdown

cursor Bot commented Jun 3, 2026

PR Summary

Medium Risk
Touches bulk row inserts, new async import worker lifecycle, and a DB trigger migration; failures can leave partial imports or stale importing state until cron runs, though sync/async overlap is guarded with 409.

Overview
Large CSV/TSV imports no longer go through the app server as a full buffered multipart body. Streaming multipart (busboy + readMultipart) powers sync import routes; uploads over ~8MB (or the 10MB proxy cap) use direct-to-storage upload plus new /api/table/import-async and /api/table/[tableId]/import-async kickoffs that run runTableImport detached, stream from storage, bulk-insert per batch, and publish import SSE events.

DB: migration adds import columns on user_table_definitions and replaces per-row row-count triggers with statement-level triggers for bulk load performance. Cron stale-import cleanup marks stuck importing tables failed when updatedAt stops moving.

UI: ImportProgressMenu, import tray store, hydration from list/detail APIs, and table event stream handling for live row refetch + completion toasts. Sync routes reject overlapping imports (409) and validate fileKey workspace prefix on async paths.

Other: shared createCsvParser / parseFileRows, storage streaming downloads, copilot materialize_file operation: 'table', and ProgressItem emcn component.

Reviewed by Cursor Bugbot for commit 6993ae9. Bugbot is set up for automated code reviews on this repo. Configure here.

@TheodoreSpeaks
Copy link
Copy Markdown
Collaborator Author

@greptile review

Comment thread apps/sim/lib/table/service.ts
Comment thread apps/sim/lib/table/import-runner.ts
Comment thread apps/sim/app/api/table/[tableId]/import-async/route.ts
Comment thread apps/sim/lib/table/import-runner.ts
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 3, 2026

Greptile Summary

This PR adds async background CSV/TSV import for large files, routing them direct-to-storage while a detached worker streams, infers schema, and bulk-inserts in committed batches — avoiding request/ALB timeouts. It also replaces per-row row_count triggers with statement-level transition-table triggers, fixes the missing final boundary multipart bug via busboy, surfaces import progress through a new ProgressItem SSE component, and wires the materialize_file copilot tool's table operation.

  • lib/table/import-runner.ts: new detached worker; handles create, append, and replace modes with progress heartbeats and terminal ready/failed SSE events.
  • lib/core/utils/multipart.ts: new busboy-backed streaming parser replacing request.formData(); both sync import routes migrate to it with correct field-before-file ordering enforced on the client.
  • packages/db/migrations/0224_table_import_columns.sql: adds five import-state columns and upgrades row_count maintenance to AFTER-statement triggers using transition tables.

Confidence Score: 4/5

Safe to merge for create and replace async imports; the append async path produces rows with wrong sort positions until the one-line fix is applied.

The async import worker initialises inserted = 0 regardless of mode. For replace this is correct (all rows are deleted first), and for create the table is empty. But for append, existing rows already occupy positions 0..rowCount-1; the worker writes new rows at those same positions, leaving the table sort order broken after every async append import. The rest of the PR — statement-level triggers, multipart streaming rewrite, concurrent-import 409 guard, stale-import janitor heartbeat, and SSE progress surface — all look correct.

apps/sim/lib/table/import-runner.ts — the inserted initialisation for append mode.

Important Files Changed

Filename Overview
apps/sim/lib/table/import-runner.ts New background import worker: append-mode rows always start at position 0, colliding with existing row positions and breaking display order.
apps/sim/app/api/table/import-async/route.ts New route to kick off async import for a fresh table — creates placeholder table, starts detached worker. Logic and auth look correct.
apps/sim/app/api/table/[tableId]/import-async/route.ts New route to kick off async import into an existing table; includes 409 concurrent-import guard and workspace/archive checks.
apps/sim/lib/table/service.ts Adds import lifecycle service functions; updateImportProgress now bumps updatedAt for the stale-janitor heartbeat as documented.
packages/db/migrations/0224_table_import_columns.sql Adds import-state columns and replaces per-row triggers with statement-level transition-table triggers for bulk performance.
apps/sim/lib/core/utils/multipart.ts New busboy-based streaming multipart parser; correctly handles abort, limits, field ordering, and file-stream lifecycle.
apps/sim/app/api/table/import-csv/route.ts Rewrites sync CSV import to stream via busboy parser; cleanup on error is correct and table is deleted if row insertion fails.
apps/sim/app/api/table/[tableId]/import/route.ts Sync append/replace import similarly migrated to busboy streaming; field ordering and stream cleanup handled correctly.
apps/sim/app/api/cron/cleanup-stale-executions/route.ts Adds stale-import janitor: marks importing tables with no recent updatedAt heartbeat as failed using the shared stale threshold.
apps/sim/stores/table/import-tray/store.ts New Zustand store for import progress tray; upsert/dismiss/clearTerminal logic is straightforward and correct.
apps/sim/lib/copilot/tools/handlers/materialize-file.ts Adds operation: 'table' to materialize_file copilot tool using shared parseFileRows; fail-fast guard for unknown operations.
apps/sim/hooks/queries/tables.ts Adds useImportCsvAsync and useImportCsvIntoTableAsync hooks; correctly orders workspaceId before file in FormData for busboy field-before-file requirement.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Storage
    participant KickoffRoute as POST /import-async
    participant Worker as runTableImport (detached)
    participant DB
    participant SSE as SSE stream

    Client->>Storage: PUT file (direct-to-storage upload)
    Storage-->>Client: fileKey
    Client->>KickoffRoute: "POST { workspaceId, fileKey, fileName, mode }"
    KickoffRoute->>DB: markTableImporting(tableId, importId)
    KickoffRoute-->>Client: "200 { tableId, importId }"
    KickoffRoute--)Worker: runDetached("table-import", ...)

    Worker->>DB: getTableById(tableId)
    alt replace mode
        Worker->>DB: deleteAllTableRows(tableId)
    end
    Worker->>Storage: downloadFile(fileKey)
    Storage-->>Worker: Buffer

    loop for each batch (CSV_MAX_BATCH_SIZE rows)
        Worker->>DB: "bulkInsertImportBatch(startPosition=inserted)"
        Worker->>DB: updateImportProgress(rows)
        Worker->>SSE: appendTableEvent(importing, progress)
    end

    alt success
        Worker->>DB: markImportReady(tableId)
        Worker->>SSE: appendTableEvent(ready)
    else failure
        Worker->>DB: markImportFailed(tableId, error)
        Worker->>SSE: appendTableEvent(failed)
    end

    Client->>SSE: "EventSource /api/table/{tableId}/events/stream"
    SSE-->>Client: import progress ticks → ready / failed
Loading

Reviews (3): Last reviewed commit: "Merge remote-tracking branch 'origin/sta..." | Re-trigger Greptile

Comment thread apps/sim/lib/table/service.ts
Comment thread apps/sim/app/api/table/[tableId]/import-async/route.ts
Comment on lines +60 to +65
* for `create`, mapping onto the existing schema for `append`/`replace`), then bulk-inserts
* in committed batches — **no rollback**: committed batches persist even if a later batch
* fails. Progress and the terminal state are surfaced via the table-events SSE stream.
*/
export async function runTableImport(payload: TableImportPayload): Promise<void> {
const { importId, tableId, workspaceId, userId, fileKey, fileName, delimiter, mode } = payload
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Uploaded CSV file is never deleted after import

downloadFile({ key: fileKey, context: 'workspace' }) fetches the file from workspace storage, but there is no deleteFile call in either the success or the catch path. Every async import permanently retains the source CSV/TSV in the workspace storage bucket. Over many imports this becomes a non-trivial storage accumulation with no cleanup mechanism.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 3, 2026

Greptile Summary

This PR adds async background CSV/TSV import for large files (≥ 8 MB): the client uploads directly to storage, two new kickoff routes create a placeholder table (or mark an existing one as importing), and a detached worker streams the file, infers/maps the schema, and bulk-inserts in committed batches with live SSE progress. Synchronous imports are also improved by replacing request.formData() with a streaming busboy parser, and the per-row row_count triggers are replaced with statement-level transition-table triggers (migration 0222) to eliminate per-row lock contention on bulk operations.

  • Async import pipeline: new /api/table/import-async and /api/table/[tableId]/import-async kickoff routes, a runTableImport background worker, import-state columns on user_table_definitions, and a useImportTrayStore + ImportProgressMenu UI indicator driven by SSE events.
  • Streaming multipart parser: readMultipart (busboy) replaces request.formData() in the synchronous import routes, fixing the "missing final boundary" issue and enabling pre-file auth/permission work before the file body is consumed.
  • Statement-level triggers: 0222_stormy_surge.sql drops the per-row BEFORE INSERT / AFTER DELETE triggers and replaces them with AFTER … FOR EACH STATEMENT variants using REFERENCING NEW/OLD TABLE, cutting the lock-per-row cost from bulk inserts to a single UPDATE per statement.

Confidence Score: 3/5

Two correctness issues need attention before shipping: a potential silent data loss in replace-mode async imports, and a liveness-check gap that could cause legitimate long-running imports to be terminated by the cron cleaner.

In replace-mode async imports, deleteAllTableRows executes before downloadFile in the background worker. A storage error after the delete leaves the table permanently empty with no way to recover the original rows. Separately, updateImportProgress does not advance updatedAt, while the stale-import cron check uses updatedAt as its liveness signal and its inline comment states the column is bumped on each progress tick. For 1 M-row files the import finishes in minutes and the mismatch is unlikely to trigger; for very large files or slow storage it could cause a legitimate import to be terminated mid-run.

apps/sim/lib/table/import-runner.ts (replace-mode row deletion order) and apps/sim/lib/table/service.ts (updateImportProgress missing updatedAt)

Important Files Changed

Filename Overview
apps/sim/lib/table/import-runner.ts New background CSV import worker. Has a data-loss risk in replace mode: existing rows are deleted before the file download, so a download failure leaves the table empty.
apps/sim/lib/table/service.ts Adds import-state management functions. updateImportProgress omits updatedAt, contradicting the cron cleanup comment that uses updatedAt as the liveness signal.
apps/sim/app/api/table/import-async/route.ts New async kickoff route for large CSVs into a new table. Auth, permission, and name-collision handling look correct.
apps/sim/app/api/table/[tableId]/import-async/route.ts Async kickoff for importing into an existing table. Validates workspace ownership against the table record before proceeding.
packages/db/migrations/0222_stormy_surge.sql Replaces per-row INSERT/DELETE triggers with statement-level transition-table triggers to fix bulk-insert serialization. Logic looks correct; new columns added for async-import state.
apps/sim/lib/core/utils/multipart.ts New streaming multipart parser built on busboy. Error codes and abort-signal handling are thorough; resolves before file body is consumed so auth can run first.
apps/sim/app/api/cron/cleanup-stale-executions/route.ts Adds stale-import detection to the cron cleanup job. The comment claims updatedAt is bumped by progress ticks, but updateImportProgress does not update that column.
apps/sim/app/api/table/import-csv/route.ts Rewrites synchronous CSV import to use streaming busboy instead of request.formData(); properly destroys the stream in a finally block.
apps/sim/hooks/queries/tables.ts Adds two async-import mutation hooks; correctly orders FormData fields (text before file) for the streaming parser.
apps/sim/stores/table/import-tray/store.ts New Zustand store for tracking in-progress CSV imports in the header tray; clean merge semantics and correct terminal-entry handling.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Storage
    participant KickoffRoute as POST /import-async
    participant BG as runTableImport (detached)
    participant DB

    Client->>Storage: upload file (direct-to-storage)
    Storage-->>Client: fileKey
    Client->>KickoffRoute: "POST {workspaceId, fileKey, fileName}"
    KickoffRoute->>DB: createTable / markTableImporting
    KickoffRoute->>BG: runDetached(runTableImport)
    KickoffRoute-->>Client: "{tableId, importId}"

    BG->>DB: deleteAllTableRows (replace mode only)
    BG->>Storage: downloadFile(fileKey)
    Storage-->>BG: buffer
    BG->>DB: "appendTableEvent(importing, progress=0, total)"
    loop each batch (1000 rows)
        BG->>DB: bulkInsertImportBatch
        BG->>DB: "appendTableEvent(importing, progress=N)"
    end
    BG->>DB: markImportReady / markImportFailed
    BG->>DB: "appendTableEvent(ready | failed)"
    DB-->>Client: SSE stream to ImportProgressMenu
Loading

Reviews (2): Last reviewed commit: "feat(tables): background import for larg..." | Re-trigger Greptile

Comment thread apps/sim/lib/table/import-runner.ts Outdated
Comment thread apps/sim/lib/table/service.ts Outdated
# Conflicts:
#	apps/sim/app/api/table/[tableId]/import/route.ts
#	apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts
#	apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx
#	apps/sim/lib/table/events.ts
#	packages/db/migrations/meta/0222_snapshot.json
#	packages/db/migrations/meta/_journal.json
#	scripts/check-api-validation-contracts.ts
@TheodoreSpeaks
Copy link
Copy Markdown
Collaborator Author

Addressed Bugbot review + synced with staging:

  • Stale-import false positives (High): updateImportProgress now bumps updatedAt, so a live import keeps a heartbeat and the janitor won't fail it mid-run.
  • Overlapping imports: existing-table kickoff returns 409 if the table is already importing.
  • createColumns validation: worker rejects column names not present in the CSV headers (matches the sync route).
  • Empty CSV: marks the import failed ("CSV file has no data rows") instead of reporting a successful empty import.
  • Merged origin/staging and resolved conflicts (kept both the new import and usageLimitReached SSE handlers).
  • Migration regenerated via drizzle as 0224_table_import_columns (was a hand-written 0222 that collided with staging's 0222); the statement-level row-count triggers are appended after the generated column DDL since drizzle doesn't manage raw triggers.

bun run lint:check, bun run check:api-validation:strict, and tsc --noEmit all clean.

@greptile review

Comment thread apps/sim/app/api/table/[tableId]/import/route.ts
Comment thread apps/sim/app/api/table/[tableId]/import/route.ts
Comment thread apps/sim/lib/table/import-runner.ts Outdated
Comment thread apps/sim/app/api/table/import-async/route.ts
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

There are 3 total unresolved issues (including 1 from previous review).

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit db9cdc8. Configure here.

if (rows.length === 0 || !schema || !headerToColumn) return
const coerced = coerceRowsForTable(rows, schema, headerToColumn)
inserted += await bulkInsertImportBatch(
{ tableId, workspaceId, userId, rows: coerced, startPosition: inserted },
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Append import reuses row positions

High Severity

Background append imports pass startPosition as the number of rows inserted in this run (0, 1000, …), not the next free position after existing table rows. New rows are written at positions that already hold data, so ordering and grid display can be wrong while the synchronous append path uses nextAutoPosition.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit db9cdc8. Configure here.

Comment thread apps/sim/lib/table/import-runner.ts
@TheodoreSpeaks
Copy link
Copy Markdown
Collaborator Author

Now streaming the import instead of buffering the whole file (Bugbot High):

  • Added downloadFileStream (S3 / blob / local) to the storage layer — returns a Readable, no buffering. The worker pipes it through a byte counter into the CSV parser, so a ~1M-row file is never resident in memory.
  • The progress-bar total is now estimated from the object's byte size (cheap headObject) extrapolated by rows-per-byte as it runs, instead of a full newline pre-pass — self-refining and monotonic. No event/schema/UI changes.
  • replace still deletes only after the stream opens; oversized files are bounded by the row-count trigger (no byte cap needed on this path).

lint:check / tsc / check:api-validation:strict clean.

@greptile review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant