Getting Started with DBTK
Welcome to DBTK! This guide will get you up and running in 5 minutes.
What is DBTK?
DBTK (Data Benders Toolkit) is a Python library for ETL (Extract, Transform, Load) operations. It provides:
Universal database connectivity - One API for PostgreSQL, Oracle, MySQL, SQL Server, SQLite
Flexible file I/O - Read and write CSV, Excel, JSON, XML, compressed files
Smart data transformations - Field mapping, validation, type conversion
High-performance bulk operations - Process millions of records efficiently
Production-ready logging - Timestamped logs with automatic cleanup
Installation
# Basic installation
pip install dbtk
# Recommended (includes keyring, XML, Excel, phone/date helpers)
pip install "dbtk[recommended]" # quotes keep shells such as zsh from expanding the brackets
# Database drivers (install as needed)
pip install psycopg2-binary # PostgreSQL
pip install oracledb # Oracle
pip install mysqlclient # MySQL
pip install pyodbc # SQL Server
Configuration Setup
To store connections and encrypt or decrypt passwords, you need a config file (~/.config/dbtk.yml) and an encryption key. The easiest way to set these up is to run the dbtk config-setup wizard from PowerShell or another shell.
# launch configuration wizard
dbtk config-setup
If the dbtk command is not found (common when pip-installing without admin privileges), use the python -m form instead, which works regardless of PATH:
python -m dbtk config-setup
See Configuration & Security for detailed instructions.
See the Troubleshooting Guide if the dbtk command is not found.
Your First ETL Pipeline (5 Minutes)
Step 1: Connect to a Database
import dbtk
# Option 1: From configuration file (recommended for production)
db = dbtk.connect('my_database')
cur = db.cursor()
# Option 2: Direct connection (quick for dev/testing)
from dbtk.database import postgres
db = postgres(user='admin', password='secret', database='mydb')
cur = db.cursor()
# Option 3: Cursor maintains reference to connection (cursor.connection)
# and driver (cursor.connection.driver)
cur = dbtk.connect('my_database').cursor()
cur.connection.close()
Step 2: Read Data from a File
# Reads CSV, automatically detects format, handles compression
with dbtk.readers.get_reader('data.csv.gz') as reader:
    for record in reader:
        # Access data multiple ways
        print(record.name)      # Attribute access
        print(record['email'])  # Dict access
        print(record[0])        # Index access
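To give a feel for what compression handling involves, here is a minimal standard-library sketch. It does not use DBTK and is not how DBTK implements get_reader; the function name iter_csv_records is hypothetical, and it only covers plain and gzip-compressed CSV chosen by file suffix.

```python
import csv
import gzip
from pathlib import Path


def iter_csv_records(path):
    """Yield each CSV row as a dict, transparently handling .gz files."""
    path = Path(path)
    # Pick the opener from the file suffix; both accept the same text-mode arguments.
    opener = gzip.open if path.suffix == ".gz" else open
    with opener(path, mode="rt", newline="", encoding="utf-8") as handle:
        # DictReader takes column names from the header row.
        yield from csv.DictReader(handle)
```

Because the function is a generator, rows are read lazily and the file is closed once iteration finishes.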
Export Data
cursor.execute("SELECT * FROM users WHERE is_active = true")
# Excel — ready to open, no extra work required
dbtk.writers.to_excel(cursor, 'active_users.xlsx', sheet='Active Users')
What DBTK handles automatically:
Columns are sampled and auto-sized to fit the data
Header row is bold and frozen so it stays visible while scrolling
Date and datetime values are formatted correctly
None values become blank cells
Need other formats? Same data, one line each:
import dbtk.writers
cursor.execute("SELECT * FROM users WHERE is_active = true")
data = cursor.fetchall() # materialize the rows once so the same data can be written to several outputs
dbtk.writers.to_csv(data, 'active_users.csv')
dbtk.writers.to_json(data, 'active_users.json')
dbtk.writers.to_ndjson(data, 'active_users.ndjson')
dbtk.writers.to_xml(data, 'active_users.xml', root_element='users', record_element='user')
Complete Example: CSV to Database
import dbtk
# Setup logging (creates log file with defaults from user's configuration)
dbtk.setup_logging()
# Connect to database
with dbtk.connect('production_db') as db:
    cursor = db.cursor()
    # Define table with transforms
    orders_table = dbtk.etl.Table('orders', {
        'order_id': {'field': 'id', 'key': True},
        'customer_email': {'field': 'email', 'fn': 'email', 'nullable': False},
        'order_date': {'field': 'date', 'fn': 'date'},
        'amount': {'field': 'total', 'fn': 'float'},
        'status': {'default': 'pending'}
    }, cursor=cursor)
    # Load data with automatic batching and progress
    with dbtk.readers.get_reader('orders.csv.gz') as reader:
        surge = dbtk.etl.DataSurge(orders_table, use_transaction=True)
        errors = surge.insert(reader)
    print(f"Loaded {orders_table.counts['insert']} orders")
    print(f"Skipped {orders_table.counts['incomplete']} incomplete records")
# Check for errors
if dbtk.errors_logged():
    print("⚠️ Some errors occurred - check log file")
Core Concepts
1. Record Objects
Every cursor and file reader returns Record objects, which balance the memory efficiency and speed of lists with the functionality of dicts.
for row in cursor:
    row.name       # Attribute access
    row['email']   # Dict access
    row[0]         # Index access
    row[:3]        # Slicing
    user_id, name, email = row  # Unpacking (avoids shadowing the built-in id)
See Record Objects for details.
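The access patterns above can be illustrated with a small stand-alone class. This is a sketch under stated assumptions, not DBTK's Record implementation: the name SimpleRecord is hypothetical, and it stores values in a tuple with a parallel tuple of field names.

```python
class SimpleRecord:
    """Illustrative only: a row supporting attribute, key, index, and slice access."""

    __slots__ = ("_fields", "_values")

    def __init__(self, fields, values):
        self._fields = tuple(fields)
        self._values = tuple(values)

    def __getattr__(self, name):
        # Called only when normal lookup fails; refuse private names to avoid recursion.
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return self._values[self._fields.index(name)]
        except ValueError:
            raise AttributeError(name) from None

    def __getitem__(self, key):
        if isinstance(key, str):
            try:
                return self._values[self._fields.index(key)]
            except ValueError:
                raise KeyError(key) from None
        return self._values[key]  # integer index or slice

    def __iter__(self):
        # Iteration over values is what makes tuple-style unpacking work.
        return iter(self._values)

    def __len__(self):
        return len(self._values)
```

Keeping values in a tuple keeps per-row memory close to that of a plain sequence, while the name lookup adds dict-like convenience on top.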
2. Portable SQL
Write SQL once with named parameters; the parameter style is determined by your database connection.
# Write SQL with :named or %(pyformat)s parameters
import dbtk
cursor = dbtk.connect('warehouse_prod').cursor()
# Create PreparedStatement from file
statement = cursor.prepare_file('users.sql')
statement.execute({'status': 'active', 'age': 18})
# Create PreparedStatement from query
sql = "SELECT * FROM users WHERE status = :status AND age > :age"
statement = cursor.prepare_query(sql)
statement.execute({'status': 'active', 'age': 18})
# One time query from file
cursor.execute_file('users.sql', {'status': 'active', 'age': 18})
# on the fly parameter conversion
cursor.execute(sql, {'status': 'active', 'age': 18}, convert_params=True)
# DBTK converts to your database's native format automatically
# Oracle: :status, :age
# PostgreSQL: %(status)s, %(age)s
# MySQL/SQLite: ?, ? (and reorders parameters)
See Database Connections for details.
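To make the conversion idea concrete, the sketch below rewrites :named placeholders into positional ? markers and reorders the values to match, using only the standard library. It is an assumption-laden illustration rather than DBTK's converter: the helper named_to_qmark is hypothetical, and the regular expression does not skip placeholders that appear inside SQL string literals or comments.

```python
import re
import sqlite3

# Matches :name but not the second colon of a PostgreSQL-style ::cast.
_NAMED_PARAM = re.compile(r"(?<!:):([A-Za-z_]\w*)")


def named_to_qmark(sql, params):
    """Rewrite :name placeholders as ? and return (sql, values in placeholder order)."""
    ordered = []

    def _replace(match):
        # Record each value in the order its placeholder appears in the SQL.
        ordered.append(params[match.group(1)])
        return "?"

    return _NAMED_PARAM.sub(_replace, sql), tuple(ordered)


# Usage against an in-memory SQLite database, which accepts qmark placeholders.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, status TEXT, age INTEGER)")
conn.execute("INSERT INTO users VALUES ('Ada', 'active', 36)")
query, values = named_to_qmark(
    "SELECT name FROM users WHERE status = :status AND age > :age",
    {"age": 18, "status": "active"},
)
rows = conn.execute(query, values).fetchall()
```

Note that the values come back in placeholder order, not dict order, which is the reordering step mentioned in the comments above.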
3. Field Mapping and Transforms
Separate your database schema from your source data format:
table = dbtk.etl.Table('employees', {
    # Database column: source field + transforms
    'emp_id': {'field': 'employee_number', 'key': True},
    'full_name': {'field': 'name', 'fn': 'maxlen:100'},
    'hire_date': {'field': 'start_date', 'fn': 'date'},
    'department': {'field': 'dept_code', 'fn': 'lookup:departments:code:name'}
}, cursor=cursor)
from dbtk.etl import column_defs_from_db
# Generate an initial column configuration template from the database schema
emp_cols = column_defs_from_db(cursor=cursor, table_name='hr.employees', add_comments=False)
# This returns a string representation to paste into your code and modify, not to use directly
print(emp_cols)
See ETL: Table & Transforms for details.
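The idea of separating schema from source format can be sketched without DBTK. In the example below, apply_mapping and EMPLOYEE_COLUMNS are hypothetical names, and plain callables stand in for DBTK's string shorthands such as 'date' or 'maxlen:100'; the real Table class does considerably more (keys, validation, SQL generation).

```python
from datetime import date


def apply_mapping(column_defs, source):
    """Build a {db_column: value} dict from a source record (a dict here)."""
    row = {}
    for column, spec in column_defs.items():
        # Pull the raw value from the configured source field, if there is one.
        value = source.get(spec["field"]) if "field" in spec else None
        if value in (None, ""):
            value = spec.get("default")  # fall back to the default, if any
        elif "fn" in spec:
            value = spec["fn"](value)  # apply the transform to non-empty values
        row[column] = value
    return row


# Hypothetical definitions: callables stand in for DBTK's string shorthands.
EMPLOYEE_COLUMNS = {
    "emp_id": {"field": "employee_number"},
    "full_name": {"field": "name", "fn": lambda s: s[:100]},
    "hire_date": {"field": "start_date", "fn": date.fromisoformat},
    "status": {"default": "pending"},
}
```

With this shape, a change in the source file's column names only touches the 'field' entries, not the code that loads the data.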
4. Bulk Operations
Process millions of records efficiently:
import dbtk
from dbtk.etl import Table, DataSurge, BulkSurge
# contact_cols is a column definition dict like the one in section 3
contacts = Table('contacts', columns=contact_cols, cursor=cursor)
reader = dbtk.readers.get_reader('contacts.csv.gz')
# Pick one of the loaders below per run; both are shown for comparison
# DataSurge: Fast batching with executemany
surge = DataSurge(contacts)
surge.insert(reader) # Supports MERGE, UPDATE, DELETE too
# BulkSurge: Maximum speed with native loading
surge = BulkSurge(contacts)
surge.load(reader) # Uses COPY/direct-path/bcp depending on database
See ETL: DataSurge & BulkSurge for details.
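Batching with executemany is a general DB-API technique, and a minimal version can be shown with sqlite3 alone. This is a sketch of the idea, not DataSurge itself: insert_in_batches is a hypothetical helper, and the batch size and single commit at the end are assumptions made for the example.

```python
import sqlite3
from itertools import islice


def insert_in_batches(conn, sql, rows, batch_size=1000):
    """Insert rows with executemany, one batch at a time; return the row count."""
    iterator = iter(rows)
    total = 0
    while True:
        # Pull at most batch_size rows so memory use stays bounded.
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        conn.executemany(sql, batch)
        total += len(batch)
    conn.commit()
    return total


# Usage: stream 2,500 generated rows into an in-memory table in batches of 1,000.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contacts (id INTEGER, name TEXT)")
loaded = insert_in_batches(
    conn,
    "INSERT INTO contacts VALUES (?, ?)",
    ((i, f"contact-{i}") for i in range(2500)),
)
```

Because the rows come from a generator, only one batch is held in memory at a time, which is the property that makes this approach scale to large files.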
Configuration
Create dbtk.yml in your project directory or at ~/.config/dbtk.yml:
connections:
  production_db:
    type: postgres
    host: db.example.com
    database: production
    user: app_user
    encrypted_password: gAAAAABh... # Use: dbtk encrypt-password
settings:
  default_batch_size: 2000
  logging:
    directory: ./logs
    level: INFO
    split_errors: true
Encrypt passwords:
# Generate encryption key and store in system keyring
dbtk store-key
# Encrypt a password
dbtk encrypt-password mypassword
# Encrypt all passwords in config file
dbtk encrypt-config dbtk.yml
See Configuration & Security for details.
Common Patterns
Update or Insert Logic
import logging
logger = logging.getLogger(__name__)
for record in reader:
    table.set_values(record)
    existing = table.fetch() # Check if record exists
    operation = 'update' if existing else 'insert'
    if table.is_ready(operation):
        err = table.execute(operation, raise_error=False)
        if err:
            logger.warning(f"{operation} failed: {table.last_error.message}")
        elif operation == 'insert':
            new_id = cursor.lastrowid
# note: DataSurge supports merge/upsert for high-performance, bulk operations
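The same check-then-write decision can be seen in isolation with plain sqlite3. The function upsert_user and the users table are hypothetical and exist only for this sketch; it shows the control flow of the pattern, not DBTK's Table API.

```python
import sqlite3


def upsert_user(conn, user_id, email):
    """Update the row if user_id exists, otherwise insert it; return the operation used."""
    exists = conn.execute("SELECT 1 FROM users WHERE id = ?", (user_id,)).fetchone()
    if exists:
        conn.execute("UPDATE users SET email = ? WHERE id = ?", (email, user_id))
        return "update"
    conn.execute("INSERT INTO users (id, email) VALUES (?, ?)", (user_id, email))
    return "insert"


# Usage: the first call inserts, the second call with the same id updates.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
first = upsert_user(conn, 1, "ada@example.com")
second = upsert_user(conn, 1, "ada@new.example.com")
```

A select followed by a write costs two round trips per record, which is why a bulk merge is usually preferable for large loads.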
Filtering Records
with dbtk.readers.get_reader('data.csv') as reader:
    reader.add_filter(lambda r: r.status == 'active')
    reader.add_filter(lambda r: r.age >= 18)
    for record in reader:
        process(record)
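Stacked filters amount to requiring every predicate to pass. The generator below is a minimal stand-alone sketch of that behavior, assuming filters combine with logical AND; the name filtered is hypothetical and the records are plain dicts rather than DBTK Record objects.

```python
def filtered(records, *predicates):
    """Yield only the records for which every predicate returns a truthy value."""
    for record in records:
        # all() short-circuits, so later predicates are skipped once one fails.
        if all(predicate(record) for predicate in predicates):
            yield record


# Usage with plain dicts standing in for records.
people = [
    {"name": "Ada", "status": "active", "age": 36},
    {"name": "Bob", "status": "inactive", "age": 40},
    {"name": "Cy", "status": "active", "age": 16},
]
adults = list(
    filtered(people, lambda r: r["status"] == "active", lambda r: r["age"] >= 18)
)
```

Putting the cheapest or most selective predicate first reduces the work done on records that will be dropped anyway.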
Multi-Stage ETL with State Persistence
from dbtk.etl import IdentityManager, EntityStatus
stmt = cursor.prepare_file('identity_lookup.sql')
# Map source IDs to target IDs with caching
im = IdentityManager('source_id', 'target_id', resolver=stmt)
for record in reader:
    entity = im.resolve(record) # Gets target_id, writes to record
    if entity['_status'] == EntityStatus.RESOLVED:
        table.set_values(record)
        if not table.is_ready('insert'):
            missing = ', '.join(table.reqs_missing('insert'))
            im.add_message(record.source_id, f"Skipped - missing values: {missing}")
            continue # skip the insert for incomplete records
        err = table.execute('insert')
        if err:
            im.add_error(record.source_id, table.last_error)
# Save state for next run
im.save_state('state/entities.json')
See ETL: Tools & Logging for details.
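The core of state persistence is a lookup cache that can be written to disk and reloaded. The sketch below shows that idea with the standard library only; IdCache and its methods are hypothetical and much simpler than IdentityManager, which also tracks statuses, messages, and errors.

```python
import json
from pathlib import Path


class IdCache:
    """Hypothetical helper: remember source-to-target ID mappings between runs."""

    def __init__(self, resolver):
        self._resolver = resolver  # callable: source_id -> target_id or None
        self._known = {}

    def resolve(self, source_id):
        key = str(source_id)  # JSON object keys are always strings
        if key not in self._known:
            # Only hit the resolver (for example a database query) on a cache miss.
            self._known[key] = self._resolver(source_id)
        return self._known[key]

    def save_state(self, path):
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(json.dumps(self._known), encoding="utf-8")

    def load_state(self, path):
        self._known = json.loads(Path(path).read_text(encoding="utf-8"))
```

Reloading the saved file at the start of a later stage means IDs resolved earlier are reused instead of being looked up again.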
Next Steps
Essential Documentation
Database Connections - Cursors, transactions, SQL files
Record Objects - Universal data structure
Readers - Reading data from files and databases
Writers - Writing data to files and databases
ETL: Table & Transforms - Field mapping, transformations
ETL: DataSurge & BulkSurge - High-performance loading
Advanced Topics
ETL: Tools & Logging - IdentityManager, ValidationCollector
Configuration & Security - Encryption, environment variables
Advanced Features - Performance tuning, custom drivers
Get Help
Check Troubleshooting for common issues
Review API Reference for all methods
See examples in the /examples folder
Report issues at GitHub Issues
Quick Command Reference
# If 'dbtk' is not in PATH, prefix any command with 'python -m':
python -m dbtk config-setup
# Check installation and available drivers
dbtk checkup
# Password encryption
dbtk generate-key # Generate encryption key
dbtk store-key # Store key in system keyring
dbtk encrypt-password <password> # Encrypt a single password
dbtk encrypt-config dbtk.yml # Encrypt all passwords in config
# Log management
# Cleanup handled automatically with dbtk.cleanup_old_logs()
Ready to build production ETL pipelines? Start with the examples above and explore the documentation links!