Introduced in Geneva 0.13.0
## Overview
A common scenario is having column data that already exists in an external dataset — embeddings from a vendor, features exported from a data warehouse, or columnar data in cloud storage — that you want to load into an existing LanceDB table.
`load_columns` joins value columns from an external source into your table by primary key. It works for both use cases:

- Adding new columns: If the specified columns don't exist in the destination table, they are created automatically.
- Updating existing columns: If the columns already exist, matched rows are updated with the source values. Unmatched rows are controlled by the `on_missing` parameter.
Destination table (before):

| pk | col_a | col_b |
|----|-------|-------|
| 1 | x | 10 |
| 2 | y | 20 |
| 3 | z | 30 |

External source (Parquet / Lance / IPC):

| pk | embedding |
|----|-----------|
| 1 | [.1, .2] |
| 2 | [.3, .4] |
| 3 | [.5, .6] |
Destination table (after `load_columns` join on `pk`):

| pk | col_a | col_b | embedding |
|----|-------|-------|-----------|
| 1 | x | 10 | [.1, .2] |
| 2 | y | 20 | [.3, .4] |
| 3 | z | 30 | [.5, .6] |
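The join in the tables above can be modeled with plain dicts. This is a toy sketch of the semantics, not the real implementation; `load_columns_sim` is a hypothetical helper name:

```python
def load_columns_sim(dest_rows, source_rows, pk, columns):
    """Toy model of the load_columns join: match source rows to
    destination rows on `pk`; create the named columns if absent,
    overwrite them if present."""
    # Build the in-memory primary-key index over the source
    index = {row[pk]: row for row in source_rows}
    for row in dest_rows:
        match = index.get(row[pk])
        if match is not None:
            for col in columns:
                row[col] = match[col]
    return dest_rows

dest = [
    {"pk": 1, "col_a": "x", "col_b": 10},
    {"pk": 2, "col_a": "y", "col_b": 20},
    {"pk": 3, "col_a": "z", "col_b": 30},
]
source = [
    {"pk": 1, "embedding": [0.1, 0.2]},
    {"pk": 2, "embedding": [0.3, 0.4]},
    {"pk": 3, "embedding": [0.5, 0.6]},
]
result = load_columns_sim(dest, source, pk="pk", columns=["embedding"])
# Existing columns are untouched; the new column is filled per row.
assert result[0] == {"pk": 1, "col_a": "x", "col_b": 10, "embedding": [0.1, 0.2]}
```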
## When to use
- Loading new columns: Attach pre-computed embeddings from a vendor, or add features exported from Spark/BigQuery as Parquet.
- Updating existing columns: Replace outdated embeddings with a newer model’s output, or refresh feature values from an updated export.
- Partial updates: Update a subset of rows (e.g., only rows whose embeddings were recomputed) while preserving all other values via carry semantics.
- Format consolidation: Merge columnar data spread across Parquet files into an existing Lance table.
## Basic usage

Supports Parquet, Lance, and IPC sources. The format is auto-detected from the URI suffix, or can be overridden with `source_format`. You can load one or more columns in a single call.
```python
import lancedb

db = lancedb.connect("my_db")
table = db.open_table("my_table")

# Add a new embedding column from a Parquet source
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
)

# Later, update the same column with refreshed embeddings
table.load_columns(
    source="s3://bucket/embeddings_v2/",
    pk="document_id",
    columns=["embedding"],
)
```
For non-blocking execution, use `load_columns_async`, which returns a `JobFuture`; call `.result()` to block until completion.
## Handling missing keys

When the source doesn't cover every row in the destination, the `on_missing` parameter controls what happens to unmatched rows:
| Mode | Behavior |
|------|----------|
| `"carry"` (default) | Keep the existing value. NULL if the column is new. |
| `"null"` | Explicitly set to NULL. |
| `"error"` | Raise an error on the first unmatched row. |
```python
# Default: unmatched rows keep their current value
table.load_columns(
    source="s3://bucket/partial_embeddings/",
    pk="document_id",
    columns=["embedding"],
    on_missing="carry",
)

# Strict mode: fail if the source doesn't cover all rows
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
    on_missing="error",
)
```
The `carry` mode is particularly important for partial and multi-pass loads: rows not covered by the current source keep the values loaded in earlier passes, so nothing is silently overwritten.
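The three modes can be illustrated on a single unmatched destination row. This is a toy model of the behavior table above, not the real implementation; `apply_on_missing` is a hypothetical helper name:

```python
def apply_on_missing(existing, on_missing, column_is_new):
    """What an unmatched destination row's column ends up holding, per mode."""
    if on_missing == "carry":
        # Keep the existing value; a brand-new column has no value yet, so NULL.
        return None if column_is_new else existing
    if on_missing == "null":
        return None  # explicitly nulled
    if on_missing == "error":
        raise ValueError("source does not cover all destination rows")
    raise ValueError(f"unknown on_missing mode: {on_missing}")

# Row pk=3 is absent from this source; its embedding was loaded earlier.
assert apply_on_missing([0.5, 0.6], "carry", column_is_new=False) == [0.5, 0.6]
assert apply_on_missing([0.5, 0.6], "null", column_is_new=False) is None
try:
    apply_on_missing([0.5, 0.6], "error", column_is_new=False)
except ValueError:
    pass  # strict mode raises on the first unmatched row
```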
## Concurrency

The `concurrency` parameter controls the number of worker processes. The default is 8; set it to match your available cluster resources.
```python
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
    concurrency=16,
)
```
## Checkpointing

Bulk load jobs checkpoint each batch for fault tolerance, using the same infrastructure as backfill jobs. Key parameters:

- `checkpoint_interval_seconds`: Target seconds per checkpoint batch (default 60s). The adaptive sizer grows or shrinks batch sizes to hit this target.
- `min_checkpoint_size` / `max_checkpoint_size`: Bounds for adaptive sizing.
If your job is small enough to complete without needing fault tolerance, you can get better performance by effectively disabling checkpoints: increase `checkpoint_interval_seconds` to a large value and set `min_checkpoint_size` high enough that each worker processes its entire workload in a single batch.
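The adaptive sizing described above can be sketched in a few lines. This is a simplified model under the assumption of proportional scaling toward the target interval, not Geneva's actual sizer:

```python
def next_batch_size(current_size, elapsed_s, target_s=60.0,
                    min_size=64, max_size=100_000):
    """Scale the next batch so it takes roughly target_s seconds.

    If the last batch finished faster than the target, grow the batch;
    if slower, shrink it; always clamp to [min_size, max_size].
    """
    if elapsed_s <= 0:
        return max_size  # no timing signal yet; take the largest allowed batch
    scaled = int(current_size * target_s / elapsed_s)
    return max(min_size, min(max_size, scaled))

# A fast batch grows the next one; a slow batch shrinks it.
assert next_batch_size(1000, elapsed_s=30.0) == 2000
assert next_batch_size(1000, elapsed_s=120.0) == 500
# Tiny results are clamped to min_size, matching the bounds parameters.
assert next_batch_size(10, elapsed_s=120.0) == 64
```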
## Commit visibility

For long-running jobs, `commit_granularity` controls how many fragments complete before an intermediate commit makes partial results visible to readers.
```python
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
    commit_granularity=10,
)
```
## Multi-pass loads for large sources

`load_columns` builds an in-memory primary-key index from the source. If the source is too large to fit in memory, split it into chunks and run sequential calls. Carry semantics guarantee correctness across passes.
Index memory depends on primary key type:
| PK type | ~Memory per row | 100M rows | 1B rows |
|---------|-----------------|-----------|---------|
| int64 | ~8 bytes | ~800 MB | ~8 GB |
| string (avg 32 bytes) | ~32 bytes | ~3.2 GB | ~32 GB |
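The figures in this table are a raw lower bound: rows times bytes per key. A quick back-of-envelope helper (illustrative only; real index overhead adds hash-table and allocator costs on top of the raw key bytes):

```python
def pk_index_bytes(num_rows, bytes_per_key):
    """Rough lower bound on the in-memory PK index: raw key bytes only."""
    return num_rows * bytes_per_key

def as_gb(n_bytes):
    """Decimal gigabytes, matching the units in the table above."""
    return n_bytes / 1e9

# int64 keys, 100M rows: ~0.8 GB (~800 MB)
assert as_gb(pk_index_bytes(100_000_000, 8)) == 0.8
# 32-byte string keys, 1B rows: ~32 GB
assert as_gb(pk_index_bytes(1_000_000_000, 32)) == 32.0
```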
Choose `N` so that `source_size / N` fits in memory:

```python
import pyarrow.dataset as pads

# Discover source files (metadata only, no data I/O)
source_files = pads.dataset("s3://bucket/embeddings/", format="parquet").files

# Split into N chunks and run sequentially
N = 4
total = len(source_files)
for i in range(N):
    chunk = source_files[i * total // N : (i + 1) * total // N]
    table.load_columns(
        source=chunk,
        pk="document_id",
        columns=["embedding"],
    )
```
Each pass reads only its assigned files, so total source I/O stays at 1x. If the source is already partitioned into subdirectories, pass each URI directly:
```python
for shard in range(4):
    table.load_columns(
        source=f"s3://bucket/embeddings/shard_{shard}/",
        pk="document_id",
        columns=["embedding"],
    )
```
Multi-pass loads must run sequentially, not concurrently. Two `load_columns` calls running at the same time against the same column produce an interleaved end state. Use a plain `for` loop, not `concurrent.futures`.
## Reference