import asyncio
import psycopg as pg
# Keyword arguments unpacked into pg.AsyncConnection.connect() by main().
DB_PARAMS = dict(host="localhost", port=5432)
async def insert_trades(conn, trades):
    """Insert every trade into the ``trades`` table, one statement per trade.

    Args:
        conn: Connection object whose ``cursor()`` returns an async context
            manager (``main`` passes a psycopg ``AsyncConnection``).
        trades: Iterable of mappings, each providing the keys ``"_id"``,
            ``"name"`` and ``"quantity"``.

    No commit is issued here; ``main`` opens the connection with
    ``autocommit=True``.
    """
    insert_sql = "\nINSERT INTO trades (_id, name, quantity) VALUES (%s, %s, %s)\n"
    columns = ("_id", "name", "quantity")
    async with conn.cursor() as cursor:
        for record in trades:
            # Values are bound as parameters, in the same order as the
            # column list in the INSERT statement.
            row = tuple(record[column] for column in columns)
            await cursor.execute(insert_sql, row)
async def get_trades_over(conn, quantity):
    """Fetch all rows of ``trades`` whose ``quantity`` is greater than *quantity*.

    Args:
        conn: Connection object whose ``cursor()`` returns an async context
            manager (``main`` passes a psycopg ``AsyncConnection``).
        quantity: Exclusive lower bound, bound as the single query parameter.

    Returns:
        Whatever the cursor's ``fetchall()`` produces for the SELECT.
    """
    select_sql = "\nSELECT * FROM trades WHERE quantity > %s\n"
    params = (quantity,)
    async with conn.cursor() as cursor:
        await cursor.execute(select_sql, params)
        rows = await cursor.fetchall()
    return rows
async def main():
    """Run the demo: connect, insert three sample trades, query, and print.

    Connects using ``DB_PARAMS`` with autocommit enabled, inserts the sample
    trades, then prints the rows whose quantity is greater than 100. Any
    exception raised while connecting or querying is caught and reported on
    stdout instead of propagating.
    """
    sample_trades = [
        {"_id": trade_id, "name": label, "quantity": amount}
        for trade_id, label, amount in (
            (1, "Trade1", 1001),
            (2, "Trade2", 15),
            (3, "Trade3", 200),
        )
    ]

    try:
        connection = await pg.AsyncConnection.connect(**DB_PARAMS, autocommit=True)
        async with connection as conn:
            # Python str values must be dumped via StrDumperVarchar for now;
            # see https://github.com/xtdb/xtdb/issues/3589
            conn.adapters.register_dumper(str, pg.types.string.StrDumperVarchar)

            await insert_trades(conn, sample_trades)
            print("Trades inserted successfully")

            matches = await get_trades_over(conn, 100)
            print(matches)
    except Exception as error:
        print(f"Error occurred: {error}")
if __name__ == "__main__":
    # Only when executed as a script: run the async demo to completion.
    asyncio.run(main())
Using XTDB from Python
In Python, you can talk to a running XTDB node using the standard psycopg (version 3) driver, taking advantage of XTDB’s Postgres wire-compatibility.