Beta — introduced in Geneva 0.11.0

Standard UDFs produce exactly one output value per input row. Scalar UDTFs enable 1:N row expansion: each source row can produce multiple output rows. The results are stored as a materialized view with MV-style incremental refresh.
| Source Table   | Derived Table  | Expansion          |
|----------------|----------------|--------------------|
| 1 video row    | → N clip rows  | Video segmentation |
| 1 document row | → N chunk rows | Text chunking      |
| 1 image row    | → N tile rows  | Image tiling       |

Defining a Scalar UDTF

Use the @scalar_udtf decorator on a function that yields output rows. Geneva infers the output schema from the return type annotation.
from geneva import scalar_udtf
from typing import Iterator, NamedTuple

class Clip(NamedTuple):
    clip_start: float
    clip_end: float
    clip_bytes: bytes

@scalar_udtf
def extract_clips(video_path: str, duration: float) -> Iterator[Clip]:
    """Yields multiple clips per video."""
    clip_length = 10.0
    for start in range(0, int(duration), int(clip_length)):
        end = min(start + clip_length, duration)
        clip_data = extract_video_segment(video_path, start, end)
        yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data)
Input parameters are bound to source columns by name — the parameter video_path binds to source column video_path, just like standard UDFs.
A scalar UDTF can yield zero rows for a source row. The source row is still marked as processed and will not be retried on the next refresh.
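For instance, a UDTF body can filter as it expands. A minimal sketch (the @scalar_udtf decorator is omitted so the generator stands alone, and the 10-second threshold is an assumption for illustration):

```python
from typing import Iterator

# Sketch: zero child rows for inputs shorter than one clip length.
# In Geneva this would be decorated with @scalar_udtf.
def extract_clips_filtered(video_path: str, duration: float) -> Iterator[tuple]:
    if duration < 10.0:
        return  # no rows yielded; the source row is still marked processed
    for start in range(0, int(duration), 10):
        yield (video_path, float(start), min(start + 10.0, duration))
```

Short videos simply contribute no child rows to the derived table, and they will not be retried on subsequent refreshes.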

List return pattern

If you prefer to build the full list in memory rather than yielding, you can return a list instead of an Iterator:
@scalar_udtf
def extract_clips(video_path: str, duration: float) -> list[Clip]:
    clips = []
    for start in range(0, int(duration), 10):
        end = min(start + 10, duration)
        clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b"..."))
    return clips

Batched scalar UDTF

For vectorized processing, use batch=True. The function receives a pa.RecordBatch of source rows and returns a pa.RecordBatch containing the expanded rows:
@scalar_udtf(batch=True)
def extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch:
    """Process rows in batches. Same 1:N semantic per row."""
    ...

Creating a Scalar UDTF View

Scalar UDTFs use the existing create_materialized_view API with a udtf= parameter:
import geneva

db = geneva.connect("/data/mydb")
videos = db.open_table("videos")

# Create the 1:N materialized view
clips = db.create_materialized_view(
    "clips",
    query=videos.search(None).select(["video_path", "metadata"]),
    udtf=extract_clips,
)

# Populate — runs the UDTF on every source row
clips.refresh()
The query parameter controls which source columns are inherited. Columns listed in .select() are carried into every child row automatically.

Inherited Columns

Child rows automatically include the parent’s columns — no manual join required. The columns available in the child table are determined by the query’s .select():

videos table (source)

| video_path | duration | metadata  |
|------------|----------|-----------|
| /v/a.mp4   | 120.0    | {fps: 30} |
| /v/b.mp4   | 60.0     | {fps: 24} |

clips table (derived, 1:N)

| video_path | metadata  | clip_start | clip_end | clip_bytes    |
|------------|-----------|------------|----------|---------------|
| /v/a.mp4   | {fps: 30} | 0.0        | 10.0     | b"\x00\x1a…"  |
| /v/a.mp4   | {fps: 30} | 10.0       | 20.0     | b"\x00\x2b…"  |
| /v/a.mp4   | {fps: 30} | 20.0       | 30.0     | b"\x00\x3c…"  |
| /v/b.mp4   | {fps: 24} | 0.0        | 10.0     | b"\x00\x4d…"  |
| /v/b.mp4   | {fps: 24} | 10.0       | 20.0     | b"\x00\x5e…"  |
The first three rows come from the /v/a.mp4 source row, the last two from /v/b.mp4. Inherited columns (video_path, metadata) are carried over automatically; clip_start, clip_end, and clip_bytes are generated by the UDTF.

Adding Computed Columns After Creation

Since scalar UDTF views are materialized views, you can add UDF-computed columns to the child table and backfill them:
from geneva import udf
import pyarrow as pa

@udf(data_type=pa.list_(pa.float32(), 512))
def clip_embedding(clip_bytes: bytes) -> list[float]:
    return embed_model.encode(clip_bytes)

# Add an embedding column to the clips table
clips.add_columns({"embedding": clip_embedding})

# Backfill computes embeddings for all existing clips
clips.refresh()
This is a powerful pattern: expand source rows with a scalar UDTF, then enrich the expanded rows with standard UDFs.

Incremental Refresh

Scalar UDTFs support incremental refresh, just like standard materialized views:
  • New source rows: The UDTF runs on new rows, inserting child rows.
  • Deleted source rows: Child rows linked to the deleted parent are cascade-deleted.
  • Updated source rows: Old children are deleted, UDTF re-runs, new children inserted.
# Add new videos to the source table
videos.add(new_video_data)

# Incremental refresh — only processes the new videos
clips.refresh()
Only the new source rows are processed. Existing clips from previous refreshes are untouched.

Chaining UDTF Views

Scalar UDTF views are standard materialized views, so they can serve as the source for further views:
# videos → clips (1:N)
clips = db.create_materialized_view(
    "clips", query=videos.search(None), udtf=extract_clips
)

# clips → frames (1:N)
frames = db.create_materialized_view(
    "frames", query=clips.search(None), udtf=extract_frames
)
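extract_frames is not defined on this page; one hypothetical shape for it (the Frame fields and the one-frame-per-second policy are assumptions, and the @scalar_udtf decorator is omitted so the generator runs standalone):

```python
from typing import Iterator, NamedTuple

class Frame(NamedTuple):
    frame_index: int
    timestamp: float

# Hypothetical second-level UDTF: emits one frame per whole second of a
# clip. In Geneva this would be decorated with @scalar_udtf, and
# clip_start/clip_end would bind to the columns produced by extract_clips.
def extract_frames(clip_start: float, clip_end: float) -> Iterator[Frame]:
    for i, t in enumerate(range(int(clip_start), int(clip_end))):
        yield Frame(frame_index=i, timestamp=float(t))
```

Each level of the chain refreshes incrementally, so adding a video eventually propagates to new clips and new frames.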

Full Example: Document Chunking

from geneva import connect, scalar_udtf, udf
from typing import Iterator, NamedTuple
import pyarrow as pa

class Chunk(NamedTuple):
    chunk_index: int
    chunk_text: str

@scalar_udtf
def chunk_document(text: str) -> Iterator[Chunk]:
    """Split a document into overlapping chunks."""
    words = text.split()
    chunk_size = 500
    overlap = 50
    for i, start in enumerate(range(0, len(words), chunk_size - overlap)):
        chunk_words = words[start:start + chunk_size]
        yield Chunk(chunk_index=i, chunk_text=" ".join(chunk_words))

db = connect("/data/mydb")
docs = db.open_table("documents")

# Create chunked view — inherits doc_id, title, etc. from source
chunks = db.create_materialized_view(
    "doc_chunks",
    query=docs.search(None).select(["doc_id", "title", "text"]),
    udtf=chunk_document,
)
chunks.refresh()

# Add embeddings to chunks for semantic search
@udf(data_type=pa.list_(pa.float32(), 1536))
def embed_text(chunk_text: str) -> list[float]:
    return embedding_model.encode(chunk_text)

chunks.add_columns({"embedding": embed_text})
chunks.refresh()  # Backfills embeddings on all existing chunks

# Query — parent columns available alongside chunk columns
chunks.search(None).select(["doc_id", "title", "chunk_text", "embedding"]).to_pandas()
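The stride chunk_size - overlap in chunk_document means consecutive chunks share overlap words. A self-contained check of that arithmetic with toy sizes (helper name and sizes are illustrative; no Geneva required):

```python
def chunk_words(words: list[str], chunk_size: int = 5, overlap: int = 2):
    # Mirrors chunk_document above: each chunk starts `overlap` words
    # before the previous one ended (stride = chunk_size - overlap).
    return [
        (i, words[start:start + chunk_size])
        for i, start in enumerate(range(0, len(words), chunk_size - overlap))
    ]

words = [f"w{n}" for n in range(10)]
chunks = chunk_words(words)
# The last two words of chunk 0 reappear as the first two words of chunk 1.
```

With chunk_size=500 and overlap=50 as in the example above, each chunk repeats the last 50 words of its predecessor, which helps retrieval queries that land near a chunk boundary.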
For a comparison of all three function types (UDFs, Scalar UDTFs, Batch UDTFs), see Understanding Transforms.