Streaming XTDB results into DuckDB via Arrow
When you already have data in DuckDB, ADBC lets you bring XTDB’s bitemporal data alongside it without a copy: both are Arrow-native, so a query result streams from XTDB straight into DuckDB to be joined with what’s already there. The worked scenario: take a bitemporal trade snapshot from XTDB as of Q4 2024 and join it to a counterparty-tier lookup table held in DuckDB.
Prerequisites
Section titled “Prerequisites”A running XTDB node with the FlightSQL listener on port 9832 and the pgwire server on port 5432. The nightly Docker image enables both by default:
docker run --rm -d --name xtdb-duckdb-demo \ -p 5432:5432 -p 9832:9832 ghcr.io/xtdb/xtdb:nightlyInstall the Python dependencies:
pip install adbc-driver-flightsql pyarrow duckdb psycopgpsycopg seeds the trades over pgwire, so no external psql binary is required.
The scenario
Section titled “The scenario”Seven trades are stored in XTDB with explicit valid-time ranges.
One of them, CommerzBank’s T005, expired on 2024-06-30 and should be absent from the end-of-Q4 snapshot even though it was physically ingested.
The question: as of 2024-12-31, what’s the total trade value per counterparty tier?
The counterparty-to-tier mapping lives in DuckDB as a local dimension table (Parquet, CSV, or in-memory; it does not need to be in XTDB).
Step 1: Seed XTDB with bitemporal trades
Section titled “Step 1: Seed XTDB with bitemporal trades”Trades arrive via the normal write path.
Use _valid_from and _valid_to to record when each trade was true: the valid-time axis.
import psycopgfrom textwrap import dedent
SEED_SQL = dedent("""\ INSERT INTO trades (_id, counterparty, notional, currency, _valid_from, _valid_to) VALUES ('T001','Barclays', 1500000.0,'GBP',TIMESTAMP '2024-01-15 00:00:00',TIMESTAMP '9999-12-31 00:00:00'), ('T002','Barclays', 2000000.0,'GBP',TIMESTAMP '2024-03-01 00:00:00',TIMESTAMP '9999-12-31 00:00:00'), ('T003','JPMorgan', 3200000.0,'USD',TIMESTAMP '2024-02-10 00:00:00',TIMESTAMP '9999-12-31 00:00:00'), ('T004','DeutscheBank', 800000.0,'EUR',TIMESTAMP '2024-04-01 00:00:00',TIMESTAMP '9999-12-31 00:00:00'), ('T005','CommerzBank', 1100000.0,'EUR',TIMESTAMP '2024-01-01 00:00:00',TIMESTAMP '2024-06-30 00:00:00'), ('T006','SocGen', 950000.0,'EUR',TIMESTAMP '2024-06-01 00:00:00',TIMESTAMP '9999-12-31 00:00:00'), ('T007','SocGen', 1750000.0,'EUR',TIMESTAMP '2024-09-15 00:00:00',TIMESTAMP '9999-12-31 00:00:00')""")
PG_URI = "postgresql://localhost:5432/xtdb"with psycopg.connect(PG_URI, user="xtdb", autocommit=True) as pg: with pg.cursor() as cur: cur.execute(SEED_SQL)_valid_from/_valid_to in the INSERT body override the default (now → end-of-time) so XTDB records each trade’s actual validity window.
Step 2: Pull the Q4 snapshot via ADBC
Section titled “Step 2: Pull the Q4 snapshot via ADBC”FOR VALID_TIME AS OF pins the query to a single instant.
XTDB returns only the rows whose valid-time period contains that instant.
T005 (CommerzBank, valid to 2024-06-30) is excluded.
import pyarrow as paimport adbc_driver_flightsql.dbapi as flight_sql
SNAPSHOT_SQL = """\ SELECT _id, counterparty, notional, currency FROM trades FOR VALID_TIME AS OF TIMESTAMP '2024-12-31 23:59:59' ORDER BY _id"""
FSQL_URI = "grpc://localhost:9832"
with flight_sql.connect(FSQL_URI) as conn: with conn.cursor() as cur: cur.execute(SNAPSHOT_SQL) snapshot: pa.Table = cur.fetch_arrow_table()fetch_arrow_table() returns a pyarrow.Table.
The data travels from XTDB’s in-memory Arrow buffers across the FlightSQL gRPC stream and lands in the client as Arrow, so it isn’t decoded row by row or widened through a text wire format.
snapshot now holds 6 rows: T001–T004, T006, T007.
Step 3: Register the snapshot into DuckDB
Section titled “Step 3: Register the snapshot into DuckDB”import duckdb
duck = duckdb.connect() # in-memory DuckDB instanceduck.register("xtdb_trades", snapshot)duckdb.register() accepts any object that exposes the Arrow C Data Interface: a pyarrow.Table, RecordBatch, polars DataFrame, etc.
DuckDB operates directly on the Arrow buffers XTDB produced.
No copy, no decode.
Step 4: Create the counterparty-tier dimension in DuckDB
Section titled “Step 4: Create the counterparty-tier dimension in DuckDB”Enrichment data that doesn’t belong in XTDB lives here. In production this is typically a Parquet file on the local filesystem:
duck.execute("""\ CREATE TABLE counterparty_tiers AS SELECT * FROM (VALUES ('Barclays', 'Gold'), ('JPMorgan', 'Gold'), ('DeutscheBank', 'Silver'), ('SocGen', 'Silver'), ('CommerzBank', 'Bronze') ) AS t(counterparty, tier)""")Or from a Parquet file:
duck.execute("CREATE TABLE counterparty_tiers AS SELECT * FROM 'tiers.parquet'")Step 5: Join and aggregate in DuckDB
Section titled “Step 5: Join and aggregate in DuckDB”The join runs inside DuckDB, over the Arrow buffers XTDB produced, combining the bitemporal snapshot with DuckDB’s local lookup table. No round-trip to XTDB, no re-serialisation.
result: pa.Table = duck.execute("""\ SELECT ct.tier, count(*) AS trade_count, sum(t.notional) AS total_notional, round(avg(t.notional), 0) AS avg_notional FROM xtdb_trades AS t JOIN counterparty_tiers AS ct ON t.counterparty = ct.counterparty GROUP BY ct.tier ORDER BY total_notional DESC""").arrow().read_all()Output:
Exposure by counterparty tier, as-of Q4 2024 end: Tier Trades Total Notional Avg Notional ---------- ------ ---------------- -------------- Gold 3 6,700,000 2,233,333 Silver 3 3,500,000 1,166,667CommerzBank (Bronze, T005) is absent because it did not exist at Q4 end. The valid-time filter happened at the XTDB layer before any data crossed the wire.
Streaming batches instead of materialising the whole snapshot
Section titled “Streaming batches instead of materialising the whole snapshot”For result sets too large to fit in client memory, stream Arrow batches one at a time rather than calling fetch_arrow_table():
import adbc_driver_flightsql as flight_sql_raw
db = flight_sql_raw.connect(FSQL_URI)conn = db.new_connection()stmt = conn.new_statement()stmt.set_sql_query(SNAPSHOT_SQL)result, schema = stmt.execute_query()
# result is an ArrowReader; each iteration yields one RecordBatch.for batch in result: duck.register("xtdb_batch", batch) duck.execute("INSERT INTO aggregated SELECT ..., FROM xtdb_batch")
result.close()conn.close()db.close()The tradeoff:
fetch_arrow_table()materialises the full snapshot in one shot: simple, and the right default for snapshots that fit in RAM.- The
ArrowReaderpath streams batch-by-batch: the right choice when the XTDB result set is large and you can incrementally fold each batch into an ongoing DuckDB aggregate without materialising the whole thing.
Writing results back
Section titled “Writing results back”DuckDB query results can go back to XTDB via bulk ingest, or out to Parquet:
Back to XTDB:
# result is a pyarrow.Table; annotate with _id before ingest.import pyarrow.compute as pcresult_with_id = result.append_column( "_id", pa.array([f"tier-summary-{r['tier']}" for r in result.to_pylist()]))with flight_sql.connect(FSQL_URI) as conn: with conn.cursor() as cur: cur.adbc_ingest("tier_summaries", result_with_id, mode="create_append")To Parquet:
import pyarrow.parquet as pqpq.write_table(result, "tier_exposure_q4_2024.parquet")Runnable example
Section titled “Runnable example”A self-contained script that runs the full scenario is in
examples/streaming-into-duckdb/main.py.
docker run --rm -d --name xtdb-g5 -p 5432:5432 -p 9832:9832 ghcr.io/xtdb/xtdb:nightlypip install adbc-driver-flightsql pyarrow duckdb psycopgpython docs/src/content/docs/adbc/examples/streaming-into-duckdb/main.pydocker rm -f xtdb-g5Why this works without a copy
Section titled “Why this works without a copy”XTDB’s query engine produces Arrow RecordBatch objects internally.
The FlightSQL shim packages those batches into gRPC DoGet streams as-is.
The ADBC FlightSQL driver on the Python side decodes the gRPC framing and reconstructs the Arrow buffers in the client process.
duckdb.register() takes the Arrow C Data Interface pointer to those buffers: DuckDB’s planner sees the Arrow schema and DuckDB’s executor reads the data in-place.
The data moves as: XTDB in-memory Arrow → gRPC wire encoding → pyarrow Arrow buffer → DuckDB execution. There is no intermediate JSON, CSV, Postgres text wire, or pickle. The gRPC serialisation is the only encode/decode step, and it is Arrow’s own IPC format.