Getting Started with DBTK

Welcome to DBTK! This guide will get you up and running in 5 minutes.

What is DBTK?

DBTK (Data Benders Toolkit) is a Python library for ETL (Extract, Transform, Load) operations. It provides:

  • Universal database connectivity - One API for PostgreSQL, Oracle, MySQL, SQL Server, SQLite

  • Flexible file I/O - Read and write CSV, Excel, JSON, XML, compressed files

  • Smart data transformations - Field mapping, validation, type conversion

  • High-performance bulk operations - Process millions of records efficiently

  • Production-ready logging - Timestamped logs with automatic cleanup

Installation

# Basic installation
pip install dbtk

# Recommended (includes keyring, XML, Excel, phone/date helpers)
pip install dbtk[recommended]

# Database drivers (install as needed)
pip install psycopg2-binary      # PostgreSQL
pip install oracledb             # Oracle
pip install mysqlclient          # MySQL
pip install pyodbc               # SQL Server

Configuration Setup

To store connections and encrypt or decrypt passwords, you need a config file (~/.config/dbtk.yml) and an encryption key. The easiest way to set these up is to run the dbtk config-setup wizard from PowerShell or another shell.

# launch configuration wizard
dbtk config-setup

If the dbtk command is not found (common when pip-installing without admin privileges), use the python -m form instead, which works regardless of PATH:

python -m dbtk config-setup

See Configuration & Security for detailed instructions.

See the Troubleshooting Guide if the dbtk command is not found.

Your First ETL Pipeline (5 Minutes)

Step 1: Connect to a Database

import dbtk

# Option 1: From configuration file (recommended for production)
db = dbtk.connect('my_database')
cur = db.cursor()

# Option 2: Direct connection (quick for dev/testing)
from dbtk.database import postgres
db = postgres(user='admin', password='secret', database='mydb')
cur = db.cursor()

# Option 3: Cursor maintains reference to connection (cursor.connection) 
# and driver (cursor.connection.driver)
cur = dbtk.connect('my_database').cursor()
cur.connection.close()

Step 2: Read Data from a File

# Reads CSV, automatically detects format, handles compression
with dbtk.readers.get_reader('data.csv.gz') as reader:
    for record in reader:
        # Access data multiple ways
        print(record.name)        # Attribute access
        print(record['email'])    # Dict access
        print(record[0])          # Index access
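
To show what "handles compression" can mean in practice, here is a minimal standard-library sketch that picks an opener from the file extension and parses the CSV. The helper name open_maybe_gzip is hypothetical and this is not DBTK's implementation; it only illustrates the general idea.

```python
import csv
import gzip
import os
import tempfile

# Create a small gzip-compressed CSV to read back
path = os.path.join(tempfile.mkdtemp(), "data.csv.gz")
with gzip.open(path, "wt", newline="", encoding="utf-8") as fh:
    fh.write("name,email\nAda,ada@example.com\nBob,bob@example.com\n")


def open_maybe_gzip(filename):
    """Open a text file, transparently decompressing .gz files (hypothetical helper)."""
    opener = gzip.open if filename.endswith(".gz") else open
    return opener(filename, "rt", newline="", encoding="utf-8")


with open_maybe_gzip(path) as fh:
    rows = list(csv.DictReader(fh))  # one dict per data row, keyed by header
```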

Export Data

cursor = db.cursor()  # db is the connection opened in Step 1
cursor.execute("SELECT * FROM users WHERE is_active = true")

# Excel — ready to open, no extra work required
dbtk.writers.to_excel(cursor, 'active_users.xlsx', sheet='Active Users')

What DBTK handles automatically:

  • Columns are sampled and auto-sized to fit the data

  • Header row is bold and frozen so it stays visible while scrolling

  • Date and datetime values are formatted correctly

  • None values become blank cells

Need other formats? Same data, one line each:

import dbtk.writers

cursor.execute("SELECT * FROM users WHERE is_active = true")
data = cursor.fetchall()  # materialize the rows first when writing to more than one destination

dbtk.writers.to_csv(data, 'active_users.csv')
dbtk.writers.to_json(data, 'active_users.json')
dbtk.writers.to_ndjson(data, 'active_users.ndjson')
dbtk.writers.to_xml(data, 'active_users.xml', root_element='users', record_element='user')
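
The reason for calling fetchall() first is that a database cursor can typically be read only once. The sketch below demonstrates this with the standard-library sqlite3, csv, and json modules rather than DBTK's writers, so the table and variable names are illustrative assumptions.

```python
import csv
import io
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "Ada"), (2, "Bob")])

cursor = conn.execute("SELECT id, name FROM users ORDER BY id")
first_pass = list(cursor)   # consumes every row
second_pass = list(cursor)  # nothing left to read

# Materialize once, then reuse the list for each output
cursor = conn.execute("SELECT id, name FROM users ORDER BY id")
headers = [col[0] for col in cursor.description]
data = cursor.fetchall()

csv_out = io.StringIO()
writer = csv.writer(csv_out)
writer.writerow(headers)
writer.writerows(data)

json_out = json.dumps([dict(zip(headers, row)) for row in data])
```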

Complete Example: CSV to Database

import dbtk

# Setup logging (creates log file with defaults from user's configuration)
dbtk.setup_logging()

# Connect to database
with dbtk.connect('production_db') as db:
    cursor = db.cursor()

    # Define table with transforms
    orders_table = dbtk.etl.Table('orders', {
        'order_id': {'field': 'id', 'key': True},
        'customer_email': {'field': 'email', 'fn': 'email', 'nullable': False},
        'order_date': {'field': 'date', 'fn': 'date'},
        'amount': {'field': 'total', 'fn': 'float'},
        'status': {'default': 'pending'}
    }, cursor=cursor)

    # Load data with automatic batching and progress
    with dbtk.readers.get_reader('orders.csv.gz') as reader:
        surge = dbtk.etl.DataSurge(orders_table, use_transaction=True)
        errors = surge.insert(reader)

    print(f"Loaded {orders_table.counts['insert']} orders")
    print(f"Skipped {orders_table.counts['incomplete']} incomplete records")

# Check for errors
if dbtk.errors_logged():
    print("⚠️  Some errors occurred - check log file")

Core Concepts

1. Record Objects

Every cursor and file reader returns Record objects, which balance the memory efficiency and speed of lists with the functionality of dicts.

for row in cursor:
    row.name              # Attribute access
    row['email']          # Dict access
    row[0]                # Index access
    row[:3]               # Slicing
    id, name, email = row # Unpacking
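
To make the trade-off concrete, here is a minimal sketch of how one row type can support all of these access styles. SketchRecord is a hypothetical name and is not DBTK's actual Record class; it assumes only that values live in a tuple and column names are shared across rows.

```python
class SketchRecord:
    """Illustrative row type: values in a tuple, column names shared per result set."""

    __slots__ = ("_fields", "_values")

    def __init__(self, fields, values):
        self._fields = fields          # tuple of column names
        self._values = tuple(values)   # compact, list-like storage

    def __getitem__(self, key):
        # String keys behave like a dict; ints and slices behave like a tuple
        if isinstance(key, str):
            try:
                return self._values[self._fields.index(key)]
            except ValueError:
                raise KeyError(key) from None
        return self._values[key]

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

    def __iter__(self):
        return iter(self._values)

    def __len__(self):
        return len(self._values)


row = SketchRecord(("id", "name", "email"), (1, "Ada", "ada@example.com"))
```

Storing the column names once and sharing them across rows is what keeps each row close to the size of a plain tuple.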

See Record Objects for details.

2. Portable SQL

Write SQL once with named parameters; DBTK converts them to the parameter style your database connection requires.

# Write SQL with :named or %(pyformat)s parameters
import dbtk 

cursor = dbtk.connect('warehouse_prod').cursor()

# Create PreparedStatement from file
statement = cursor.prepare_file('users.sql')
statement.execute({'status': 'active', 'age': 18})

# Create PreparedStatement from query
sql = "SELECT * FROM users WHERE status = :status AND age > :age"
statement = cursor.prepare_query(sql)
statement.execute({'status': 'active', 'age': 18})

# One time query from file
cursor.execute_file('users.sql', {'status': 'active', 'age': 18})

# On-the-fly parameter conversion for a one-off query
cursor.execute(sql, {'status': 'active', 'age': 18}, convert_params=True)

# DBTK converts to your database's native format automatically
# Oracle: :status, :age
# PostgreSQL: %(status)s, %(age)s
# MySQL/SQLite: ?, ? (and reorders parameters)
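
To illustrate what "reorders parameters" involves, here is a rough sketch of converting :named placeholders to positional ? placeholders. The to_qmark function is hypothetical and is not DBTK's converter; it uses a naive regex that does not account for placeholders inside string literals or comments.

```python
import re
import sqlite3

# Matches :name but not ::cast (negative lookbehind for a colon)
_NAMED = re.compile(r"(?<!:):([A-Za-z_]\w*)")


def to_qmark(sql, params):
    """Rewrite :named placeholders as ? and return values in matching order."""
    ordered = []

    def replace(match):
        ordered.append(params[match.group(1)])
        return "?"

    return _NAMED.sub(replace, sql), tuple(ordered)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, status TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [("Ada", "active", 36), ("Bob", "active", 17), ("Cy", "inactive", 40)],
)

# The dict order differs from the order the names appear in the SQL
query, args = to_qmark(
    "SELECT name FROM users WHERE age > :age AND status = :status",
    {"status": "active", "age": 18},
)
names = [r[0] for r in conn.execute(query, args)]
```

The values are collected in the order the placeholders appear in the SQL, which is why the caller can pass the dict in any order.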

See Database Connections for details.

3. Field Mapping and Transforms

Separate your database schema from your source data format:

table = dbtk.etl.Table('employees', {
    # Database column: source field + transforms
    'emp_id': {'field': 'employee_number', 'key': True},
    'full_name': {'field': 'name', 'fn': 'maxlen:100'},
    'hire_date': {'field': 'start_date', 'fn': 'date'},
    'department': {'field': 'dept_code', 'fn': 'lookup:departments:code:name'}
}, cursor=cursor)

from dbtk.etl import column_defs_from_db
# generate initial column configuration template from database schema
emp_cols = column_defs_from_db(cursor=cursor, table_name='hr.employees', add_comments=False)
# this returns a string representation to paste into your code and modify, not to use directly
print(emp_cols)
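
The idea behind a column map can be sketched in plain Python: each target column names a source field, an optional transform, and an optional default. The apply_mapping function and the callables below are hypothetical; DBTK itself uses string-named transforms such as 'date', as shown above, and its internals may differ.

```python
from datetime import datetime


def parse_date(value):
    """Parse a YYYY-MM-DD string into a date."""
    return datetime.strptime(value, "%Y-%m-%d").date()


# Target column -> how to derive it from a source row
COLUMNS = {
    "emp_id": {"field": "employee_number"},
    "full_name": {"field": "name", "fn": lambda s: s[:100]},
    "hire_date": {"field": "start_date", "fn": parse_date},
    "status": {"default": "active"},
}


def apply_mapping(columns, source):
    """Build a target row dict from a source row dict (illustrative only)."""
    target = {}
    for column, spec in columns.items():
        value = source.get(spec["field"]) if "field" in spec else None
        if value is not None and "fn" in spec:
            value = spec["fn"](value)
        if value is None:
            value = spec.get("default")
        target[column] = value
    return target


mapped = apply_mapping(
    COLUMNS,
    {"employee_number": "E42", "name": "Ada Lovelace", "start_date": "2024-03-01"},
)
```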

See ETL: Table & Transforms for details.

4. Bulk Operations

Process millions of records efficiently:

import dbtk
from dbtk.etl import Table, DataSurge, BulkSurge

# contact_cols is a column definition dict like the one in section 3
contacts = Table('contacts', columns=contact_cols, cursor=cursor)
reader = dbtk.readers.get_reader('contacts.csv.gz')

# DataSurge: Fast batching with executemany 
surge = DataSurge(contacts)
surge.insert(reader)     # Supports MERGE, UPDATE, DELETE too

# BulkSurge: Maximum speed with native loading
surge = BulkSurge(contacts)
surge.load(reader)       # Uses COPY/direct-path/bcp depending on database
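
For a feel of what batching with executemany looks like underneath, here is a standard-library sketch using sqlite3. It is not DataSurge's implementation; the batched helper and the batch size of 2000 are illustrative assumptions.

```python
import sqlite3
from itertools import islice


def batched(rows, size):
    """Yield lists of at most `size` rows from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contacts (id INTEGER PRIMARY KEY, email TEXT)")

# A generator stands in for a file reader, so rows are never all in memory
source = ((i, f"user{i}@example.com") for i in range(1, 5001))

batches = 0
with conn:  # commit once at the end, roll back on error
    for batch in batched(source, 2000):
        conn.executemany("INSERT INTO contacts VALUES (?, ?)", batch)
        batches += 1

total = conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]
```

Sending rows in groups reduces per-statement overhead while keeping memory use bounded by the batch size.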

See ETL: DataSurge & BulkSurge for details.

Configuration

Create dbtk.yml in your project or ~/.config/dbtk.yml:

connections:
  production_db:
    type: postgres
    host: db.example.com
    database: production
    user: app_user
    encrypted_password: gAAAAABh...  # Use: dbtk encrypt-password

settings:
  default_batch_size: 2000
  logging:
    directory: ./logs
    level: INFO
    split_errors: true

Encrypt passwords:

# Generate encryption key and store in system keyring
dbtk store-key

# Encrypt a password
dbtk encrypt-password mypassword

# Encrypt all passwords in config file
dbtk encrypt-config dbtk.yml

See Configuration & Security for details.

Common Patterns

Update or Insert Logic

import logging

logger = logging.getLogger(__name__)

for record in reader:
    table.set_values(record)
    existing = table.fetch()  # Check if record exists
    operation = 'update' if existing else 'insert'

    if table.is_ready(operation):
        err = table.execute(operation, raise_error=False)
        if err:
            logger.warning(f"{operation} failed: {table.last_error.message}")
        elif operation == 'insert':
            new_id = cursor.lastrowid  # avoid shadowing the built-in id()
# Note: DataSurge supports merge/upsert for high-performance bulk operations
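
The same check-then-write logic can be sketched without DBTK, using only sqlite3. The table layout and the counts dict are illustrative assumptions, intended only to show the control flow.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'old@example.com')")

incoming = [(1, "new@example.com"), (2, "bob@example.com")]
counts = {"insert": 0, "update": 0}

with conn:  # one transaction for the whole load
    for user_id, email in incoming:
        exists = conn.execute(
            "SELECT 1 FROM users WHERE id = ?", (user_id,)
        ).fetchone()
        if exists:
            conn.execute("UPDATE users SET email = ? WHERE id = ?", (email, user_id))
            counts["update"] += 1
        else:
            conn.execute("INSERT INTO users VALUES (?, ?)", (user_id, email))
            counts["insert"] += 1

rows = conn.execute("SELECT id, email FROM users ORDER BY id").fetchall()
```

Each record costs an extra lookup query, which is why a bulk merge is usually preferable for large loads.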

Filtering Records

with dbtk.readers.get_reader('data.csv') as reader:
    reader.add_filter(lambda r: r.status == 'active')
    reader.add_filter(lambda r: r.age >= 18)
    for record in reader:
        process(record)
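
A reader with stacked filters can be pictured as a wrapper that yields only the rows passing every predicate. FilteredReader below is a hypothetical stand-in, and it assumes that multiple filters combine with AND, which the example above suggests but does not state.

```python
class FilteredReader:
    """Illustrative reader wrapper: yields rows that pass every filter."""

    def __init__(self, rows):
        self._rows = rows
        self._filters = []

    def add_filter(self, predicate):
        self._filters.append(predicate)

    def __iter__(self):
        for row in self._rows:
            # all() short-circuits, so later filters are skipped once one fails
            if all(check(row) for check in self._filters):
                yield row


people = [
    {"name": "Ada", "status": "active", "age": 36},
    {"name": "Bob", "status": "active", "age": 17},
    {"name": "Cy", "status": "inactive", "age": 40},
]

reader = FilteredReader(people)
reader.add_filter(lambda r: r["status"] == "active")
reader.add_filter(lambda r: r["age"] >= 18)
kept = [r["name"] for r in reader]
```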

Multi-Stage ETL with State Persistence

from dbtk.etl import IdentityManager, EntityStatus

stmt = cursor.prepare_file('identity_lookup.sql')

# Map source IDs to target IDs with caching
im = IdentityManager('source_id', 'target_id', resolver=stmt)

for record in reader:
    entity = im.resolve(record)  # Gets target_id, writes to record
    if entity['_status'] == EntityStatus.RESOLVED:
        table.set_values(record)
        if not table.is_ready('insert'):
            missing = ', '.join(table.reqs_missing('insert'))
            im.add_message(record.source_id, f"Skipped - missing values: {missing}")
            continue  # do not attempt the insert for an incomplete record
        err = table.execute('insert')
        if err:
            im.add_error(record.source_id, table.last_error)

# Save state for next run
im.save_state('state/entities.json')
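
The caching and state-persistence idea can be sketched with a small class that remembers resolved IDs and writes them to JSON between runs. IdMap, its methods, and slow_lookup are hypothetical names for illustration; IdentityManager's real behavior and file format may differ.

```python
import json
import os
import tempfile


class IdMap:
    """Illustrative source-to-target ID cache with JSON persistence."""

    def __init__(self, resolver):
        self._resolver = resolver  # callable: source_id -> target_id
        self._cache = {}

    def resolve(self, source_id):
        # Call the resolver only for IDs not seen before
        if source_id not in self._cache:
            self._cache[source_id] = self._resolver(source_id)
        return self._cache[source_id]

    def save_state(self, path):
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(self._cache, fh)

    @classmethod
    def load_state(cls, path, resolver):
        instance = cls(resolver)
        with open(path, encoding="utf-8") as fh:
            instance._cache = json.load(fh)
        return instance


lookups = []


def slow_lookup(source_id):
    """Stand-in for a database query; records each call."""
    lookups.append(source_id)
    return f"T-{source_id}"


state_file = os.path.join(tempfile.mkdtemp(), "entities.json")

first_run = IdMap(slow_lookup)
first_run.resolve("A1")
first_run.resolve("A1")  # served from the cache
first_run.save_state(state_file)

second_run = IdMap.load_state(state_file, slow_lookup)
target = second_run.resolve("A1")  # no lookup needed after reload
```

Note that JSON object keys are always strings, so this sketch assumes string source IDs.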

See ETL: Tools & Logging for details.

Next Steps

Essential Documentation

Advanced Topics

Get Help

Quick Command Reference

# If 'dbtk' is not in PATH, prefix any command with 'python -m':
python -m dbtk config-setup

# Check installation and available drivers
dbtk checkup

# Password encryption
dbtk generate-key                    # Generate encryption key
dbtk store-key                       # Store key in system keyring
dbtk encrypt-password <password>     # Encrypt a single password
dbtk encrypt-config dbtk.yml         # Encrypt all passwords in config

# Log management
# Cleanup handled automatically with dbtk.cleanup_old_logs()

Ready to build production ETL pipelines? Start with the examples above and explore the documentation links!