Database Connections
DBTK provides a unified interface for connecting to multiple database types with consistent APIs and smart cursor handling.
Quick Start
import dbtk
# From configuration file
db = dbtk.connect('production_db')
# Direct connection
from dbtk.database import postgres, oracle, mysql, sqlserver, sqlite
# non-standard connection parameters are automatically mapped, standard ports are defaulted
db = postgres(user='admin', password='secret', database='mydb', host='localhost')
db = oracle(user='admin', password='secret', database='ORCL', host='localhost')
db = mysql(user='admin', password='secret', database='mydb')
db = sqlserver(user='admin', password='secret', database='mydb', host='localhost')
db = sqlite('path/to/database.db')
Supported Databases
DBTK supports multiple database drivers with automatic detection and fallback:
Database |
Driver |
Install Command |
Notes |
|---|---|---|---|
PostgreSQL |
psycopg2 |
|
Recommended, most mature |
PostgreSQL |
psycopg (3) |
|
Newest version, async support |
PostgreSQL |
pgdb |
|
DB-API compliant |
Oracle |
oracledb |
|
Thin mode - no Oracle client required |
Oracle |
cx_Oracle |
|
Requires Oracle client installation |
MySQL |
mysqlclient |
|
Fastest option, C extension, module name MySQLdb |
MySQL |
mariadb |
|
Official MariaDB connector, C extension, MySQL compatible |
MySQL |
mysql.connector |
|
Official MySQL connector |
MySQL |
pymysql |
|
Pure Python, lightweight |
SQL Server |
pyodbc |
|
ODBC driver required on system |
SQL Server |
pymssql |
|
Lightweight, no ODBC needed |
SQLite |
sqlite3 |
Built-in |
No installation needed |
Driver priority: DBTK automatically selects the best available driver based on priority. Override with driver='driver_name' in your connection config or function call.
The Database Object
The Database class wraps database connections and provides a consistent interface:
db = dbtk.connect('my_database')
# Connection info
print(db.database_type) # 'postgres', 'oracle', 'mysql', 'sqlserver', 'sqlite'
print(db.database_name) # Database/schema name
print(db.driver) # The underlying driver module (psycopg2, oracledb, etc.)
print(db.placeholder) # Parameter placeholder for this driver
# Create cursors
cursor = db.cursor()
# Transaction management
db.commit()
db.rollback()
db.close()
# Parameter style help
db.param_help() # Shows this driver's parameter style with examples
# psycopg2's parameter style is "pyformat"
# "SELECT * FROM people WHERE name = %s AND age > %s", ("Smith", 30)
# "SELECT * FROM people WHERE name = %(name)s AND age > %(age)s", {"name": "Smith", "age": 30}
Context Managers
# Connection automatically closed
with dbtk.connect('production_db') as db:
cursor = db.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
# Transaction - auto-commit on success, rollback on exception
with db.transaction():
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
Access the Full Stack
DBTK maintains a clean reference hierarchy for accessing the underlying driver:
cursor = dbtk.connect('imdb').cursor()
# The Cursor maintains a reference to the Database connection
print(cursor.connection.connection_name) # imdb
# The Database maintains a reference to the driver
print(cursor.connection.driver.__name__) # 'psycopg2', 'oracledb', etc.
# Access the wrapped connection or cursor
cursor.connection._connection
cursor._cursor
# Use driver exceptions
try:
cursor.execute(sql)
except cursor.connection.driver.DatabaseError as e:
cursor.connection.rollback()
logger.error(f"Database error: {e}")
Cursors and Records
All DBTK cursors return Record objects - a hybrid data structure that is memory efficient like at tuple but the functionality of dict or an object.
cursor = db.cursor()
cursor.execute("SELECT id, name, email FROM users WHERE status = :status",
{'status': 'active'})
for user in cursor:
user['name'] # Dict-style access
user.email # Attribute access
user[0] # Index access
user[:2] # Slicing
id, name, email = user # Tuple unpacking
Records store both the original column names, and also normalize column names for attribute access, so row.employee_id (and row['employee_id']) works whether the source column is Employee_ID, EMPLOYEE ID, or employee_id. This makes your Table field mappings resilient to source naming inconsistencies.
See Record Objects for full documentation on access patterns, normalization, mutation, and performance characteristics.
Cursor Configuration
cursor = db.cursor(
batch_size=5000, # Rows per batch in bulk operations
debug=True, # Print SQL queries and bind variables
return_cursor=True, # execute() returns cursor for chaining
)
# With return_cursor=True, you can chain calls
user = cursor.execute("SELECT * FROM users WHERE status = 'active'").fetchone()
Default cursor settings can be configured per-connection in the YAML config file or passed to dbtk.connect().
See Configuration for detailed connection configuration documentation.
connections:
my_database:
type: postgres
host: localhost
database: mydb
user: myuser
cursor:
batch_size: 4000
debug: false
return_cursor: true
Parameter Styles
Native Parameter Styles
Database |
Dictionary Style |
Positional Style |
Placeholder |
|---|---|---|---|
PostgreSQL |
pyformat |
format |
|
Oracle |
named |
numeric |
|
MySQL |
format |
|
|
SQL Server (pyodbc) |
qmark |
|
|
SQLite |
qmark |
|
Cursor Methods
Executing Queries
# MySQL — %s (format) paramstyle
mysql_cur.execute("SELECT * FROM users WHERE id = %s", (42,))
# Oracle — :named paramstyle
ora_cur.execute("SELECT * FROM users WHERE id = :id", {'id': 42})
# MS SQL Server — ? (qmark) paramstyle, convert_params rewrites :named → ?
mssql_cur.execute("SELECT * FROM users WHERE id = :id", {'id': 42}, convert_params=True)
# Multiple rows (batch)
cursor.executemany(
"INSERT INTO users (name, email) VALUES (:name, :email)",
[
{'name': 'Alice', 'email': 'alice@example.com'},
{'name': 'Bob', 'email': 'bob@example.com'}
]
)
# Execute from SQL file with portable parameter conversion
cursor.execute_file('queries/create_schema.sql', {'status': 'active'})
Prepared Statements
For queries executed repeatedly with different parameters, PreparedStatement transforms the SQL to match the database’s paramstyle and caches the parameter mapping.
import dbtk
# From a query string — mirrors cursor.prepare_file() for inline SQL
orders_stmt = cursor.prepare_query("SELECT * FROM orders WHERE customer_id = :id")
# From a query file
movies_stmt = cursor.prepare_file('queries/list_movies.sql')
# Execute many times efficiently
for user in users:
orders_stmt.execute({'id': user.id})
order = orders_stmt.fetchone()
process(user, order)
PreparedStatement is also available directly from dbtk or dbtk.cursors if you need to construct one explicitly:
import dbtk
stmt = dbtk.PreparedStatement(cursor, query="SELECT * FROM orders WHERE customer_id = :id")
stmt = dbtk.PreparedStatement(cursor, filename='queries/get_user.sql')
Parameter Conversion
DBTK has tools to handle different parameter styles. You can use named (:name) or pyformat (%(name)s) in queries - DBTK can convert to the driver’s native style.
By default, cursor.execute() passes the query and parameters directly to the underlying driver — no rewriting is done. Use convert_params=True for a one-off conversion. cursor.execute_file() and PreparedStatement convert automatically.
Oracle and PostgreSQL support both dictionary and positional parameters. Their default (db.driver.paramstyle) will be the dictionary style. If you want force positional mode, you can override the paramstyle as in the example below.
from dbtk.utils import process_sql_parameters, ParamStyle
sql = 'SELECT * FROM users WHERE name = :name AND age > :age'
# query automatically rewritten and parameters formatted to match paramstyle
statement = dbtk.cursors.PreparedStatement(cursor, sql)
# Use a positional style if you are doing a large executemany,
positional_style = ParamStyle.get_positional_style(cur.paramstyle)
# `process_sql_parameters` to manually convert a query and get parameter order
query, param_names = process_sql_parameters(sql, positional_style)
print(param_names) # ['name', 'age']
# extra params ignored, missing defaulted
params = {'name': 'Aang',
'rank': 'Avatar'}
# `prepare_params` converts params to whatever is needed by the query
bind_vars = cursor.prepare_params(param_names, params,
paramstyle=positional_style)
# ['Aang', None] or {'name': 'Aang', 'age': None} depending on paramstyle
Fetching Results
# Fetch one record
user = cursor.fetchone()
# Fetch many records
users = cursor.fetchmany(100)
# Fetch all records
all_users = cursor.fetchall()
# Iterate (memory efficient for large result sets)
for user in cursor:
process(user)
# Fetch exactly one (raises error if 0 or >1 rows)
user = cursor.selectinto("SELECT * FROM users WHERE id = :id", {'id': 42})
Cursor Properties
# Column names (after executing a query)
columns = cursor.columns() # Original names
normalized = cursor.columns(normalized=True) # Sanitized for Python identifiers
# Row count (if available)
count = cursor.rowcount
# Description (DB-API standard)
desc = cursor.description
Transaction Management
Manual Transactions
try:
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
cursor.connection.commit()
except Exception:
cursor.connection.rollback()
raise
Context Manager Transactions
with db.transaction():
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
# Commits automatically on success, rolls back on exception
Error Handling
try:
cursor.execute("SELECT * FROM nonexistent_table")
except db.driver.DatabaseError as e:
print(f"Database error: {e}")
except db.driver.IntegrityError as e:
print(f"Integrity constraint violated: {e}")
Benefits of SQL files:
Keep SQL separate from Python for better organisation and editor syntax highlighting
Test queries independently before integration
Reuse the same query across different scripts
Write once, run on any database
Best Practices
Use context managers - Ensures connections are properly closed
Use named/pyformat parameters - More readable and portable across databases
Iterate large result sets - Don’t
fetchall()on millions of rowsUse
transaction()context manager - Safe commit/rollback handlingUse configuration files - Keep credentials out of code
Use
cursor.execute_file()- Portable SQL with automatic parameter conversionUse
PreparedStatement- Portable SQL to be executed repeatedly
See Also
Record Objects - Full documentation on DBTK’s universal data structure
Configuration & Security - YAML config files and password encryption
ETL: Table & Transforms - Using cursors with Table and DataSurge
ETL: Tools & Logging - IdentityManager and ValidationCollector
Readers - Reading data from files and databases
Writers - Writing data to files and databases