API Reference

Configuration

Configuration management for database connections. Supports YAML configuration files with optional password encryption and global settings.

class dbtk.config.ConfigManager(config_file=None)[source]

Bases: object

Manage DBTK configuration from YAML files.

ConfigManager handles loading and parsing YAML configuration files that define database connections, encrypted passwords, and global settings. It searches for configuration files in standard locations, validates the structure, and provides methods for accessing connections and passwords.

The manager supports encrypted passwords using Fernet symmetric encryption, environment variable substitution, and automatic sample config generation for new users.

Configuration File Structure

# dbtk.yml
settings:
  default_timezone: UTC
  default_country: US
  default_paramstyle: named

connections:
  my_db:
    type: postgres
    host: localhost
    database: myapp
    user: admin
    encrypted_password: gAAAAABh...

passwords:
  api_key:
    encrypted_password: gAAAAABh...
    description: API key for external service

Configuration Locations

ConfigManager searches for configuration files in this order:

  1. File specified in config_file parameter

  2. ./dbtk.yml (current directory)

  3. ./dbtk.yaml (current directory)

  4. ~/.config/dbtk.yml (user config directory)

  5. ~/.config/dbtk.yaml (user config directory)

If no config is found, creates a sample config at ~/.config/dbtk_sample.yml.
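The search order above can be sketched with the standard library alone. This is an illustration of the documented order, not DBTK's implementation: `find_config` and its `cwd` and `home` parameters are hypothetical names, added so the lookup can be exercised without touching real directories. Falling through to the standard locations when an explicit path does not exist is also an assumption.

```python
from pathlib import Path
from typing import Optional, Union


def find_config(config_file: Union[str, Path, None] = None,
                cwd: Optional[Path] = None,
                home: Optional[Path] = None) -> Optional[Path]:
    """Return the first existing candidate in the documented order, or None."""
    cwd = Path(cwd) if cwd is not None else Path.cwd()
    home = Path(home) if home is not None else Path.home()

    candidates = []
    if config_file is not None:
        candidates.append(Path(config_file))    # 1. explicit path
    candidates += [
        cwd / "dbtk.yml",                       # 2. current directory
        cwd / "dbtk.yaml",                      # 3. current directory
        home / ".config" / "dbtk.yml",          # 4. user config directory
        home / ".config" / "dbtk.yaml",         # 5. user config directory
    ]
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None  # the real manager creates a sample config at this point
```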

Parameters:

config_file (str or Path, optional) – Path to YAML config file. If None, searches standard locations.

config_file

Path to the loaded configuration file

Type:

Path

config

Parsed configuration dictionary

Type:

dict

Example

from dbtk.config import ConfigManager

# Load from default location
config_mgr = ConfigManager()

# Access connection settings
conn_params = config_mgr.get_connection_config('production_db')

# Get a stored password (returned decrypted)
api_key = config_mgr.get_password('external_api')

# Load specific config file
config_mgr = ConfigManager('/path/to/custom.yml')

See also

dbtk.connect

Connect to database using config

generate_encryption_key

Create encryption key for passwords

encrypt_config_file

Encrypt passwords in config file

Notes

  • YAML files must have .yml or .yaml extension

  • Connections require ‘type’ field (postgres, oracle, mysql, etc.)

  • Encrypted passwords require DBTK_ENCRYPTION_KEY environment variable

  • Environment variables can be used with ${VAR_NAME} syntax

  • Sample config is created at ~/.config/dbtk_sample.yml on first run if no config exists
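The ${VAR_NAME} substitution mentioned in the notes above could work roughly as follows. This is a sketch under assumptions: `substitute_env` is a hypothetical helper, and leaving unknown variables untouched is a choice made here, since this reference does not say how DBTK treats them.

```python
import re

_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def substitute_env(value, env):
    """Replace each ${VAR_NAME} in value with env[VAR_NAME].

    Variables missing from env are left as-is (an assumption of this sketch).
    """
    def replace(match):
        name = match.group(1)
        return env.get(name, match.group(0))

    return _VAR_PATTERN.sub(replace, value)
```

In real use the mapping would typically be `os.environ`, for example `substitute_env("${DB_HOST}", os.environ)`.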

__init__(config_file=None)[source]

Initialize config manager and load configuration.

Parameters:

config_file (str or Path, optional) –

Path to YAML config file. If None, searches for:

  • dbtk.yml in current directory

  • dbtk.yaml in current directory

  • ~/.config/dbtk.yml

  • ~/.config/dbtk.yaml

Raises:
  • FileNotFoundError – If no config file found in any search location

  • ValueError – If config file is invalid or malformed

Example

# Use default config location
config = ConfigManager()

# Use specific config file
config = ConfigManager('/etc/dbtk/production.yml')

# Config auto-creates sample if none exists
config = ConfigManager()  # Creates ~/.config/dbtk_sample.yml if needed

add_password(name, password, description=None, encrypt=True)[source]

Add or update a password entry.

Parameters:
  • name (str) – Password name/key

  • password (str) – Password value

  • description (str) – Optional description

  • encrypt (bool) – Whether to encrypt the password (default: True)

decrypt_password(encrypted_password)[source]

Decrypt an encrypted password.

encrypt_password(password)[source]

Encrypt a password for storage.

get_connection_config(name)[source]

Get configuration for a named connection.

get_password(name)[source]

Get a stored password by name.

Parameters:

name (str) – Password name/key

Returns:

Decrypted password string

Raises:

ValueError – If password not found or decryption fails

Return type:

str

get_setting(key, default=None)[source]

Get a setting value from the config.

Parameters:
  • key (str) – Setting key (supports dot notation like ‘database.timeout’)

  • default (Any) – Default value if key not found

Returns:

Setting value or default

Return type:

Any

Example

timeout = config.get_setting('database.timeout', 30)
tz = config.get_setting('default_timezone', 'UTC')
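Dot notation is usually resolved by walking nested dictionaries one segment at a time. The sketch below shows that idea with a hypothetical `lookup_setting` function operating on a plain dict; it is not taken from DBTK's source.

```python
from collections.abc import Mapping
from typing import Any


def lookup_setting(settings: dict, key: str, default: Any = None) -> Any:
    """Resolve 'a.b.c' against nested dicts, returning default if any part is missing."""
    node: Any = settings
    for part in key.split("."):
        if not isinstance(node, Mapping) or part not in node:
            return default
        node = node[part]
    return node


settings = {"default_timezone": "UTC", "database": {"timeout": 45}}
timeout = lookup_setting(settings, "database.timeout", 30)   # nested key found
retries = lookup_setting(settings, "database.retries", 3)    # falls back to default
```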

list_connections()[source]

List all available connection names.

list_passwords()[source]

List all available password names.

remove_password(name)[source]

Remove a password entry.

Parameters:

name (str) – Password name to remove

Raises:

ValueError – If password not found

set_setting(key, value)[source]

Set a setting value and save config.

Parameters:
  • key (str) – Setting key (supports dot notation)

  • value (Any) – Setting value

dbtk.config.connect(name, password=None, config_file=None)[source]

Connect to a named database from configuration.

Parameters:
  • name (str) – Connection name from config file

  • password (str) – Optional password if not stored in config

  • config_file (str | Path | None) – Optional path to config file

Returns:

Database connection instance

Return type:

Database

Example

db = connect('prod_warehouse')
cursor = db.cursor()
cursor.execute("SELECT * FROM users")

dbtk.config.diagnose_config(config_file=None)[source]

Full config health check using a real ConfigManager instance.

dbtk.config.encrypt_config_file(filename=None)[source]

CLI utility to encrypt all passwords in a config file.

dbtk.config.encrypt_password(password=None, encryption_key=None)[source]

CLI utility function to encrypt a password.

Parameters:
  • password (str) – Password to encrypt (if None, prompts for input)

  • encryption_key (str) – Optional encryption key. If None, uses DBTK_ENCRYPTION_KEY env var

Returns:

Encrypted password

Return type:

str

dbtk.config.generate_encryption_key()[source]

Generate a random encryption key.

This function generates a random encryption key that can be used to encrypt and decrypt data securely. The key is returned as a string and should be stored either in the DBTK_ENCRYPTION_KEY environment variable or in the system keyring by running dbtk store-key [your key].

Returns:

A randomly generated encryption key.

Return type:

str
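Because ConfigManager is described as using Fernet symmetric encryption, the key is presumably a standard Fernet key: 32 random bytes, URL-safe base64 encoded. The sketch below produces a key of that shape with the standard library only. `make_fernet_style_key` is a hypothetical name, and whether generate_encryption_key() is implemented this way is an assumption.

```python
import base64
import os


def make_fernet_style_key() -> str:
    """Return 32 random bytes as URL-safe base64 text (the Fernet key format)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


key = make_fernet_style_key()
```

A key of this form is 44 characters long, including one trailing "=" of padding.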

dbtk.config.get_password(name, config_file=None)[source]

Get a stored password from configuration.

Parameters:
  • name (str) – Password name from config file

  • config_file (str | Path | None) – Optional path to config file

Returns:

Decrypted password string

Return type:

str

Example

api_key = get_password('openai_api_key')
secret = get_password('jwt_secret')

dbtk.config.get_setting(key, default=None, config_file=None)[source]

Get a setting value from configuration.

Parameters:
  • key (str) – Setting key (supports dot notation like ‘database.timeout’)

  • default (Any) – Default value if key not found

  • config_file (str | Path | None) – Optional path to config file

Returns:

Setting value or default

Return type:

Any

Example

timeout = get_setting('database.timeout', 30)
tz = get_setting('default_timezone', 'UTC')

dbtk.config.migrate_config(source_file, target_file, new_encryption_key)[source]

Migrate config file with new encryption key.

dbtk.config.set_config_file(config_file)[source]

Set the configuration file to use globally.

dbtk.config.setup_config()[source]

Interactive setup wizard for DBTK configuration.

This command guides you through:

  • Choosing config file location (project vs user)

  • Creating a config file from dbtk_sample.yml

  • Setting up encryption (keyring or environment variable)

  • Adding database connections

Example

# Interactive setup
dbtk config-setup

dbtk.config.store_key(key=None, force=False)[source]

CLI utility to store encryption key in system keyring.

Database Connections

Database connection wrapper that provides a uniform interface to different database adapters.

class dbtk.database.Database(connection, driver, database_name=None, connection_name=None, cursor_settings=None)[source]

Bases: object

Database connection wrapper providing a uniform interface across database adapters.

The Database class wraps database-specific connection objects and provides a consistent API regardless of which database driver is being used (psycopg2, oracledb, mysqlclient, etc.). It handles parameter style conversions, manages cursors, and delegates attribute access to the underlying connection for driver-specific functionality.

Key features:

  • Unified interface - Same API for PostgreSQL, Oracle, MySQL, SQL Server, SQLite

  • Cursor factory - Create different cursor types (Record, tuple, dict, list)

  • Transaction management - Context managers for safe transactions

  • Attribute delegation - Access underlying driver features when needed

  • Parameter style abstraction - Automatic handling of different bind parameter formats

driver

The database adapter module (e.g., psycopg2, oracledb)

database_type

Database type: ‘postgres’, ‘oracle’, ‘mysql’, ‘sqlserver’, or ‘sqlite’

Type:

str

database_name

Name of the connected database

Type:

str

placeholder

Parameter placeholder for this database’s parameter style

Type:

str

Example

import dbtk

# Create connection
db = dbtk.database.postgres(user='admin', password='secret', database='mydb')

# Or from configuration
db = dbtk.connect('production_db')

# Use as context manager
with db:
    cursor = db.cursor()
    cursor.execute("SELECT * FROM users WHERE status = :status", {'status': 'active'})
    users = cursor.fetchall()

# Manual connection management
db = dbtk.connect('production_db')
cursor = db.cursor()  # Returns Record objects
cursor.execute("SELECT * FROM orders")
db.commit()
db.close()

See also

dbtk.connect

Connect to database from configuration

Database.cursor

Create a cursor for executing queries

Database.transaction

Context manager for transactions

__init__(connection, driver, database_name=None, connection_name=None, cursor_settings=None)[source]

Initialize Database wrapper around an existing connection.

This is typically called by connection factory functions rather than directly. Use dbtk.database.postgres(), dbtk.connect(), etc. instead.

Parameters:
  • connection – Underlying database connection object from the adapter

  • driver – Database adapter module (psycopg2, oracledb, mysqlclient, etc.)

  • database_name (str, optional) – Name of the database. If None, attempts to extract from connection.

  • connection_name (str, optional) – Name/alias from config file (e.g., ‘imdb’, ‘prod_db’). None if not from config.

  • cursor_settings (dict, optional) – Values passed to the cursor constructor. e.g. {‘batch_size’: 2000}

Example

import psycopg2
import dbtk
from dbtk.database import Database

# Direct instantiation (not typical)
conn = psycopg2.connect(dbname='mydb', user='admin', password='secret')
db = Database(conn, psycopg2, 'mydb')

# Typical usage via factory functions
db = dbtk.database.postgres(user='admin', password='secret', database='mydb')

classmethod create(db_type, driver=None, connection_name=None, cursor_settings=None, **kwargs)[source]

Factory method to create database connections.

Parameters:
  • db_type (str) – Database type (‘postgres’, ‘oracle’, ‘mysql’, etc.)

  • driver (str | None) – Specific driver to use (optional)

  • connection_name (str | None) – Config file connection name/alias (optional)

  • cursor_settings (dict | None) – Defaults to use when creating cursors.

  • **kwargs – Connection parameters

Returns:

Database instance

Return type:

Database

cursor(**kwargs)[source]

Create a cursor for executing database queries.

Returns a cursor that yields Record objects, providing flexible access to query results through attribute, dictionary, and index notation.

Parameters:

**kwargs

Optional cursor configuration:

  • batch_size (int) - Rows to process at once in bulk operations

  • debug (bool) - Enable debug output showing queries and bind variables

  • return_cursor (bool) - If True, execute() returns cursor for method chaining

Returns:

Cursor instance that returns Record objects

Return type:

Cursor

Example

# Create cursor - returns Records
cursor = db.cursor()
cursor.execute("SELECT id, name, email FROM users WHERE status = :status",
              {'status': 'active'})

# Record supports multiple access patterns
for row in cursor:
    print(row['name'])    # Dictionary access
    print(row.name)       # Attribute access
    print(row[1])         # Index access
    print(row.get('phone', 'N/A'))  # Safe access with default

# With configuration options
cursor = db.cursor(debug=True)

See also

Record

Flexible row object with dict, attribute, and index access

Database.transaction

Context manager for safe transactions

property dialect

Database dialect instance providing SQL generation and schema introspection.

param_help()[source]

Print help on this driver’s parameter style.

transaction()[source]

Context manager for safe database transactions.

Automatically commits the transaction on successful completion or rolls back if an exception occurs. This ensures transactions are properly cleaned up even if errors occur.

Yields:

Database – This database connection instance

Raises:

Exception – Re-raises any exception that occurs within the transaction block after rolling back the transaction

Example

with db.transaction():
    cursor = db.cursor()
    cursor.execute("INSERT INTO orders (customer, amount) VALUES (:c, :a)",
                 {'c': 'Aang', 'a': 100})
    cursor.execute("UPDATE inventory SET stock = stock - 1 WHERE item_id = :id",
                 {'id': 42})
    # Automatically commits on success

# If exception occurs, transaction is automatically rolled back
import logging

logger = logging.getLogger(__name__)

try:
    with db.transaction():
        cursor = db.cursor()
        cursor.execute("INSERT INTO invalid_table ...")  # Raises error
        # Rollback happens automatically
except Exception as e:
    logger.error(f"Transaction failed: {e}")
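The commit-or-rollback behaviour described for transaction() follows a common context-manager pattern. The sketch below reproduces that pattern against the standard library's sqlite3 module so it can run anywhere. The standalone `transaction` function here is a hypothetical stand-in, not the Database.transaction method itself.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn: sqlite3.Connection):
    """Commit if the block succeeds; roll back and re-raise if it fails."""
    try:
        yield conn
    except Exception:
        conn.rollback()
        raise
    else:
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer TEXT, amount INTEGER)")
conn.commit()

# Successful block: the insert is committed on exit.
with transaction(conn):
    conn.execute("INSERT INTO orders VALUES (?, ?)", ("Aang", 100))

# Failing block: the insert is rolled back and the error propagates.
try:
    with transaction(conn):
        conn.execute("INSERT INTO orders VALUES (?, ?)", ("Katara", 50))
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

rows = conn.execute("SELECT customer, amount FROM orders").fetchall()
```

Only the first insert survives, because the second block raised before it could commit.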

See also

Database.commit

Manually commit a transaction

Database.rollback

Manually roll back a transaction

dbtk.database.get_supported_db_types()[source]

Get all supported database types.

dbtk.database.mysql(user, password=None, database='mysql', host='localhost', port=3306, driver=None, **kwargs)[source]

Create a MySQL/MariaDB database connection.

Automatically selects the best available MySQL driver (mysqlclient, mysql.connector, pymysql, or MySQLdb).

Parameters:
  • user (str) – Database username

  • password (str | None) – Database password

  • database (str) – Database name (default: ‘mysql’)

  • host (str) – Server hostname or IP (default: ‘localhost’)

  • port (int) – Server port (default: 3306)

  • driver (str) – Specific driver to use (‘mysqlclient’, ‘mysql.connector’, ‘pymysql’, ‘MySQLdb’)

  • **kwargs – Additional driver-specific parameters (charset, ssl, etc.)

Returns:

Database connection object with context manager support

Return type:

Database

Example

>>> from dbtk.database import mysql
>>> with mysql(user='root', password='pass', database='myapp') as db:
...     cursor = db.cursor()
...     cursor.execute("SELECT * FROM users")

See also

Database.create() for more connection options

dbtk.database.oracle(user=None, password=None, database=None, host=None, port=1521, driver=None, **kwargs)[source]

Create an Oracle database connection.

Supports both DSN and connection string formats. Automatically selects the best available Oracle driver (oracledb or cx_Oracle).

Parameters:
  • user (str | None) – Database username. Not required when using OCI cloud native authentication via extra_auth_params.

  • password (str | None) – Database password

  • database (str) – Service name or SID

  • host (str | None) – Server hostname or IP (required if not using dsn)

  • port (int) – Server port (default: 1521)

  • driver (str) – Specific driver to use (‘oracledb’, ‘cx_Oracle’)

  • **kwargs – Additional driver-specific parameters (dsn, mode, extra_auth_params, etc.)

Returns:

Database connection object with context manager support

Return type:

Database

Example

>>> from dbtk.database import oracle
>>> # Using service name
>>> db = oracle(user='scott', password='tiger',
...             host='oracle.example.com', database='ORCL')
>>>
>>> # Using DSN directly
>>> db = oracle(user='scott', password='tiger',
...             dsn='oracle.example.com:1521/ORCL')
>>>
>>> # OCI Resource Principal (no credentials needed)
>>> db = oracle(dsn='mydb.adb.us-phoenix-1.oraclecloud.com:1521/myservice',
...             extra_auth_params={'auth_type': 'ResourcePrincipal'})

See also

Database.create() for more connection options

dbtk.database.postgres(user, password=None, database='postgres', host='localhost', port=5432, driver=None, **kwargs)[source]

Create a PostgreSQL database connection.

Automatically selects the best available PostgreSQL driver (psycopg2, psycopg3, or pgdb). You can specify a specific driver if needed.

Parameters:
  • user (str) – Database username

  • password (str | None) – Database password

  • database (str) – Database name (default: ‘postgres’)

  • host (str) – Server hostname or IP (default: ‘localhost’)

  • port (int) – Server port (default: 5432)

  • driver (str) – Specific driver to use (‘psycopg2’, ‘psycopg’, ‘pgdb’)

  • **kwargs – Additional driver-specific connection parameters

Returns:

Database connection object with context manager support

Return type:

Database

Example

>>> from dbtk.database import postgres
>>> with postgres(user='user', password='pass', database='mydb') as db:
...     cursor = db.cursor()
...     cursor.execute("SELECT * FROM users")

See also

Database.create() for more connection options

dbtk.database.register_user_drivers(drivers_config)[source]

Register drivers from config file.

dbtk.database.sqlite(database, **kwargs)[source]

Create a SQLite database connection.

SQLite is a serverless, file-based database. Use ‘:memory:’ for an in-memory database.

Parameters:
  • database (str) – Path to database file or ‘:memory:’ for in-memory database

  • **kwargs – Additional sqlite3.connect() parameters (timeout, isolation_level, etc.)

Returns:

Database connection object with context manager support

Return type:

Database

Example

>>> from dbtk.database import sqlite
>>> # File-based database
>>> with sqlite('app.db') as db:
...     cursor = db.cursor()
...     cursor.execute("CREATE TABLE users (id INTEGER, name TEXT)")
>>>
>>> # In-memory database (useful for testing)
>>> db = sqlite(':memory:')

See also

Database.create() for more connection options

sqlite3 module documentation for additional parameters

dbtk.database.sqlserver(user, password=None, database=None, host='localhost', port=1433, **kwargs)[source]

Create a Microsoft SQL Server database connection.

Automatically selects the best available SQL Server driver (pyodbc or pymssql).

Parameters:
  • user (str) – Database username

  • password (str | None) – Database password

  • database (str) – Database name

  • host (str) – Server hostname or IP (default: ‘localhost’)

  • port (int) – Server port (default: 1433)

  • **kwargs – Additional driver-specific parameters (driver, encrypt, etc.)

Returns:

Database connection object with context manager support

Return type:

Database

Example

>>> from dbtk.database import sqlserver
>>> db = sqlserver(user='sa', password='pass',
...                database='AdventureWorks', host='sqlserver.local')
>>> cursor = db.cursor()

Note

When using pyodbc, you may need to specify the ODBC driver: sqlserver(..., driver='ODBC Driver 17 for SQL Server')

See also

Database.create() for more connection options

Cursors

Cursor classes that wrap database cursors and provide different return types. All cursors delegate to the underlying database cursor stored in _cursor.

class dbtk.cursors.Cursor(connection, batch_size=None, debug=False, return_cursor=False, **kwargs)[source]

Bases: object

Cursor that returns query results as Records.

It wraps database-specific cursor objects and provides a consistent interface plus additional functionality such as SQL file execution, parameter conversion, and prepared statements. It also maintains a clean reference hierarchy to the connection (cursor.connection) and to the driver (cursor.connection.driver).

Cursor returns Record objects, which provide flexible access via dictionary keys, attributes, or integer indices.

connection

The database connection this cursor belongs to

Type:

Database

paramstyle

Parameter style of the underlying database (‘qmark’, ‘named’, etc.)

Type:

str

placeholder

Placeholder string for bind parameters (e.g., ‘?’, ‘:1’, etc.)

Type:

str

description

Column metadata from the last query (delegated to underlying cursor)

Note

Cursors delegate attribute access to the underlying database-specific cursor, so all native cursor functionality is available.
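Attribute delegation of this kind is commonly built on `__getattr__`, which Python calls only when normal lookup fails. The sketch below wraps a standard-library sqlite3 cursor to show the idea, along with the return_cursor chaining option. `ChainableCursor` is a hypothetical class written for this illustration, not the dbtk Cursor.

```python
import sqlite3


class ChainableCursor:
    """Hypothetical wrapper that forwards unknown attributes to the raw cursor."""

    def __init__(self, raw_cursor, return_cursor=False):
        self._cursor = raw_cursor
        self.return_cursor = return_cursor

    def execute(self, query, bind_vars=()):
        self._cursor.execute(query, bind_vars)
        # Returning self is what makes .execute(...).fetchall() possible.
        return self if self.return_cursor else None

    def __getattr__(self, name):
        # Reached only when normal lookup fails, so fetchall(), description,
        # and other native members fall through to the driver's cursor.
        return getattr(self._cursor, name)


conn = sqlite3.connect(":memory:")
cur = ChainableCursor(conn.cursor(), return_cursor=True)
results = cur.execute("SELECT 1 AS one, 'a' AS letter").fetchall()
first_column = cur.description[0][0]   # delegated to the sqlite3 cursor
```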

Example

db = dbtk.connect('prod_ods')
cursor = db.cursor()
cursor.execute("SELECT id, name, email FROM users WHERE status = :status",
              {'status': 'active'})

for row in cursor:
    user_id, name, email = row  # Records unpack positionally, like a list
    print(f"{user_id}: {name} ({email})")

See also

Record

Flexible data structure supporting dict, attribute, and index access

WRAPPER_SETTINGS = ('batch_size', 'debug', 'return_cursor', 'fast_executemany')

__init__(connection, batch_size=None, debug=False, return_cursor=False, **kwargs)[source]

Initialize a cursor for database operations.

Parameters:
  • connection (Database) – Database connection object

  • batch_size (int, optional) – How many rows to process at a time when using executemany() or bulk operations in DataSurge

  • debug (bool, default False) – Enable debug output showing queries and bind variables

  • return_cursor (bool, default False) – If True, execute() returns the cursor for method chaining

  • **kwargs – Additional arguments passed to the underlying database cursor

Example

# Typically created via Database.cursor()
cursor = db.cursor()

# With debug enabled
cursor = db.cursor(debug=True)

# With method chaining
cursor = db.cursor(return_cursor=True)
results = cursor.execute("SELECT * FROM users").fetchall()

columns(normalized=False)[source]

Return list of column names.

Parameters:

normalized (bool, default False) – If True, return normalized column names (sanitized for Python attributes). If False, return original column names from database.

Returns:

Column names in order

Return type:

List[str]
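The reference describes normalized names only as "sanitized for Python attributes". One plausible rule set is sketched below: lower-case the name, collapse runs of non-word characters into underscores, and guard against a leading digit. `normalize_column` is a hypothetical helper and the exact rules DBTK applies may differ.

```python
import re


def normalize_column(name: str) -> str:
    """Lower-case the name and collapse non-identifier characters to '_'."""
    cleaned = re.sub(r"\W+", "_", name.strip()).strip("_").lower()
    # Identifiers cannot start with a digit, so prefix those with '_'.
    return f"_{cleaned}" if cleaned[:1].isdigit() else cleaned


names = [normalize_column(n) for n in ("First Name", "User ID")]
```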

Example

cursor.execute('SELECT "First Name", "User ID" FROM ...')
cursor.columns()                 # ['First Name', 'User ID']
cursor.columns(normalized=True)  # ['first_name', 'user_id']

execute(query, bind_vars=(), convert_params=False)[source]

Execute a database query.

Pass convert_params=True to have the query rewritten to the cursor’s paramstyle and parameters handled automatically (same as PreparedStatement and execute_file).

Parameters:
  • query (str) – SQL statement to execute

  • bind_vars (tuple or dict, default ()) –

    Bind parameters to pass to the database.

    When convert_params is False (the default), passed directly to the underlying cursor and must already be in the format required by the cursor’s paramstyle.

    When convert_params is True, must be a dict (or Record).

  • convert_params (bool, default False) – If True, parameter order will be extracted from the query and the query will be rewritten to match the cursor’s paramstyle. Missing parameters will be defaulted to None and extra parameters will be ignored.

execute_file(filename, bind_vars=None, **kwargs)[source]

Execute SQL query from a file with named parameter substitution.

This is a convenience method for one-off queries. For queries that will be executed multiple times, use prepare_file() instead for better performance.

Parameters:
  • filename (str | Path) – Path to SQL file. Resolved relative to CWD first; if not found there, falls back to the directory of the calling script.

  • bind_vars (dict | None) – Dictionary of named parameters

  • **kwargs – encoding: File encoding (default: utf-8-sig)

Returns:

Cursor if return_cursor=True, else None

Return type:

Any

Example

cursor.execute_file('queries/get_user.sql', {'user_id': 123})
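The filename lookup described above (current working directory first, then the calling script's directory) can be sketched as a small function. `resolve_sql_path` is a hypothetical helper. It takes both directories as explicit arguments, whereas the real method presumably discovers the caller's directory itself.

```python
from pathlib import Path
from typing import Union


def resolve_sql_path(filename: Union[str, Path], cwd: Path, script_dir: Path) -> Path:
    """Prefer cwd/filename; fall back to script_dir/filename."""
    for base in (cwd, script_dir):
        candidate = Path(base) / filename
        if candidate.is_file():
            return candidate
    raise FileNotFoundError(f"SQL file not found: {filename}")
```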

executemany(query, bind_vars)[source]

Execute a query against multiple parameter sets.

fetchall()[source]

Fetch all remaining rows.

fetchmany(size=None)[source]

Fetch the next set of rows.

fetchone()[source]

Fetch the next row.

prepare_file(filename, encoding='utf-8-sig')[source]

Prepare a SQL statement from a file for repeated execution.

The SQL file is read once and parameter conversion is performed once. The returned PreparedStatement can be executed multiple times efficiently.

Parameters:
  • filename (str | Path) – Path to SQL file. Resolved relative to CWD first; if not found there, falls back to the directory of the calling script.

  • encoding (str) – File encoding (default: utf-8-sig)

Returns:

PreparedStatement object

Return type:

PreparedStatement

Example

stmt = cursor.prepare_file('queries/insert_user.sql')
for user in users:
    stmt.execute({'user_id': user.id, 'name': user.name})

prepare_params(param_names, bind_vars, paramstyle=None)[source]

Convert named parameters to the format required by the cursor’s paramstyle. Used automatically by PreparedStatement, Table and DataSurge classes.

Parameters:
  • param_names (list of str) – Ordered list of parameter names as they appear in the SQL statement. Missing names default to None with a debug-level log message.

  • bind_vars (dict) – Dictionary of named parameter values keyed by parameter name.

  • paramstyle (str, optional) – Override the cursor’s native paramstyle for this call. Must be one of the values in dbtk.utils.ParamStyle. Ignored if not a recognised style.

Returns:

  • tuple – For positional styles (qmark, numeric, format): values ordered to match param_names.

  • dict – For named styles (named, pyformat): subset of bind_vars containing only the required parameters.

Return type:

Any

Example

from dbtk.utils import ParamStyle, process_sql_parameters

# query with :named or %(pyformat)s parameters
sql = "SELECT * FROM warriors WHERE nation = :nation AND rank = COALESCE(:rank, rank)"

# query rewritten in cursor's parameter style and parameter names in order they appear
query, params_names = process_sql_parameters(sql, ParamStyle.get_positional_style(cur.paramstyle))

# missing parameter defaults to None, extra parameters are ignored
cur.prepare_params(params_names, {'nation': 'Fire Nation', 'nick_name': 'Sparky'})
# ('Fire Nation', None)
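To make the conversion concrete, here is a standalone sketch of rewriting :named placeholders to the qmark style and ordering the values to match. `to_qmark` and `order_params` are hypothetical stand-ins for what process_sql_parameters and prepare_params are described as doing. The regex is deliberately simple and does not skip placeholders inside string literals or comments.

```python
import re
from typing import Any, Dict, List, Tuple

# ':name' not preceded by another ':' (so PostgreSQL '::type' casts are skipped).
_NAMED = re.compile(r"(?<!:):([A-Za-z_]\w*)")


def to_qmark(sql: str) -> Tuple[str, List[str]]:
    """Rewrite :name placeholders as '?' and return the names in order."""
    names: List[str] = []

    def replace(match):
        names.append(match.group(1))
        return "?"

    return _NAMED.sub(replace, sql), names


def order_params(names: List[str], bind_vars: Dict[str, Any]) -> Tuple[Any, ...]:
    """Missing names become None; keys not listed in names are ignored."""
    return tuple(bind_vars.get(name) for name in names)


sql = "SELECT * FROM warriors WHERE nation = :nation AND rank = COALESCE(:rank, rank)"
query, names = to_qmark(sql)
params = order_params(names, {"nation": "Fire Nation", "nick_name": "Sparky"})
# params == ('Fire Nation', None)
```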

prepare_query(query)[source]

Prepare a SQL statement from a query string for repeated execution.

The parameter conversion is performed once. The returned PreparedStatement can be executed multiple times efficiently.

Parameters:

query (str)

Returns:

PreparedStatement object

Return type:

PreparedStatement

Example

stmt = cursor.prepare_query('SELECT * FROM users WHERE user_id = :user_id')
for user in users:
    stmt.execute({'user_id': user.id})

selectinto(query, bind_vars=())[source]

Execute query that must return exactly one row.

class dbtk.cursors.PreparedStatement(cursor, query=None, filename=None, encoding='utf-8-sig')[source]

Bases: object

A prepared SQL statement, built from a query string or a SQL file, with cached parameter mapping.

The SQL is read once (from the query string or the file) and parameter conversion is performed once, based on the cursor’s paramstyle. The prepared statement can then be executed multiple times efficiently. It retains a reference to the cursor, so it can be used in the same way as a regular cursor (fetchone(), fetchmany(), etc.).

__init__(cursor, query=None, filename=None, encoding='utf-8-sig')[source]

Create a prepared statement from a query string or a SQL file.

Parameters:
  • cursor – The cursor that will execute this statement

  • query (str | None) – SQL query string (optional)

  • filename (str | Path | None) – Path to SQL file (relative to CWD)

  • encoding (str | None) – File encoding (default: utf-8-sig)

execute(bind_vars=None)[source]

Execute the prepared statement with the given parameters.

Parameters:

bind_vars (dict | None) – Dictionary of named parameters

Returns:

Cursor if return_cursor=True, else None

Return type:

Any

Record

Record classes for database result sets.

class dbtk.record.FixedWidthRecord(*args, **kwargs)[source]

Bases: Record

A Record subclass optimized for fixed-width data parsing and reconstruction.

Instances represent a single row from a fixed-width file, with values accessible by name, attribute, or index. The class retains the original List[FixedColumn] definitions so that to_line() can reconstruct the exact source line by splicing each formatted value into its correct byte position.

Designed for use with FixedReader or EDIReader, where each record type gets its own dynamic subclass with the appropriate column definitions.

Class attributes (set automatically by set_fields):

_columns

Full column definitions in definition order, including name, position, type, alignment, pad character, and comment.

Type:

List[FixedColumn]

_line_len

Total line width in characters (max end_pos across all columns).

Type:

int

Example usage:

# In reader factory
RecordClass.set_fields(columns)  # columns is List[FixedColumn]
record = RecordClass(*row_values)
original_line = record.to_line()  # reconstructs formatted string

# Parse a raw fixed-width string directly
record = RecordClass.from_line(raw_line)  # inverse of to_line()

classmethod from_line(line, auto_trim=True)[source]

Parse a fixed-width string and return a new instance of this record type.

The inverse of to_line(): slices each field from its declared position, applies type conversion based on column_type, and constructs the record. Mirrors the parsing logic in FixedReader so that records created here behave identically to those produced by the reader.

Parameters:
  • line (str) – A fixed-width string. Should be at least _line_len characters; shorter strings are handled gracefully (missing positions return an empty string).

  • auto_trim (bool) – If True (default), strip leading/trailing whitespace from text fields before storing. Set to False to preserve the raw padded value exactly as it appears in the source line.

Returns:

A new instance of this FixedWidthRecord subclass with values populated from the parsed string.

Raises:

TypeError – If line is not a string.

Return type:

FixedWidthRecord

Example:

MyRecord = fixed_record_factory([
    ('record_type', 1),
    FixedColumn('amount', 2, 11, 'int'),
])
record = MyRecord.from_line('6        42')
assert record.record_type == '6'
assert record.amount == 42

# Round-trip
assert record.to_line() == MyRecord.from_line(record.to_line()).to_line()

pprint(normalized=False, add_comments=False)[source]

Pretty-print the record with aligned columns.

Parameters:
  • normalized (bool) – If True, use normalized field names.

  • add_comments (bool) – If True, append each column’s comment (from the FixedColumn definition) after the value. Columns without a comment are left blank in that position. Has no effect when there are no _columns defined.

classmethod set_fields(fields)[source]

Set field names and column definitions from a list of FixedColumn objects.

Stores the column list as-is on the class (preserving position, type, alignment, pad character, and comment for introspection) and pre-computes _line_len as the rightmost end_pos across all columns.

Parameters:

fields (List[FixedColumn]) – FixedColumn definitions for this record type. Definition order determines value order on instances; to_line() places each value by start_idx so out-of-position-order definitions work correctly.

to_line(truncate_overflow=False)[source]

Reconstruct the original fixed-width line from this record’s values.

Builds a space-filled buffer of _line_len characters and splices each field value into its position using start_idx. Column order in the definition does not matter; gaps between columns remain as spaces. Iterates only the column fields (stops before _row_num or any other appended fields). Missing values are treated as empty strings.

Parameters:

truncate_overflow (bool) – If False (default), raise ValueError when a value exceeds its column width. If True, silently truncate.

Returns:

A string exactly matching the fixed-width format for this record type.

Raises:

ValueError – If truncate_overflow=False and any value exceeds its width.

Example

record.to_line()  # -> '1234567890ABC 0000012345'
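
The splice-by-position behaviour described for to_line() can be sketched in plain Python. This is a minimal illustration, not dbtk's implementation: the (name, start, end) tuples and the render_line helper are hypothetical stand-ins for FixedColumn and the real method, positions are assumed to be 1-based and inclusive, and values are simply left-aligned.

```python
def render_line(columns, values, truncate_overflow=False):
    """Splice values into a space-filled buffer by 1-based column position.

    columns: list of (name, start_pos, end_pos) tuples (hypothetical layout).
    values:  dict mapping column name to value; missing values become ''.
    """
    line_len = max(end for _, _, end in columns)
    buffer = [' '] * line_len
    for name, start, end in columns:
        width = end - start + 1
        value = values.get(name)
        text = '' if value is None else str(value)
        if len(text) > width:
            if not truncate_overflow:
                raise ValueError(f"{name!r} value exceeds width {width}")
            text = text[:width]
        # Definition order does not matter: each value lands at its own offset.
        buffer[start - 1:start - 1 + len(text)] = text
    return ''.join(buffer)


layout = [('amount', 5, 9), ('record_type', 1, 1)]
line = render_line(layout, {'record_type': '6', 'amount': 42})
```

Positions 2-4 are not covered by any column in this layout, so they stay as spaces in the result.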

visualize()[source]

Return a diagnostic string showing column boundaries over the record value.

Output format (4 lines):

         1         2    ...
1234567890123456789012345...   ← position ruler
|| |         |         |...   ← '|' at each column start
101 123456789  87654321...    ← to_line() output

Returns a string; call print(record.visualize()) to display. Consistent with FixedReader.visualize() which also returns a string.

class dbtk.record.Record(*args, **kwargs)[source]

Bases: list

Flexible, lightweight row object that strikes a balance between the memory efficiency of a list and the functionality of dicts and objects.

Record extends list to provide a rich interface for accessing query result rows. It supports attribute access, dictionary-style key access, integer indexing, and slicing - all on the same object. This makes it a very flexible and memory efficient return type for both cursors and readers.

Access Patterns

  • Dictionary-style: row['column_name'] - Safe with .get() method

  • Attribute access: row.column_name - Clean, readable syntax

  • Integer index: row[3] - Positional access

  • Slicing: row[1:4] - Get multiple columns at once

  • Iteration: for value in row - Iterate over values

  • Containment: 'column_name' in row - Check if column exists

Key Methods

  • get(key, default=None) - Safe dictionary-style access with default

  • keys() - Get list of column names

  • values() - Get list of column values

  • items() - Get (column, value) pairs

  • copy() - Create a shallow copy of the record

  • update(dict) - Update multiple columns from a dictionary

  • coalesce(dict) - Update only missing values from a dictionary

  • pprint() - Pretty-print the record

Column Names: Original vs Normalized

Every Record stores two parallel lists of names for each column:

  • _fields - the original names exactly as returned by the database (e.g. 'First Name', 'User ID', '#term_code').

  • _fields_normalized - Python-safe versions used for attribute access (e.g. 'first_name', 'user_id', 'term_code').

Both lists are set once by set_fields() when the cursor executes its first query. Normalization converts each field name into a form suitable for attribute access: it lowercases the name, replaces non-alphanumeric characters with underscores, collapses runs of underscores, strips trailing underscores, and prefixes digit-leading names with n.

Which to use:

  • Use original names (row['First Name'], row.keys(), row.to_dict()) when round-tripping data back to the database or to a CSV, where column names must match the schema exactly.

  • Use normalized names (row.first_name, row['first_name'], row.keys(normalized=True), row.to_dict(normalized=True)) in application code where Pythonic attribute access is preferred, or where case- and whitespace-insensitive matching is useful.

Both forms work interchangeably for item get/set and in checks, so row['First Name'] and row['first_name'] return the same value.
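
The normalization rules listed above can be approximated with a short function. This is a sketch under stated assumptions rather than dbtk's actual code: normalize and normalize_all are hypothetical names, leading underscores are stripped as well as trailing ones (inferred from the '#term_code' example), the digit prefix is taken to be a bare 'n', and the _2/_3 collision suffixes follow the set_fields() description.

```python
import re


def normalize(name):
    """Lowercase, replace runs of non-alphanumerics with '_', trim underscores."""
    cleaned = re.sub(r'[^a-z0-9]+', '_', name.lower()).strip('_')
    if cleaned and cleaned[0].isdigit():
        cleaned = 'n' + cleaned  # assumed prefix for digit-leading names
    return cleaned


def normalize_all(names):
    """Normalize a list of names, suffixing _2, _3, ... on collisions."""
    seen = {}
    result = []
    for name in names:
        base = normalize(name)
        seen[base] = seen.get(base, 0) + 1
        result.append(base if seen[base] == 1 else f"{base}_{seen[base]}")
    return result


print(normalize_all(['First Name', 'User ID', '#term_code']))
```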

Note

Record is dynamically subclassed when a cursor executes a query. Each unique set of column names gets its own Record subclass with those names set as class attributes. This enables attribute access while maintaining the list base class for compatibility.

Example

cursor = db.cursor()
cursor.execute("SELECT id, name, email, created FROM users WHERE id = :id",
              {'id': 42})
user = cursor.fetchone()

# All these access patterns work on the same object:
print(user['name'])            # Dictionary-style: 'Aang'
print(user.name)               # Attribute access: 'Aang'
print(user[1])                 # Index access: 'Aang'
print(user[1:3])               # Slicing: ['Aang', 'aang@avatar.com']

# Safe access with default
print(user.get('phone', 'N/A'))  # 'N/A' if no phone column

# Dictionary methods
for col, val in user.items():
    print(f"{col}: {val}")

# List compatibility
user_id, name, email, created = user  # Unpack like a tuple
print(' | '.join(str(v) for v in user))  # Join like a list (convert non-string values first)

# Update columns
user['email'] = 'newemail@avatar.com'
user.name = 'Avatar Aang'
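
How a list subclass can offer attribute, key, and index access at once is easier to see in a stripped-down model. The Row class and make_row_class helper below are hypothetical and far simpler than Record (no normalized names, no setters); they only illustrate the idea of generating one subclass per column set while keeping list as the base.

```python
class Row(list):
    """List subclass whose subclasses carry column names as class state."""
    _fields = ()

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            return list.__getitem__(self, self._fields.index(name))
        except ValueError:
            raise AttributeError(name) from None

    def __getitem__(self, key):
        if isinstance(key, str):
            try:
                return list.__getitem__(self, self._fields.index(key))
            except ValueError:
                raise KeyError(key) from None
        return list.__getitem__(self, key)  # int index or slice


def make_row_class(fields):
    """Build one subclass per distinct set of column names."""
    return type('Row_' + '_'.join(fields), (Row,), {'_fields': tuple(fields)})


UserRow = make_row_class(['id', 'name', 'email'])
user = UserRow([42, 'Aang', 'aang@avatar.com'])
print(user.name, user['email'], user[0], user[1:3])
```

Storing the names on the class rather than on each instance is what keeps per-row memory close to that of a plain list.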

See also

Cursor

Database cursor that returns Record objects

__init__(*args, **kwargs)[source]
coalesce(other=None, **kwargs)[source]

Fill in missing or empty fields from another dict, Record, or keyword arguments.

Updates the current Record by copying values from other (or **kwargs) only for fields that are currently None or an empty string (''). Existing non-empty values are preserved.

This is a non-destructive “fill gaps” operation — it will never overwrite valid data.

Parameters:
  • other – Optional dict or Record containing values to coalesce. If provided, its items are processed first.

  • **kwargs – Additional key-value pairs to coalesce (overrides keys in other if both provide a value).

Returns:

The updated Record (for chaining).

Return type:

Record

Examples

>>> Record.set_fields(['id', 'name', 'email', 'phone', 'notes'])
>>> record = Record(None, "Scott", "scott@example.com", "", None)
>>> resolved = {'id': 123, 'name': 'Scott Bailey', 'notes': 'VIP'}
>>> record.coalesce(resolved, phone="555-1234")
>>> record  # [123, "Scott", "scott@example.com", "555-1234", "VIP"]
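
The fill-gaps rule can also be expressed on an ordinary dict, which makes the precedence explicit. This is an illustrative sketch only: fill_gaps is a hypothetical helper, and ignoring keys that are not already present is an assumption made for brevity, not documented behaviour of coalesce().

```python
def fill_gaps(target, other=None, **kwargs):
    """Copy values into target only where target currently holds None or ''."""
    incoming = dict(other or {})
    incoming.update(kwargs)  # keyword arguments win over `other`
    for key, value in incoming.items():
        if key in target and target[key] in (None, ''):
            target[key] = value
    return target  # returned to allow chaining


row = {'id': None, 'name': 'Scott', 'email': 'scott@example.com',
       'phone': '', 'notes': None}
fill_gaps(row, {'id': 123, 'name': 'Scott Bailey', 'notes': 'VIP'},
          phone='555-1234')
print(row)
```

The existing name survives because it is non-empty, while id, phone, and notes are filled in.
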
copy()[source]

Return a shallow copy of the Record.

  • Copies the underlying list values

  • Copies the field metadata (_fields, _fields_normalized)

  • Copies deleted fields set

  • Copies any runtime-added fields (_added dict)

  • Preserves the same Record subclass (so attribute access works)

Returns:

A new Record instance with the same data and state

Return type:

Record

get(key, default=None)[source]
insert(index, value)[source]
items(normalized=False)[source]

Get (field_name, value) pairs.

Parameters:

normalized (bool) – If True, use normalized field names. If False (default), use original field names.

Yields:

Tuples of (field_name, value)

keys(normalized=False)[source]

Get list of field names.

Parameters:

normalized (bool) – If True, return normalized field names. If False (default), return original field names.

Returns:

List of field names

Return type:

List[str]

pop(key, default=<object object>)[source]
pprint(normalized=False)[source]

Pretty-print the record with aligned columns.

Parameters:

normalized (bool) – If True, use normalized field names. If False (default), use original field names.

reverse()[source]
classmethod set_fields(fields)[source]

Set the field names for this Record class.

Stores original field names and generates normalized versions for attribute access. Handles collisions by appending _2, _3, etc. to duplicate normalized names.

Parameters:

fields (List[str]) – Original field names (e.g., ['Start Year', 'End Date'])

Examples

>>> rec.set_fields(['Start Year', 'End Date'])
>>> rec._fields
['Start Year', 'End Date']
>>> rec._fields_normalized
['start_year', 'end_date']
sort(*args, **kwargs)[source]
to_dict(normalized=False)[source]

Convert Record to dictionary.

Parameters:

normalized (bool) – If True, use normalized field names as keys. If False (default), use original field names.

Returns:

Dictionary representation of the record

Return type:

dict

Examples

>>> rec = Record(2020, 2025)
>>> rec.set_fields(['Start Year', 'End Year'])
>>> rec.to_dict()
{'Start Year': 2020, 'End Year': 2025}
>>> rec.to_dict(normalized=True)
{'start_year': 2020, 'end_year': 2025}
update(other=None, **kwargs)[source]

Update fields from another dict, Record, or keyword arguments.

Overwrites existing field values unconditionally. To preserve existing non-empty values, use coalesce() instead.

Accepts any mapping with an items() method, an iterable of (key, value) pairs, or keyword arguments. Unknown keys are added as runtime fields.

Parameters:
  • other – Optional dict, Record, or iterable of (key, value) pairs.

  • **kwargs – Additional key-value pairs to set.

Examples

>>> Record.set_fields(['id', 'name', 'email'])
>>> record = Record(1, 'Scott', 'old@example.com')
>>> record.update({'email': 'new@example.com'}, name='Scott Bailey')
>>> record  # [1, 'Scott Bailey', 'new@example.com']
values()[source]

Get list of field values (in original field order).

dbtk.record.fixed_record_factory(columns, name='FixedRecord')[source]

Class factory: build a FixedWidthRecord subclass from a compact column spec.

Follows the same convention as collections.namedtuple() — returns a class, not an instance.

Parameters:
  • columns (list of FixedColumn or (name, width) tuple) – May be mixed. Tuples specify (name, width) and are assigned sequential positions starting at 1 (or immediately after the previous column). FixedColumn objects are used as-is and advance the auto-position cursor to col.end_pos + 1.

  • name (str, optional) – Class name of the returned type. Defaults to 'FixedRecord'.

Returns:

A FixedWidthRecord subclass with set_fields() already called.

Return type:

type

Examples

# Tuple-only: positions calculated automatically
AchDetail = fixed_record_factory([
    ('record_type',    1),
    ('priority_code',  2),
    ('routing_number', 9),
    ('account_number', 17),
    ('amount',         10),
], name='AchDetail')

line = AchDetail('6', '22', '123456789', '00012345678', 100)
print(line.to_line())

# Mixed: use FixedColumn where you need explicit control
AchHeader = fixed_record_factory([
    FixedColumn('record_type', 1, 1),
    ('priority_code', 2),
    FixedColumn('routing_number', 4, 12, column_type='int', align='right'),
    ('filler', 39),
])
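
The automatic positioning of (name, width) tuples can be pictured as a cursor that advances past each column. The sketch below is an assumption-laden model, not the factory's code: assign_positions is a hypothetical helper, and three-element (name, start, end) tuples stand in for FixedColumn objects with explicit positions.

```python
def assign_positions(spec):
    """Resolve (name, width) and (name, start, end) entries to 1-based spans."""
    resolved = []
    cursor = 1  # next free 1-based position
    for entry in spec:
        if len(entry) == 2:      # (name, width): place at the cursor
            name, width = entry
            start, end = cursor, cursor + width - 1
        else:                    # (name, start, end): explicit position
            name, start, end = entry
        resolved.append((name, start, end))
        cursor = end + 1         # the next tuple starts right after this column
    return resolved


ach_detail = assign_positions([
    ('record_type', 1),
    ('priority_code', 2),
    ('routing_number', 9),
    ('account_number', 17),
    ('amount', 10),
])
for column in ach_detail:
    print(column)
```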

Readers

Base classes and utilities for file readers.

Defines the abstract Reader interface and Clean enumeration for header normalization across all reader implementations.

class dbtk.readers.base.Reader(add_row_num=True, skip_rows=0, n_rows=None, headers=None, null_values=None)[source]

Bases: ABC

Abstract base class for all file readers in DBTK.

Provides unified interface and common functionality for reading various file formats (CSV, Excel, JSON, XML, fixed-width). All readers support the same features regardless of file format: header cleaning, record skipping, row number tracking, and flexible return types.

Readers are designed to work as context managers and iterators, making them ideal for memory-efficient processing of large files. They automatically handle resource cleanup and support both Record objects (with multiple access patterns) and plain dictionaries as return types.

Common Features

  • Row number tracking - Automatic _row_num field for debugging

  • Record skipping - Skip header rows or bad data

  • Record limiting - Process only first N records

  • Record filtering - Filter records with custom functions (new!)

  • Flexible return types - Record objects or dictionaries

  • Context manager - Automatic resource cleanup

  • Iterator protocol - Memory-efficient streaming

  • Null value conversion - Convert specified values to None

param add_row_num:

Add a ‘_row_num’ field to each record containing the 1-based row number

type add_row_num:

bool, default True

param skip_rows:

Number of data rows to skip after headers (useful for skipping extra non-data rows or known bad data at the start of the file)

type skip_rows:

int, default 0

param n_rows:

Maximum number of rows to read. None (default) reads all rows.

type n_rows:

int, optional

param null_values:

Values to convert to None. Can be a single string or a collection of strings. Common examples: '\N', 'NULL', 'NA', '' (empty string)

type null_values:

str, list, tuple, or set, optional

Example

# Subclasses implement specific file formats
from dbtk import readers

# CSV with default settings - returns Record objects
with readers.CSVReader(open('data.csv')) as reader:
    for record in reader:
        print(record.name, record.email)  # attribute access
        print(record['name'], record['email'])  # or dict-style

# Skip first 5 rows, read only 100 records
with readers.CSVReader(open('data.csv'),
                      skip_rows=5,
                      n_rows=100) as reader:
    for row in reader:
        print(row.name)

# Access fields with original or normalized names
with readers.CSVReader(open('messy.csv')) as reader:
    # Headers like "ID #", "Student Name" preserved as originals
    # but also accessible as normalized: id, student_name
    for record in reader:
        print(record['ID #'], record['Student Name'])  # original
        print(record.id, record.student_name)  # normalized

# Filter records with custom function
with readers.CSVReader(open('data.csv')) as reader:
    reader.add_filter(lambda r: int(r.age) >= 18)
    reader.add_filter(lambda r: r.country == 'US')
    for record in reader:
        print(record.name)  # Only US adults

See also

CSVReader

Read CSV files

JSONReader

Read JSON files

ExcelReader

Read Excel .xlsx files

XMLReader

Read XML files

FixedReader

Read fixed-width text files

Record

Flexible row objects with multiple access patterns

Notes

This is an abstract base class. Use one of the concrete implementations (CSVReader, JSONReader, etc.) for actual file reading.

Subclasses must implement:

  • _read_headers() - Return list of raw column names from file

  • _generate_rows() - Yield raw data rows as lists

Optionally override:

  • _cleanup() - Release resources (file handles, etc.)

BIG_BYTE_THRESHOLD = 5242880
BIG_ROW_THRESHOLD = 10000
__init__(add_row_num=True, skip_rows=0, n_rows=None, headers=None, null_values=None)[source]

Initialize the reader with common options.

Parameters:
  • add_row_num (bool, default True) – Add a ‘_row_num’ field to each record containing the 1-based row number

  • skip_rows (int, default 0) – Number of data rows to skip after headers

  • n_rows (int, optional) – Maximum number of rows to read, or None for all rows

  • headers (List[str], optional) – Header names to use instead of reading them from row 0

  • null_values (str, list, tuple, or set, optional) – Values to convert to None. Can be a single string or collection of strings. Common examples: '\N' (IMDB), 'NULL', 'NA', '' (empty string)

Example

# In subclass implementation
class MyReader(Reader):
    def __init__(self, file_path, **kwargs):
        super().__init__(**kwargs)
        self.file = open(file_path)

    def _read_headers(self):
        return ['id', 'name', 'email']

    def _generate_rows(self):
        for line in self.file:
            yield line.strip().split(',')
add_filter(func)[source]

Add a filter function to the filtering pipeline.

Filter functions are applied after skip_rows and null_values conversion, but before the n_rows limit. Multiple calls to add_filter() accumulate in a pipeline - all filters must return True for a record to be included.

The filter operates on the final Record object, after null value conversion has been applied. This allows you to filter on clean data rather than raw values.

Parameters:

func (callable) – A function that takes a record and returns True to keep it, False to filter it out. The function should accept a single argument (the record).

Returns:

Returns self to allow method chaining

Return type:

Reader

Example

from dbtk import readers

# Single filter
with readers.CSVReader(open('users.csv')) as reader:
    reader.add_filter(lambda r: int(r.age) >= 18)  # CSV values are strings
    for record in reader:
        print(record.name)  # Only adults

# Multiple filters (all must pass)
with readers.CSVReader(open('users.csv')) as reader:
    reader.add_filter(lambda r: int(r.age) >= 18)
    reader.add_filter(lambda r: r.country == 'US')
    reader.add_filter(lambda r: r.active == 'true')
    for record in reader:
        print(record.name)  # US adults who are active

# Complex filter function
def valid_email(record):
    return '@' in record.email and '.' in record.email

with readers.CSVReader(open('users.csv')) as reader:
    reader.add_filter(valid_email)
    for record in reader:
        print(record.email)

# Get exactly n_rows after filtering
with readers.CSVReader(open('users.csv')) as reader:
    reader.add_filter(lambda r: int(r.age) >= 18)
    reader.n_rows = 100  # Get 100 records that pass the filter
    data = list(reader)
    print(len(data))  # Will be 100 (or less if fewer than 100 match)

Notes

  • Filters are lazy - they’re applied during iteration, not when add_filter() is called

  • Execution order: read → skip_rows → null_values → filter pipeline → n_rows

  • If both skip_rows and add_filter() are used, a warning is logged (skip_rows applies first)

  • The n_rows limit applies after filtering, so n_rows=100 returns 100 filtered records

  • Record._row_num field reflects the count of returned (filtered) records, not raw file rows
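
The execution order in these notes can be modelled with a few lines of standard-library Python. This is a sketch of the documented ordering only: stream is a hypothetical function operating on plain lists, and it numbers rows after filtering to mirror the _row_num note above.

```python
from itertools import islice


def stream(rows, skip_rows=0, null_values=(), filters=(), n_rows=None):
    """Return (row_num, row) pairs using the documented order of operations."""
    def pipeline():
        for row in islice(rows, skip_rows, None):                 # skip_rows
            row = [None if v in null_values else v for v in row]  # null_values
            if all(f(row) for f in filters):                      # filter pipeline
                yield row
    limited = islice(pipeline(), n_rows)                          # n_rows
    return list(enumerate(limited, start=1))  # numbering counts returned rows


raw = [['a', '17'], ['b', 'NA'], ['c', '30'], ['d', '45'], ['e', '21']]
adults = stream(iter(raw), skip_rows=1, null_values={'NA'},
                filters=[lambda r: r[1] is not None and int(r[1]) >= 21],
                n_rows=2)
print(adults)
```

Because the limit is applied last, the two rows returned are the first two that pass the filter, not the first two rows in the file.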

property fieldnames: List[str]

Alias for headers to maintain compatibility with csv.DictReader.

Returns:

List of cleaned header names

property headers: List[str]

Get the column headers.

Returns:

List of cleaned header names

property row_count: int

Returns the number of rows.

This property provides access to the total number of rows, which is stored in the private attribute _row_num.

Returns:

The total number of rows.

Return type:

int

property source: str

Get the filename of the source file.

For the ExcelReader and XLSReader, the source must be set manually because the Workbook objects do not keep a reference to the original file.

CSV file reader with flexible delimiter and quoting support.

class dbtk.readers.csv.CSVReader(fp, dialect=<class 'csv.excel'>, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None, **kwargs)[source]

Bases: Reader

Read CSV (Comma-Separated Values) files with flexible formatting options.

CSVReader provides a simple, consistent interface for reading CSV files with support for various delimiters, quoting styles, and header handling. It handles messy real-world CSV files by providing automatic header cleaning, custom dialects, and the ability to override headers entirely.

The reader returns Record objects by default (supporting attribute, key, and index access) or plain dictionaries if preferred. It automatically handles tab-delimited files when delimiter='\t' is specified.

Parameters:
  • fp (file-like object) – Open file pointer to CSV file (from open() or similar). For encoding detection, use get_reader('file.csv', encoding='detect') instead of opening the file directly.

  • dialect (csv.Dialect, default csv.excel) –

    CSV dialect defining formatting rules. Common options:

    • csv.excel - Standard CSV format (comma delimiter, quoted strings)

    • csv.excel_tab - Tab-delimited format

    • csv.unix_dialect - Unix-style CSV (LF line endings)

  • headers (List[str], optional) – Custom header names to use instead of reading from first row. Useful when CSV has no header row or you want to rename columns.

  • add_row_num (bool, default True) – Add '_row_num' field to each record with 1-based row number

  • skip_rows (int, default 0) – Number of data rows to skip after headers

  • n_rows (int, optional) – Maximum rows to read, None for all

  • null_values (str, list, tuple, or set, optional) – Values to convert to None

  • **kwargs – Additional arguments passed to csv.reader() like delimiter, quotechar, etc.

Example

import csv

from dbtk import readers

# Basic CSV reading
with readers.CSVReader(open('users.csv')) as reader:
    for user in reader:
        print(f"{user.name}: {user.email}")

# Tab-delimited file
with readers.CSVReader(open('data.tsv'), delimiter='\t') as reader:
    for record in reader:
        process(record)

# Custom delimiter and quoting
with readers.CSVReader(open('data.txt'),
                      delimiter='|',
                      quotechar='"',
                      quoting=csv.QUOTE_MINIMAL) as reader:
    for record in reader:
        print(record)

# Provide custom headers (file has no header row)
headers = ['id', 'name', 'email', 'created']
with readers.CSVReader(open('data.csv'), headers=headers) as reader:
    for record in reader:
        print(record.id, record.name)

# Skip first 10 data rows, read only 100 rows
with readers.CSVReader(open('large.csv'),
                      skip_rows=10,
                      n_rows=100) as reader:
    data = list(reader)

See also

Reader

Base class with common reader features

readers.get_reader

Automatic reader selection based on file extension

Notes

  • Automatically converts '\t' delimiter to excel_tab dialect

  • Headers are read from first row unless custom headers provided

  • File pointer is automatically closed when used as context manager
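
Since the extra keyword arguments are documented as being forwarded to csv.reader(), their effect can be checked against the standard library alone. The snippet below uses only the csv module with in-memory sample data (invented for illustration) and makes no claim about how CSVReader wraps these calls.

```python
import csv
import io

# Pipe-delimited data with a quoted field that contains the delimiter.
pipe_data = io.StringIO('id|name\n1|"Aang | Avatar"\n')
pipe_rows = list(csv.reader(pipe_data, dialect=csv.excel, delimiter='|',
                            quotechar='"', quoting=csv.QUOTE_MINIMAL))

# Tab-delimited data read with the built-in excel_tab dialect.
tab_data = io.StringIO('id\tname\n2\tKatara\n')
tab_rows = list(csv.reader(tab_data, dialect=csv.excel_tab))

print(pipe_rows)
print(tab_rows)
```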

__init__(fp, dialect=<class 'csv.excel'>, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None, **kwargs)[source]

Initialize CSV reader for a file.

Parameters:
  • fp (file-like object) – Open file pointer to CSV file

  • dialect (csv.Dialect, default csv.excel) – CSV dialect (excel, excel_tab, unix_dialect, etc.)

  • headers (List[str], optional) – Custom headers to use instead of reading from file

  • add_row_num (bool, default True) – Add _row_num field to records

  • skip_rows (int, default 0) – Data rows to skip after headers

  • n_rows (int, optional) – Maximum rows to read

  • null_values (str, list, tuple, or set, optional) – Values to convert to None (e.g., '\N' for IMDB files)

  • **kwargs – Additional csv.reader() arguments (delimiter, quotechar, etc.)

Excel workbook reader supporting XLS and XLSX formats.

class dbtk.readers.excel.ExcelReader(worksheet, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Bases: Reader

Class to iterate over an Excel Spreadsheet using openpyxl.

__init__(worksheet, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Initialize ExcelReader for reading Excel .xlsx files.

Parameters:
  • worksheet – openpyxl.Worksheet object to read from.

  • headers (List[str] | None) – Optional list of header names to use instead of reading from row 1.

  • add_row_num (bool) – If True, adds a _row_num field to each record (default: True).

  • skip_rows (int) – Number of data rows to skip after headers (default: 0).

  • n_rows (int | None) – Maximum number of rows to read, or None for all (default: None).

  • null_values – Values to convert to None (e.g., ‘N’, ‘NULL’, ‘NA’).

Raises:

TypeError – If worksheet is not an openpyxl.Worksheet.

class dbtk.readers.excel.XLSReader(worksheet, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Bases: Reader

Class to iterate over an Excel Spreadsheet using xlrd.

__init__(worksheet, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Initialize XLSReader for reading Excel .xls files.

Parameters:
  • worksheet – xlrd.Sheet object to read from.

  • headers (List[str] | None) – Optional list of header names to use instead of reading from row 0.

  • add_row_num (bool) – If True, adds a _row_num field to each record (default: True).

  • skip_rows (int) – Number of data rows to skip after headers (default: 0).

  • n_rows (int | None) – Maximum number of rows to read, or None for all (default: None).

  • null_values – Values to convert to None (e.g., ‘N’, ‘NULL’, ‘NA’).

Raises:

TypeError – If worksheet is not an xlrd.Sheet.

dbtk.readers.excel.check_dependencies()[source]

Check for optional dependencies and issue warnings if missing.

dbtk.readers.excel.get_sheet_by_index(wb, index)[source]

Get a worksheet from a workbook by index.

Parameters:
  • wb – Workbook object (openpyxl.Workbook or xlrd.Book).

  • index (int) – Index of the sheet to retrieve (0-based).

Returns:

Worksheet object (openpyxl.Worksheet or xlrd.Sheet).

Raises:

TypeError – If workbook type is not supported.

dbtk.readers.excel.get_sheet_by_name(wb, sheet_name)[source]

Get a worksheet from a workbook by name.

dbtk.readers.excel.open_workbook(filename)[source]

Open an Excel workbook using openpyxl for .xlsx or xlrd for .xls.

Parameters:

filename (str | Path) – Path to the Excel file (.xlsx or .xls).

Returns:

Workbook object (openpyxl.Workbook or xlrd.Book).

Raises:

ImportError – If neither openpyxl nor xlrd is available.

Fixed-width text file reader with column position specifications.

class dbtk.readers.fixed_width.EDIReader(fp, columns, type_name_map=None, strict=False, **kwargs)[source]

Bases: FixedReader

Reader for fixed-width files containing multiple record types (EDI-like formats).

Parses files where each line’s layout is determined by a type identifier prefix (e.g., NACHA ACH files with ‘1’, ‘5’, ‘6’, ‘7’, ‘8’, ‘9’ record types). Each record type uses its own set of FixedColumn definitions, allowing different column positions and formats per type.

Record type codes must all be the same length (automatically detected from keys). The reader dispatches parsing based on the prefix of each line and returns typed Record instances (one dynamic subclass per record type).

Supports common legacy formats such as NACHA ACH, COBOL copybooks, and other multi-layout fixed-width EDI-style files. The column specifications for several common EDI-like files, including ACH, are defined in dbtk.formats.edi.

Parameters:
  • fp (TextIO) – Open file pointer in text mode

  • columns (Dict[str, List[FixedColumn]]) – Mapping of record type codes (keys) to their column definitions. All keys must be strings of identical length.

  • type_name_map (Dict[str, str], optional) – Optional friendly names for record types (e.g., {‘1’: ‘File Header’}) used in logging or output fields.

  • strict (bool, default False) – If True, raise an error when a line's record type code is not mapped in columns. If False, such lines are skipped and logged.

  • auto_trim (bool, default True) – Trim whitespace from field values

  • **kwargs – Additional arguments passed to FixedReader base class

Raises:

ValueError – If record type keys have inconsistent lengths or columns dict is invalid

Example

>>> columns = {
...     '1': [FixedColumn('record_type', 1, 1), FixedColumn('priority_code', 2, 3), ...],
...     '5': [FixedColumn('record_type', 1, 1), FixedColumn('service_class_code', 2, 4), ...],
...     # ... other types ...
... }
>>> reader = EDIReader(open('ach_file.ach'), columns=columns)
>>> for record in reader:
...     print(record.company_name)  # fields available depend on record type
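
The prefix-based dispatch can be sketched without dbtk. Everything here is a hypothetical model: parse_by_type is not a library function, layouts use (name, start, end) tuples in place of FixedColumn, unmapped lines are silently skipped instead of logged, and the exception type raised in strict mode is an assumption.

```python
def parse_by_type(lines, layouts, strict=False):
    """Slice each line using the layout selected by its type-code prefix."""
    key_lengths = {len(code) for code in layouts}
    if len(key_lengths) != 1:
        raise ValueError("record type codes must all be the same length")
    key_len = key_lengths.pop()
    records = []
    for line in lines:
        code = line[:key_len]
        layout = layouts.get(code)
        if layout is None:
            if strict:
                raise ValueError(f"unmapped record type {code!r}")
            continue  # non-strict: skip the line
        records.append({name: line[start - 1:end].strip()
                        for name, start, end in layout})
    return records


layouts = {
    '1': [('record_type', 1, 1), ('priority_code', 2, 3)],
    '6': [('record_type', 1, 1), ('amount', 2, 11)],
}
parsed = parse_by_type(['101', '6        42', '9zz'], layouts)
print(parsed)
```
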
__init__(fp, columns, type_name_map=None, strict=False, **kwargs)[source]
visualize()[source]

Visualize column boundaries for each record type found in the file.

Scans the entire file, emitting one block per record type the first time that type is encountered. Each block shows the rulers, column boundary markers, the raw source line, and the interpreted line. Blocks are separated by blank lines. The file pointer is saved and restored.

Returns:

String with one visualization block per record type.

Return type:

str

class dbtk.readers.fixed_width.FixedReader(fp, columns, auto_trim=True, add_row_num=False, skip_rows=0, n_rows=None, null_values=None)[source]

Bases: Reader

Reader for fixed-width files.

__init__(fp, columns, auto_trim=True, add_row_num=False, skip_rows=0, n_rows=None, null_values=None)[source]

Initializes the instance with the provided file pointer, column definitions, and processing options.

fp

The file pointer from which data is read. For encoding detection, use get_reader('file.txt', encoding='detect') instead of opening the file directly.

Type:

TextIO

columns

A list of FixedColumn objects defining the structure of columns in the data.

Type:

List[FixedColumn]

auto_trim

Determines whether to automatically trim whitespace from field values. Default is True.

Type:

bool

add_row_num

Determines whether to add a row number attribute

Type:

bool

skip_rows

The number of rows to skip before reading data.

Type:

int

n_rows

The maximum number of rows to read.

Type:

Optional[int]

null_values

Values to convert to None (e.g., ‘N’, ‘NULL’, ‘NA’).

visualize(sample_lines=2)[source]

Visualize column boundaries over sample data from the file.

Seeks to the beginning of the file, reads up to sample_lines records, then restores the file pointer. Output shows the rulers and column boundary markers once, then for each record both the raw source line and the interpreted line reconstructed via record.to_line().

Parameters:

sample_lines (int) – Number of records to include in the preview.

Returns:

String representation of column layout with sample data.

Return type:

str

JSON and NDJSON (newline-delimited JSON) file readers.

class dbtk.readers.json.JSONReader(fp, flatten=True, add_row_num=True, skip_rows=0, n_rows=None, null_values=None, **kwargs)[source]

Bases: Reader

Read JSON files containing arrays of objects.

JSONReader parses JSON files that contain an array of objects (standard JSON format for tabular data). It can optionally flatten nested objects using dot notation, making it easy to work with hierarchical JSON data in a flat, record-based format.

The reader automatically discovers the schema by scanning all objects in the array, ensuring all possible keys are available even if some objects don’t have all fields.

Parameters:
  • fp (file-like object) – Open file pointer to JSON file

  • flatten (bool, default True) – Flatten nested objects with dot notation. For example, {"user": {"name": "Bob"}} becomes {"user.name": "Bob"}. Arrays are preserved as-is.

  • add_row_num (bool, default True) – Add _row_num field to each record

  • skip_rows (int, default 0) – Number of records to skip

  • n_rows (int, optional) – Maximum records to read

  • **kwargs – Reserved for future use

Example

from dbtk import readers

# Simple JSON array
# [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
with readers.JSONReader(open('users.json')) as reader:
    for user in reader:
        print(user.id, user.name)

# Nested JSON with flattening
# [{"id": 1, "user": {"name": "Alice", "email": "a@example.com"}}]
with readers.JSONReader(open('nested.json'), flatten=True) as reader:
    for record in reader:
        print(record.id, record['user.name'], record['user.email'])

# Disable flattening to keep nested structure
with readers.JSONReader(open('nested.json'), flatten=False) as reader:
    for record in reader:
        print(record.user)  # {'name': 'Alice', 'email': 'a@example.com'}

See also

NDJSONReader

Read newline-delimited JSON files

Reader

Base reader class

writers.to_json

Write JSON files

Notes

  • JSON file must contain an array at the root level

  • All objects in array are scanned to discover complete schema

  • Empty arrays raise ValueError

  • Nested objects are flattened with dot notation by default

  • Arrays within objects are never flattened
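
Dot-notation flattening and whole-array schema discovery, as described in these notes, might look roughly like the following. The flatten and discover_schema helpers are hypothetical and written against the standard json module only; column ordering by first appearance is an assumption.

```python
import json


def flatten(obj, prefix=''):
    """Flatten nested dicts into dot-notation keys; lists are left as-is."""
    flat = {}
    for key, value in obj.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


def discover_schema(objects):
    """Collect every key seen across all objects, in first-seen order."""
    schema = {}
    for obj in objects:
        for key in obj:
            schema.setdefault(key, None)
    return list(schema)


data = json.loads('[{"id": 1, "user": {"name": "Alice", "tags": ["a"]}},'
                  ' {"id": 2, "phone": "555"}]')
flat_rows = [flatten(obj) for obj in data]
columns = discover_schema(flat_rows)
print(columns)
```

Scanning every object before yielding rows is what lets the second object's phone key appear as a column even though the first object lacks it.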

__init__(fp, flatten=True, add_row_num=True, skip_rows=0, n_rows=None, null_values=None, **kwargs)[source]
property record_count: int
class dbtk.readers.json.NDJSONReader(fp, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Bases: Reader

Newline-delimited JSON file reader that returns Record objects or dicts.

__init__(fp, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Initialize NDJSON reader.

Parameters:
  • fp (TextIO) – File pointer to NDJSON file (one JSON object per line)

  • add_row_num (bool) – Add _row_num to each record

  • skip_rows (int) – Number of rows to skip from the beginning

  • n_rows (int | None) – Maximum number of rows to read (None = unlimited)

  • null_values – Values to convert to None (e.g., ‘N’, ‘NULL’, ‘NA’)
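
For readers unfamiliar with the format, newline-delimited JSON holds one complete JSON object per line, so it can be parsed line by line without loading the whole file. The generator below is a standard-library sketch, not NDJSONReader itself; iter_ndjson is a hypothetical name and skipping blank lines is an assumption.

```python
import io
import json


def iter_ndjson(fp):
    """Yield one parsed object per non-blank line."""
    for line in fp:
        line = line.strip()
        if line:
            yield json.loads(line)


sample = io.StringIO('{"id": 1, "name": "Alice"}\n\n{"id": 2, "name": "Bob"}\n')
objects = list(iter_ndjson(sample))
print(objects)
```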

XML file reader with XPath support for element extraction.

class dbtk.readers.xml.XMLColumn(name, xpath=None, data_type='text')[source]

Bases: object

Column definition for XML extraction.

__init__(name, xpath=None, data_type='text')[source]
Parameters:
  • name (str) – Column name for the Record

  • xpath (str | None) – XPath expression (if None, uses simple element matching)

  • data_type (str) – Data type hint (not enforced, just documentation)

class dbtk.readers.xml.XMLReader(fp, record_xpath='//record', columns=None, sample_size=10, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Bases: Reader

XML file reader that returns Record objects.

__init__(fp, record_xpath='//record', columns=None, sample_size=10, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Initialize XML reader.

Parameters:
  • fp (TextIO) – File pointer to XML file

  • record_xpath (str) – XPath expression to find record elements

  • columns (List[XMLColumn] | None) – List of XMLColumn definitions for custom extraction

  • sample_size (int) – Number of records to sample for column discovery

  • add_row_num (bool) – Add _row_num to each record

  • skip_rows (int) – Number of records to skip from the beginning

  • n_rows (int | None) – Maximum number of rows to read, or None for all

  • null_values – Values to convert to None (e.g., ‘N’, ‘NULL’, ‘NA’)
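The interplay between record_xpath and per-column XPath expressions can be sketched with the standard library. This is an illustration only, not how XMLReader is implemented: the `extract` helper and the plain dict of column names to paths are hypothetical stand-ins for XMLColumn, and because `xml.etree.ElementTree` supports only a subset of XPath, the sketch uses the relative form `.//user` rather than `//user`.

```python
import io
import xml.etree.ElementTree as ET

xml_text = """<users>
  <user id="1"><name>Alice</name><contact><email>a@example.com</email></contact></user>
  <user id="2"><name>Bob</name><contact><email>b@example.com</email></contact></user>
</users>"""

# Column name -> path relative to each record element (hypothetical layout)
columns = {"user_id": "@id", "name": "name", "email": "contact/email"}


def extract(record, xpath):
    """Return an attribute value for '@attr' paths, otherwise element text."""
    if xpath.startswith("@"):
        return record.get(xpath[1:])
    return record.findtext(xpath)


root = ET.parse(io.StringIO(xml_text)).getroot()
rows = [
    {name: extract(record, xpath) for name, xpath in columns.items()}
    for record in root.findall(".//user")
]
print(rows)
```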

property columns: List[XMLColumn]

Return all column definitions (custom + auto-discovered).

property record_count: int

Return total number of records found.

dbtk.readers.xml.open_xml(filename, **kwargs)[source]

Open XML file for reading.

Parameters:
  • filename (str | Path) – Path to XML file

  • **kwargs – Arguments passed to XMLReader

Returns:

XMLReader instance

Return type:

XMLReader

Example

with open_xml('data.xml', record_xpath='//user') as reader:
    for record in reader:
        print(record.name)

class dbtk.readers.data_frame.DataFrameReader(df, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]

Bases: Reader

Read directly from pandas or polars DataFrames — zero intermediate files.

This reader accepts a pre-loaded DataFrame (from pandas or polars) and streams rows as Record objects. It supports the standard Reader options (add_row_num, skip_rows, n_rows) and reports accurate progress because the row count is known up front.

Neither pandas nor polars is imported in this module; the caller has already imported one of them and passes in a DataFrame.

Parameters:
  • df (DataFrame) – pandas.DataFrame or polars.DataFrame containing the data

  • add_row_num (bool, default True) – Add ‘_row_num’ field with 1-based row number

  • skip_rows (int, default 0) – Number of rows to skip from the beginning

  • n_rows (int, optional) – Maximum number of records to yield

Examples

>>> from dbtk.readers.data_frame import DataFrameReader
>>> import pandas as pd
>>> df = pd.read_parquet("data.parquet")
>>> with DataFrameReader(df) as reader:
...     for row in reader:
...         print(row.id)
>>> import polars as pl
>>> df = pl.read_parquet("data.parquet")
>>> with DataFrameReader(df, add_row_num=False) as reader:
...     BulkSurge(table).load(reader)
__init__(df, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
property total: int | None

Total rows for progress bar — known for DataFrames.

Writers

Base classes for data writers with common file handling and data extraction patterns.

class dbtk.writers.base.BaseWriter(data, file=None, columns=None, encoding='utf-8', write_headers=True, compression='infer', **fmt_kwargs)[source]

Bases: ABC

Abstract base class for all data writers in DBTK.

Provides common functionality for writing data to various formats (CSV, JSON, Excel, XML, etc.). Writers accept data from multiple sources - cursors, lists of Records, lists of dicts, or lists of lists - and handle the conversion to the target format automatically.

All writers share common features like automatic column detection, stdout support for quick previews, configurable encodings, and optional type preservation. Writers are designed to work seamlessly with DBTK cursors and readers.

Common Features

  • Multiple data sources - Cursors, Records, dicts, lists

  • Automatic column detection - From cursors, Record objects, or dict keys

  • Stdout preview - Write to console with automatic row limiting

  • Configurable encoding - UTF-8, Latin-1, etc.

  • Type preservation - Optionally keep native types vs converting to strings

  • Consistent API - Same interface regardless of output format

param data:

Data to write. Accepts:

  • Cursor objects (from database queries)

  • List of Record objects (from readers)

  • List of dictionaries

  • List of lists/tuples (requires columns parameter)

type data:

Iterable[RecordLike]

param file:

Output filename or file handle. If None, writes to stdout (limited to 20 rows for preview).

type file:

str, Path, TextIO, or BinaryIO, optional

param columns:

Column names for list-of-lists data. Ignored for other data types which have columns embedded.

type columns:

List[str], optional

param encoding:

File encoding for text-based formats

type encoding:

str, default ‘utf-8’

param write_headers:

If True, include header row in formats that support it.

type write_headers:

bool, default True

param **fmt_kwargs:

Additional format-specific arguments (passed to subclasses)

columns

Column names detected from data or provided explicitly

Type:

List[str]

data_iterator

Iterator over data records

Type:

Iterator

Examples

Subclasses implement specific formats:

from dbtk import readers, writers

# Write cursor results to CSV
cursor.execute("SELECT * FROM users")
writers.to_csv(cursor, 'users.csv')

# Write list of records to JSON
with readers.CSVReader(open('input.csv')) as reader:
    records = list(reader)
writers.to_json(records, 'output.json')

# Preview to stdout (shows first 20 rows)
cursor.execute("SELECT * FROM large_table")
writers.to_csv(cursor, None)  # Prints to console

See also

to_csv

Write CSV files

to_json

Write JSON files

to_excel

Write Excel files

to_xml

Write XML files

cursor_to_cursor

Database-to-database transfer

Notes

This is an abstract base class. Use one of the concrete implementations (CSVWriter, JSONWriter, etc.) or the convenience functions (to_csv, to_json, etc.).

Subclasses must implement:

  • _write_data() - Perform the actual write operation

When file is None (stdout mode), output is automatically limited to 20 rows to prevent accidentally printing huge result sets to the console.
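The column-detection and stdout-preview behaviour described above can be approximated in a few lines. The following is a simplified sketch rather than BaseWriter's actual code: `write_rows` and `PREVIEW_LIMIT` are hypothetical names, only dicts and plain sequences are handled, and the output format is fixed to CSV for brevity.

```python
import csv
import sys
from itertools import chain, islice

PREVIEW_LIMIT = 20  # stdout cap described in the notes above


def write_rows(data, file=None, columns=None):
    """Write rows as CSV; with file=None, print at most PREVIEW_LIMIT rows."""
    rows = iter(data)
    first = next(rows, None)
    if first is None:
        raise ValueError("no data available to write")

    # Dicts carry their own column names; plain sequences need explicit ones
    if isinstance(first, dict):
        names = list(first)
    elif columns is not None:
        names = list(columns)
    else:
        raise ValueError("columns is required for list-of-lists data")

    rows = chain([first], rows)
    if file is None:
        out = sys.stdout
        rows = islice(rows, PREVIEW_LIMIT)  # preview mode: cap the output
    else:
        out = file

    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(names)
    count = 0
    for row in rows:
        values = [row[name] for name in names] if isinstance(row, dict) else list(row)
        writer.writerow(values)
        count += 1
    return count
```

Because the cap is applied with `islice`, a large cursor or generator is never fully consumed in preview mode.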

__init__(data, file=None, columns=None, encoding='utf-8', write_headers=True, compression='infer', **fmt_kwargs)[source]

Initialize the writer with data and options.

Parameters:
  • data (Iterable[RecordLike]) – Data source (cursor, list of records, etc.)

  • file (str, Path, TextIO, or BinaryIO, optional) – Output file. None writes to stdout.

  • columns (List[str], optional) – Column names for list-of-lists

  • encoding (str, default 'utf-8') – File encoding

  • write_headers (bool, default True) – Include header row in output

  • **fmt_kwargs – Format-specific arguments

accepts_file_handle = True
close()[source]

Close the output file if it was opened by this writer.

Safe to call multiple times (idempotent). Automatically called when using the writer as a context manager.

preserve_types = False
property row_count: int

Number of rows written (updated during write operation)

to_string(obj)[source]

Convert a database value to string representation.

Parameters:

obj (Any) – Value to convert

Returns:

String representation

Return type:

str

write()[source]

Write all data in one operation.

Returns:

Number of rows written.

Return type:

int

Raises:

ValueError – If no data is available to write.

class dbtk.writers.base.BatchWriter(data=None, file=None, columns=None, headers=None, encoding='utf-8', write_headers=True, compression='infer', **fmt_kwargs)[source]

Bases: BaseWriter

Base class for writers that support incremental, batch-based output.

Unlike traditional writers that require all data up-front, BatchWriter subclasses are designed for streaming and bulk ETL workloads where data arrives in chunks (e.g., from BulkSurge, large queries, or infinite streams).

Key Features

  • Lazy initialization - Columns and iterator are resolved on first write

  • Reusable file handle - Write multiple batches without reopening

  • Header control - First batch includes headers, subsequent batches omit them

  • Zero-copy compatible - Works with Record objects and generators

  • Dual-mode operation - Use as traditional writer or streaming writer

Usage Patterns

Pattern 1: Traditional (single-shot)
>>> writer = CSVWriter(data=all_records, file='output.csv')
>>> writer.write()
Pattern 2: Pure streaming
>>> with CSVWriter(data=None, file='output.csv') as writer:
...     for batch in surge.batched(records):
...         writer.write_batch(batch)
Pattern 3: Hybrid
>>> writer = CSVWriter(data=first_batch, file='output.csv')
>>> writer.write()  # Process initial batch
>>> writer.write_batch(second_batch)  # Continue streaming
>>> writer.write_batch(third_batch)
param data:

Initial data. If None, setup is deferred until first write_batch(). This enables streaming use cases where data arrives in batches.

type data:

Iterable[RecordLike], optional

param file:

Output destination. For streaming, pass an open file handle.

type file:

str, Path, TextIO, or BinaryIO, optional

param columns:

Explicit column names. If not provided, inferred from first batch.

type columns:

List[str], optional

param encoding:

File encoding for text-based formats

type encoding:

str, default ‘utf-8’

param write_headers:

Whether to write column headers on the first batch.

type write_headers:

bool, default True

param **fmt_kwargs:

Format-specific options passed to _write_data().

Notes

Subclasses must implement _write_data() but inherit write_batch() for free.

Used by:
  • BulkSurge.dump() and .load(fallback_path=…)

  • Any high-performance streaming export pipeline

See also

CSVWriter

Batchable CSV writer

NDJSONWriter

Batchable newline-delimited JSON writer

XMLStreamer

Batchable streaming XML writer

__init__(data=None, file=None, columns=None, headers=None, encoding='utf-8', write_headers=True, compression='infer', **fmt_kwargs)[source]

Initialize a batch-capable writer with optional deferred setup.

Parameters:
  • data (Iterable[RecordLike], optional) – Initial data. If None, setup is deferred until first write_batch().

  • file (str, Path, TextIO, or BinaryIO, optional) – Output destination.

  • columns (List[str], optional) – Explicit column names. If not provided, inferred from data.

  • headers (List[str], optional) – Header row text for CSV/Excel writers. If None, checks data.description for original column names, then falls back to detected column names. Only used by writers that have header rows (CSV, Excel).

  • encoding (str, default 'utf-8') – File encoding

  • write_headers (bool, default True) – Include header row in output

  • **fmt_kwargs – Format-specific options

Raises:

ValueError – If headers is provided but write_headers=False

accepts_file_handle = True
data_iterator: Iterator | None
preserve_types = False
write()[source]

Write initial data provided at initialization.

Returns:

Number of rows written

Return type:

int

Raises:

RuntimeError – If no initial data was provided to __init__. Use write_batch() for streaming.

write_batch(data)[source]

Write a batch of records to the output stream.

This is the core method that makes BatchWriter suitable for BulkSurge and other high-volume streaming scenarios.

Parameters:

data (Iterable[RecordLike]) – A batch of Record objects (or compatible row objects).

Raises:

RuntimeError – If initial data was provided but write() hasn’t been called yet.
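The contract between write() and write_batch() described above (lazy column setup, headers on the first batch only, and the two RuntimeError cases) can be illustrated with a toy class. `MiniBatchWriter` is a hypothetical stand-in written for this example; it handles only dict rows and CSV output and does not reflect BatchWriter's internals.

```python
import csv
import io


class MiniBatchWriter:
    """Toy batch writer: columns resolved lazily, headers written once."""

    def __init__(self, data=None, file=None, write_headers=True):
        self._initial = data
        self._file = file
        self._write_headers = write_headers
        self._columns = None
        self._initial_pending = data is not None
        self.row_count = 0

    def write(self):
        if self._initial is None:
            raise RuntimeError("no initial data; use write_batch() for streaming")
        data, self._initial = self._initial, None
        self._initial_pending = False
        return self._emit(data)

    def write_batch(self, data):
        if self._initial_pending:
            raise RuntimeError("initial data was provided; call write() first")
        self._emit(data)

    def _emit(self, data):
        writer = csv.writer(self._file, lineterminator="\n")
        written = 0
        for row in data:
            if self._columns is None:
                # First row seen: resolve columns and emit the header once
                self._columns = list(row)
                if self._write_headers:
                    writer.writerow(self._columns)
            writer.writerow([row[c] for c in self._columns])
            written += 1
        self.row_count += written
        return written


out = io.StringIO()
streaming = MiniBatchWriter(file=out)
streaming.write_batch([{"id": 1, "name": "a"}])
streaming.write_batch([{"id": 2, "name": "b"}])
print(out.getvalue())
```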

class dbtk.writers.csv.CSVWriter(data=None, file=None, columns=None, headers=None, write_headers=True, null_string=None, compression='infer', **csv_kwargs)[source]

Bases: BatchWriter

CSV writer class that extends BatchWriter.

__init__(data=None, file=None, columns=None, headers=None, write_headers=True, null_string=None, compression='infer', **csv_kwargs)[source]

Initialize CSV writer.

Parameters:
  • data – Cursor object or list of records

  • file (str | Path | TextIO | None) – Output file. If None, writes to stdout

  • columns (List[str] | None) – Column names for list-of-lists data (optional for other types)

  • headers (List[str] | None) – Header row text. If None, checks data.description for original column names, then falls back to detected column names. Useful when field names have been normalized but you want original database column names in the CSV header.

  • write_headers (bool) – Whether to include column headers

  • null_string (str) – String representation for null values

  • compression (str) – Compression type (‘infer’, ‘gzip’, ‘bz2’, ‘lzma’, or None)

  • **csv_kwargs – Additional arguments passed to csv.writer

to_string(obj)[source]

Convert an object to its string form for CSV output. Set settings['null_string_csv'] to change how null values are represented.

dbtk.writers.csv.to_csv(data, file=None, headers=None, write_headers=True, null_string=None, compression='infer', **csv_kwargs)[source]

Export cursor or result set to CSV file.

Parameters:
  • data – Cursor object or list of records

  • file (str | Path | None) – Output file. If None, writes to stdout

  • headers (List[str] | None) – Header row text. If None, uses cursor.description or detected column names

  • write_headers (bool) – Whether to include column headers

  • null_string (str) – String representation for null values

  • compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.

  • **csv_kwargs – Additional arguments passed to csv.writer

Example

# Write to file
to_csv(cursor, 'users.csv')

# Write to stdout
to_csv(cursor)

# Custom delimiter
to_csv(cursor, 'data.tsv', delimiter='\t')

# Override header names
to_csv(cursor, 'users.csv', headers=['User ID', 'Full Name', 'Email'])
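The compression='infer' behaviour documented in the parameter list can be sketched with the standard library's gzip, bz2, and lzma modules. The helper names `resolve_compression` and `open_for_writing` are hypothetical, and the mapping only covers the three extensions the documentation mentions; how dbtk resolves compression internally is not shown here.

```python
import bz2
import gzip
import lzma
from pathlib import Path

EXTENSIONS = {".gz": "gzip", ".bz2": "bz2", ".xz": "lzma"}
OPENERS = {"gzip": gzip.open, "bz2": bz2.open, "lzma": lzma.open, None: open}


def resolve_compression(path, compression="infer"):
    """Map 'infer' to a codec based on the file extension; None disables."""
    if compression == "infer":
        return EXTENSIONS.get(Path(path).suffix.lower())
    if compression not in OPENERS:
        raise ValueError(f"unsupported compression: {compression!r}")
    return compression


def open_for_writing(path, compression="infer", encoding="utf-8"):
    """Open a text-mode handle, transparently compressing when requested."""
    kind = resolve_compression(path, compression)
    # newline="" leaves line endings under the csv module's control
    return OPENERS[kind](path, "wt", encoding=encoding, newline="")
```

A handle returned by `open_for_writing('users.csv.gz')` can be passed straight to `csv.writer`.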

Database writing utilities and ETL table operations.

class dbtk.writers.database.DatabaseWriter(data, target_cursor, target_table, batch_size=1000, commit_frequency=10000)[source]

Bases: BaseWriter

Database writer that extends BaseWriter.

__init__(data, target_cursor, target_table, batch_size=1000, commit_frequency=10000)[source]

Initialize database writer.

Parameters:
  • data – Source cursor or list of records

  • target_cursor – Target database cursor

  • target_table (str) – Name of target table

  • batch_size (int) – Number of records to insert per batch

  • commit_frequency (int) – How often to commit (in number of records)

write()[source]

Override to bypass file handle creation and call _write_data directly.

dbtk.writers.database.cursor_to_cursor(source_data, target_cursor, target_table, batch_size=1000, commit_frequency=10000)[source]

Copy data from source cursor/results to target database table.

Parameters:
  • source_data – Source cursor or list of records

  • target_cursor – Target database cursor

  • target_table (str) – Name of target table

  • batch_size (int) – Number of records to insert per batch

  • commit_frequency (int) – How often to commit (in number of records)

Returns:

Number of records inserted

Return type:

int

Example

# Copy between databases
source_cursor = source_db.cursor()
source_cursor.execute("SELECT * FROM users")

target_cursor = target_db.cursor()
count = cursor_to_cursor(source_cursor, target_cursor, 'users_copy')
print(f"Copied {count} records")
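To show how batch_size and commit_frequency might interact, here is a self-contained sketch using sqlite3. The `copy_rows` function is hypothetical and is not the cursor_to_cursor implementation; it assumes qmark-style placeholders, a trusted table name, and a target table whose columns match the source query.

```python
import sqlite3


def copy_rows(source_cursor, target_cursor, target_table,
              batch_size=1000, commit_frequency=10000):
    """Copy all rows from an executed source cursor into target_table."""
    columns = [col[0] for col in source_cursor.description]
    placeholders = ", ".join("?" for _ in columns)
    # Table and column names are interpolated, so they must be trusted
    sql = f"INSERT INTO {target_table} ({', '.join(columns)}) VALUES ({placeholders})"

    total = 0
    since_commit = 0
    while True:
        batch = source_cursor.fetchmany(batch_size)
        if not batch:
            break
        target_cursor.executemany(sql, batch)
        total += len(batch)
        since_commit += len(batch)
        if since_commit >= commit_frequency:
            target_cursor.connection.commit()
            since_commit = 0
    target_cursor.connection.commit()  # flush the final partial interval
    return total


source_db = sqlite3.connect(":memory:")
source_db.execute("CREATE TABLE users (id INTEGER, name TEXT)")
source_db.executemany(
    "INSERT INTO users VALUES (?, ?)", [(i, f"user{i}") for i in range(25)]
)
target_db = sqlite3.connect(":memory:")
target_db.execute("CREATE TABLE users_copy (id INTEGER, name TEXT)")

source_cursor = source_db.cursor()
source_cursor.execute("SELECT * FROM users")
copied = copy_rows(source_cursor, target_db.cursor(), "users_copy",
                   batch_size=10, commit_frequency=20)
print(f"Copied {copied} records")
```

Smaller batches keep memory use flat, while a larger commit interval reduces transaction overhead at the cost of more work lost if the copy fails midway.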

Excel writer for database results using openpyxl.

class dbtk.writers.excel.ColumnRule(style=None, header_style=None, width=None, hidden=False, comment=None, filter=False, group_label=None, conditional_style=None)[source]

Bases: object

Per-column formatting rule for use with ExcelFormat.

Interchangeable with a plain dict; ExcelWriter accepts either. The order of rules in ExcelFormat.columns is significant: later rules override earlier ones on a per-property basis.

Parameters:
  • style (str or dict, optional) – Static style applied to every data cell in the column. Either a registered style name (str) or an inline property dict (bg_color, font, number_format, alignment).

  • header_style (str or dict, optional) – Static style applied to the header cell only (owns the cell entirely — include font={'bold': True} if you still want bold).

  • width (float, optional) – Explicit column width in Excel units; bypasses auto-sizing.

  • hidden (bool, default False) – Hide the column.

  • comment (str, optional) – Comment/note added to the header cell.

  • filter (bool, default False) – Show the auto-filter dropdown on this column only; hides dropdowns on all other columns.

  • group_label (str, optional) – Label for a group super-header spanning this column range. Only valid on range patterns ('col_a:col_z').

  • conditional_style (callable or list[callable], optional) – lambda record: style_name_or_None — evaluated per row; overrides style and row styles when non-None. Multiple callables are composed in order.

__init__(style=None, header_style=None, width=None, hidden=False, comment=None, filter=False, group_label=None, conditional_style=None)[source]
to_dict()[source]
class dbtk.writers.excel.ExcelFormat(styles=None, columns=None, rows=None, min_column_width=6, max_column_width=60, auto_filter=False, freeze=None, header_auto_rotate=None, tab_color=None)[source]

Bases: object

Formatting configuration for ExcelWriter.

Interchangeable with a plain dict; ExcelWriter accepts either. Using ExcelFormat gives full IDE autocomplete and type checking.

The columns dict is order-sensitive: patterns are applied in definition order and later rules override earlier ones per property.

Parameters:
  • styles (dict, optional) – Named style definitions. Keys are style names, values are property dicts (bg_color, font, number_format, alignment). Styles defined here take priority over the built-in styles, so you can redefine comma_style, currency_style, etc.

  • columns (dict, optional) – Pattern → ColumnRule (or plain dict). Patterns are case-insensitive fnmatch globs, literals, or 'start:end' ranges.

  • rows (dict, optional) –

    Row-type formatting. Recognised keys:

    • '*' — applied to all rows (lowest priority). Supports height and style (str or dict).

    • 'group_header' — the group-label row (only present when column ranges carry group_label). Supports height and style (str or dict).

    • 'header' — the column-header row. Supports height.

    • 'data' — data rows. Nested keys:

      • 'style': str or dict applied to all data rows.

      • 'odd' / 'even': {'style': style_name} for striping.

      • 'conditional_style': callable or list of callables lambda record: style_name_or_None. Multiple callables are composed in order.

      • 'height': uniform row height for data rows.

    Cascade (lowest → highest priority): '*' → 'odd'/'even' → 'conditional_style' callable(s).

  • min_column_width (float, default 6) – Minimum auto-sized column width.

  • max_column_width (float, default 60) – Maximum auto-sized column width.

  • auto_filter (bool, default False) – Enable Excel auto-filter dropdowns on the header row.

  • freeze (str or False, optional) – Freeze-panes cell reference (e.g. 'D2'). None uses the automatic default ('A2' or 'A3' when group headers are present). Pass False to disable freezing entirely.

  • header_auto_rotate (float or dict, optional) – Auto-rotate column headers that are significantly wider than their data. Pass a float ratio or {'ratio': 1.5, 'min_length': 8, 'height_factor': 6.5}.

  • tab_color (str, optional) – Worksheet tab color as a hex string ('#FF0000' or 'FF0000').
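The order-sensitive pattern matching described for the columns dict can be sketched as follows. This is a guess at the general shape, not ExcelWriter's resolution code: `matches` and `resolve_rules` are hypothetical helpers, 'start:end' ranges are assumed to be inclusive and positional, and rules are plain dicts rather than ColumnRule objects.

```python
from fnmatch import fnmatchcase


def matches(pattern, column, all_columns):
    """Case-insensitive match for glob, literal, or 'start:end' range patterns."""
    lowered = [c.lower() for c in all_columns]
    pattern, column = pattern.lower(), column.lower()
    if ":" in pattern:
        start, end = pattern.split(":", 1)
        if start not in lowered or end not in lowered:
            return False
        return lowered.index(start) <= lowered.index(column) <= lowered.index(end)
    # A literal name is simply a glob without wildcards
    return fnmatchcase(column, pattern)


def resolve_rules(column, all_columns, rules):
    """Merge every matching rule in definition order; later rules win per property."""
    merged = {}
    for pattern, props in rules.items():
        if matches(pattern, column, all_columns):
            merged.update(props)
    return merged


header = ["id", "first_name", "last_name", "Amount_USD", "Amount_EUR"]
rules = {
    "*": {"width": 12},
    "amount_*": {"style": "currency_style", "width": 14},
    "first_name:last_name": {"style": "bold"},
}
print(resolve_rules("Amount_USD", header, rules))
```

Because Python dicts preserve insertion order, putting the broad `'*'` pattern first and specific patterns later gives the override behaviour described above.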

__init__(styles=None, columns=None, rows=None, min_column_width=6, max_column_width=60, auto_filter=False, freeze=None, header_auto_rotate=None, tab_color=None)[source]
to_dict()[source]
class dbtk.writers.excel.ExcelWriter(data=None, file=None, sheet_name=None, headers=None, write_headers=True, formatting=None)[source]

Bases: BatchWriter

Stateful Excel writer using openpyxl.

Keeps the workbook open across multiple write_batch() calls and saves only on context exit. Designed for both single-sheet legacy use and multi-sheet reports.

Supports all three BatchWriter modes:

  1. Complete write: data passed to __init__, then write()

  2. Batch write: no data on init, then write_batch()

  3. Hybrid: data on init, then write() followed by write_batch()

Usage examples:

# Mode 1: Traditional single-shot write
ExcelWriter(cursor, 'report.xlsx').write()

# Mode 2: Pure streaming with write_batch()
with ExcelWriter(file='report.xlsx') as writer:
    writer.write_batch(cursor)  # goes to sheet 'Data'

# Mode 3: Hybrid - initial data + streaming
with ExcelWriter(first_batch, 'report.xlsx') as writer:
    writer.write()  # Write initial batch
    writer.write_batch(second_batch)  # Stream additional batches

# Multi-sheet report
with ExcelWriter(file='report.xlsx', sheet_name='Summary') as writer:
    writer.write_batch(summary_data, sheet_name='Summary')
    writer.write_batch(users_data, sheet_name='Users')
    writer.write_batch(orders_data, sheet_name='Orders')

# Streaming / batch mode
with ExcelWriter(file='large.xlsx') as writer:
    for batch in large_generator:
        writer.write_batch(batch, sheet_name='Data')  # appends to 'Data'
__init__(data=None, file=None, sheet_name=None, headers=None, write_headers=True, formatting=None)[source]

Initialize the Excel writer.

Parameters:
  • data (Iterable[RecordLike], optional) – Initial data to write. If None, use write_batch() for streaming mode.

  • file (str or Path, optional) – Output Excel file (.xlsx). Required for Excel output.

  • sheet_name (str, optional) – Default/active sheet name to use for write_batch() calls without an explicit sheet_name.

  • headers (List[str], optional) – Header row text. If None, checks data.description for original column names, then falls back to detected column names.

  • write_headers (bool, default True) – Whether to write column headers (only when the sheet is empty).

  • formatting (ExcelFormat or dict, optional) – Worksheet formatting rules. Prefer ExcelFormat for IDE autocomplete; a plain dict is also accepted. See ExcelFormat for the full reference.

accepts_file_handle = False
close()[source]

Close the writer and save the workbook.

preserve_types = True
write_batch(data, sheet_name=None, headers=None, formatting=None)[source]

Write a batch of data to a sheet.

If this is the first write to this sheet in the current session, the sheet is cleared first. Subsequent writes to the same sheet append data.

Parameters:
  • data (Iterable[RecordLike]) – The data batch

  • sheet_name (str, optional) – Target sheet. If None, uses active_sheet or defaults to ‘Data’

  • headers (list of str, optional) – Display names for the header row. Overrides the writer-level headers set at initialisation for this batch only. Must match the column count.

  • formatting (ExcelFormat or dict, optional) – Per-call formatting override. None (default) uses the writer-level formatting. Pass {} to write this sheet with no formatting.

class dbtk.writers.excel.LinkSource(name, source_sheet=None, key_column=None, url_template=None, text_template=None, missing_text=None, external_only=False)[source]

Bases: object

Defines a linkable data source for creating internal and external hyperlinks in Excel.

LinkSource enables rich hyperlinking between worksheets and to external URLs. Each LinkSource is registered once with LinkedExcelWriter and can be used across multiple sheets to create consistent, navigable Excel reports.

As the source sheet is written, LinkSource caches each record’s cell reference and generates formatted link text and URLs using Python’s str.format_map(). Later sheets can reference these cached records to create hyperlinks.
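The template mechanism mentioned above relies on plain `str.format_map()`, which can be tried in isolation. The record, the cache layout, and the `#Customers!A2` cell reference below are illustrative assumptions; only the use of `format_map` comes from the description of LinkSource.

```python
record = {"customer_id": 42, "crm_id": "C-0042", "company_name": "Acme Corp"}

url_template = "https://crm.company.com/customers/{crm_id}"
text_template = "{company_name} ({customer_id})"

# Each placeholder is looked up in the record by field name
url = url_template.format_map(record)
text = text_template.format_map(record)

# Hypothetical cache entry keyed by the key column value, holding what a
# later sheet would need to build either an internal or an external link
cache = {
    record["customer_id"]: {"cell": "#Customers!A2", "url": url, "text": text}
}

# A placeholder that is absent from the record raises KeyError
try:
    "{missing_field}".format_map(record)
    missing = None
except KeyError as exc:
    missing = exc.args[0]

print(text, url, missing)
```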

class dbtk.writers.excel.LinkedExcelWriter(data=None, file=None, sheet_name=None, headers=None, write_headers=True, formatting=None)[source]

Bases: ExcelWriter

Advanced Excel writer with internal and external hyperlink management.

LinkedExcelWriter extends ExcelWriter to enable rich, bidirectional hyperlinking within Excel workbooks and to external systems. It automatically caches source records as they’re written and creates formatted hyperlinks in detail sheets that reference those sources.

This is particularly powerful for creating navigable multi-sheet reports with master-detail relationships, drill-through capabilities, and integration with external CRM, ticketing, or web applications.

Key Features

  • Internal navigation - Links between worksheets (e.g., #Students!B5)

  • External integration - Deep links to web applications

  • Hybrid linking - Store both internal and external, choose which to display

  • Template-based formatting - Use Python format strings for link text and URLs

  • Automatic caching - Source records cached as written, no manual tracking

  • Mode control - Force internal or external links per column via source:internal

Workflow

  1. Create LinkSource definitions for each linkable entity

  2. Register them with LinkedExcelWriter

  3. Write source sheets first (e.g., Students, Products)

  4. Write detail sheets with link specifications (e.g., Enrollments, Orders)

  5. Links are resolved from cache and applied automatically

param file:

Output Excel file (.xlsx)

type file:

str or Path

param data:

Initial data to write. If None, use write_batch() for streaming mode.

type data:

Iterable[RecordLike], optional

param sheet_name:

Default sheet name for write_batch() calls

type sheet_name:

str, optional

param write_headers:

Whether to write column headers

type write_headers:

bool, default True

Examples

Basic internal linking between sheets:

with LinkedExcelWriter(file='school_report.xlsx') as writer:
    # Define linkable entity
    student_link = LinkSource(
        name="student",
        source_sheet="Students",
        key_column="student_id"
    )
    writer.register_link_source(student_link)

    # Write source sheet
    writer.write_batch(students_data, sheet_name="Students")

    # Write detail sheet with internal links
    writer.write_batch(
        enrollments_data,
        sheet_name="Enrollments",
        links={"student_name": "student:internal"}
    )

External links to CRM system:

with LinkedExcelWriter(file='sales_report.xlsx') as writer:
    customer_link = LinkSource(
        name="customer",
        source_sheet="Customers",
        key_column="customer_id",
        url_template="https://crm.company.com/customers/{crm_id}",
        text_template="{company_name} ({customer_id})"
    )
    writer.register_link_source(customer_link)

    writer.write_batch(customers_data, sheet_name="Customers")
    writer.write_batch(
        orders_data,
        sheet_name="Orders",
        links={"customer": "customer"}  # Uses external URL
    )

Hybrid mode with internal and external links:

with LinkedExcelWriter(file='support_tickets.xlsx') as writer:
    ticket_link = LinkSource(
        name="ticket",
        source_sheet="Tickets",
        key_column="ticket_id",
        url_template="https://support.company.com/ticket/{ticket_id}",
        text_template="#{ticket_id} - {subject}"
    )
    writer.register_link_source(ticket_link)

    writer.write_batch(tickets_data, sheet_name="Tickets")
    writer.write_batch(
        comments_data,
        sheet_name="Comments",
        links={
            "ticket_link": "ticket",           # External to support system
            "ticket_ref": "ticket:internal"    # Internal sheet navigation
        }
    )

Multiple link sources in one sheet:

with LinkedExcelWriter(file='class_roster.xlsx') as writer:
    student_link = LinkSource(
        name="student",
        source_sheet="Students",
        key_column="student_id",
        text_template="{last_name}, {first_name}"
    )
    course_link = LinkSource(
        name="course",
        source_sheet="Courses",
        key_column="course_id",
        text_template="{course_code} - {title}"
    )

    writer.register_link_source(student_link)
    writer.register_link_source(course_link)

    writer.write_batch(students_data, sheet_name="Students")
    writer.write_batch(courses_data, sheet_name="Courses")
    writer.write_batch(
        enrollments_data,
        sheet_name="Enrollments",
        links={
            "student_name": "student:internal",
            "course_name": "course:internal"
        }
    )

Notes

  • Source sheets MUST be written before detail sheets that reference them

  • The key_column must exist in both source and detail datasets

  • Missing links display missing_text if set, otherwise show raw value

  • Link mode syntax: "source_name" (external) or "source_name:internal"

  • Templates use str.format_map() - all fields must exist in record data

  • Hyperlink styling (blue, underlined) is applied automatically

See also

LinkSource

Link definition class

ExcelWriter

Base writer without linking capabilities

__init__(data=None, file=None, sheet_name=None, headers=None, write_headers=True, formatting=None)[source]

Registered LinkSource instances, keyed by name.

Register a link source for use across sheets.

write_batch(data, sheet_name=None, headers=None, links=None, formatting=None)[source]

Write a batch with optional hyperlinking.

If this is the first write to this sheet in the current session, the sheet is cleared first. Subsequent writes to the same sheet append data.

Parameters:
  • headers – Display names for this batch, overriding the writer-level default

  • links – Dict mapping column_name → "source_name" or "source_name:internal"

  • formatting – Per-call formatting override; None uses the writer-level formatting, {} writes with no formatting

dbtk.writers.excel.check_dependencies()[source]
dbtk.writers.excel.to_excel(data, file, sheet='Data', headers=None, write_headers=True)[source]

Legacy convenience function — writes a single sheet.

Parameters:
  • data (Iterable[RecordLike]) – Data to write (cursor, list of Records, etc.)

  • file (str or Path) – Output Excel file (.xlsx)

  • sheet (str, default 'Data') – Sheet name to write to

  • headers (List[str], optional) – Header row text. If None, uses cursor.description or detected column names

  • write_headers (bool, default True) – Whether to write column headers

Examples

# Write cursor with original database column names
to_excel(cursor, 'report.xlsx')

# Override header names
to_excel(cursor, 'report.xlsx', headers=['User ID', 'Full Name', 'Email'])

For multi-sheet or advanced reports, use ExcelWriter as a context manager with write_batch().

Fixed-width and EDI text writers with batch streaming support.

Both writers are driven by FixedColumn schema objects — the same objects used by FixedReader and EDIReader on the read side. When input records are already FixedWidthRecord instances (e.g. round-tripped through a reader), they go straight to to_line(). Any other record type is cast into the appropriate FixedWidthRecord subclass first.

class dbtk.writers.fixed_width.EDIWriter(data=None, file=None, columns=None, encoding='utf-8', truncate_overflow=False)[source]

Bases: BatchWriter

Writer for fixed-width files containing multiple record types (EDI-like formats).

Symmetric counterpart to EDIReader. Takes the same Dict[str, List[FixedColumn]] schema and dispatches writes by record type code (always the first field of each record).

When input records are already FixedWidthRecord instances (e.g. from EDIReader), the type code is read from record[0] and checked against the schema, then to_line() is called directly. Any other record type is cast into the appropriate FixedWidthRecord subclass.

Parameters:
  • data (Iterable[RecordLike], optional) – Initial data to write. None for streaming mode.

  • file (str, Path, TextIO, or BinaryIO, optional) – Output file or handle. None writes to stdout.

  • columns (Dict[str, List[FixedColumn]]) – Mapping of type codes to column definitions — same format as EDIReader. All keys must be strings of identical length.

  • encoding (str, default 'utf-8') – File encoding.

  • truncate_overflow (bool, default False) – Truncate values that exceed their column width. False (default) raises ValueError — EDI files are typically length-strict.
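Dispatching by type code with strict length checking can be pictured with a small sketch. It does not use FixedColumn or FixedWidthRecord: the schema below is a hypothetical simplification in which each column is a (name, width) tuple, every field is left-aligned and space-padded, and `validate_schema` and `to_line` are illustrative helpers only.

```python
SCHEMA = {
    "1": [("record_type", 1), ("batch_name", 10)],
    "6": [("record_type", 1), ("account", 8), ("amount", 6)],
}


def validate_schema(schema):
    """Check that all type codes share one length and return that length."""
    lengths = {len(code) for code in schema}
    if len(lengths) != 1:
        raise ValueError("all type codes must have the same length")
    return lengths.pop()


def to_line(record, schema, truncate_overflow=False):
    """Render one record using the layout selected by its first field."""
    code = str(record[0])
    if code not in schema:
        raise ValueError(f"unknown record type: {code!r}")
    parts = []
    for value, (name, width) in zip(record, schema[code]):
        text = str(value)
        if len(text) > width:
            if not truncate_overflow:
                raise ValueError(f"{name} exceeds width {width}: {text!r}")
            text = text[:width]
        parts.append(text.ljust(width))
    return "".join(parts)


validate_schema(SCHEMA)
print(repr(to_line(["1", "PAYROLL"], SCHEMA)))
print(repr(to_line(["6", "12345678", 250], SCHEMA)))
```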

Examples

Read-modify-write loop:

from dbtk.readers.fixed_width import EDIReader
from dbtk.writers.fixed_width import EDIWriter
from dbtk.formats.edi import ACH_COLUMNS

with open('in.ach') as fp, EDIWriter(file='out.ach', columns=ACH_COLUMNS) as w:
    w.write_batch(EDIReader(fp, ACH_COLUMNS))

Single-shot:

records = list(EDIReader(open('in.ach'), ACH_COLUMNS))
to_edi(records, ACH_COLUMNS, 'out.ach')
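The dispatch by type code described above can be sketched without dbtk. This is a minimal, hypothetical stand-in rather than the library's implementation: `SCHEMA`, `check_schema`, and `format_record` are illustrative names, plain `(name, width)` tuples take the place of FixedColumn objects, and every field is assumed to be left-aligned and space-padded.

```python
# Hypothetical schema: type code -> [(field name, width), ...].
# Plain tuples stand in for dbtk's FixedColumn objects.
SCHEMA = {
    "01": [("type", 2), ("batch_id", 6), ("origin", 8)],
    "02": [("type", 2), ("account", 10), ("amount", 4)],
}


def check_schema(schema):
    """Require that all type codes are strings of identical length."""
    if not all(isinstance(code, str) for code in schema):
        raise ValueError("type codes must be strings")
    if len({len(code) for code in schema}) != 1:
        raise ValueError("type codes must have identical length")


def format_record(values, schema, truncate_overflow=False):
    """Pick the layout from the first field, then pad each value to its width."""
    code = str(values[0])
    if code not in schema:
        raise ValueError(f"unknown record type {code!r}")
    parts = []
    for value, (name, width) in zip(values, schema[code]):
        text = str(value)
        if len(text) > width:
            if not truncate_overflow:
                # Strict by default, mirroring the length-strict EDI behaviour
                raise ValueError(f"{name}: {text!r} exceeds width {width}")
            text = text[:width]
        parts.append(text.ljust(width))  # assumed: left-aligned, space-padded
    return "".join(parts)


check_schema(SCHEMA)
header_line = format_record(["01", "B00017", "ACME"], SCHEMA)
detail_line = format_record(["02", "12345", "0420"], SCHEMA)
```

Both lines come out 16 characters wide. An oversized value raises ValueError unless truncate_overflow=True is passed.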
__init__(data=None, file=None, columns=None, encoding='utf-8', truncate_overflow=False)[source]
accepts_file_handle = True
preserve_types = True
class dbtk.writers.fixed_width.FixedWidthWriter(data=None, file=None, columns=None, encoding='utf-8', truncate_overflow=True)[source]

Bases: BatchWriter

Fixed-width text file writer with batch streaming capabilities.

Field widths, alignment, and padding are driven by a List[FixedColumn] schema — the same schema used by FixedReader. When input records are already FixedWidthRecord instances they are written directly via to_line(). Any other record type (dict, list, namedtuple, generic Record) is cast into the appropriate FixedWidthRecord subclass first.

Parameters:
  • data (Iterable[RecordLike], optional) – Initial data to write. None for streaming mode.

  • file (str, Path, TextIO, or BinaryIO, optional) – Output file or handle. None writes to stdout.

  • columns (List[FixedColumn]) – Column definitions — width, alignment, padding, and type per field.

  • encoding (str, default 'utf-8') – File encoding.

  • truncate_overflow (bool, default True) – Truncate values that exceed their column width. False raises ValueError instead.

Examples

Round-trip from FixedReader:

with open('input.txt') as fp:
    records = list(FixedReader(fp, MY_COLS))
to_fixed_width(records, MY_COLS, 'output.txt')

Write from dicts:

rows = [{'code': 'A', 'amount': 42}, ...]
FixedWidthWriter(rows, 'output.txt', MY_COLS).write()

Streaming / batch mode:

with FixedWidthWriter(file='output.txt', columns=MY_COLS) as w:
    for batch in source:
        w.write_batch(batch)
__init__(data=None, file=None, columns=None, encoding='utf-8', truncate_overflow=True)[source]
accepts_file_handle = True
preserve_types = True
dbtk.writers.fixed_width.to_edi(data, columns, file=None, encoding='utf-8', truncate_overflow=False)[source]

Export EDI records to a fixed-width text file.

Parameters:
  • data – Iterable of FixedWidthRecord (or compatible) instances.

  • columns (Dict[str, List[FixedColumn]]) – Dict mapping type codes to FixedColumn definitions.

  • file (str | Path | None) – Output file path. If None, writes to stdout.

  • encoding (str) – File encoding.

  • truncate_overflow (bool) – Truncate values that exceed column width.

dbtk.writers.fixed_width.to_fixed_width(data, columns, file=None, encoding='utf-8', truncate_overflow=True)[source]

Export records to a fixed-width text file.

Parameters:
  • data – Iterable of records (FixedWidthRecord, dict, list, etc.)

  • columns (List[FixedColumn]) – FixedColumn definitions for width, alignment, and padding.

  • file (str | Path | None) – Output file path. If None, writes to stdout.

  • encoding (str) – File encoding.

  • truncate_overflow (bool) – Truncate values that exceed column width.

JSON writer for database results.

class dbtk.writers.json.JSONWriter(data=None, file=None, columns=None, encoding='utf-8', indent=2, compression='infer', **json_kwargs)[source]

Bases: BaseWriter

JSON writer class that extends BaseWriter.

__init__(data=None, file=None, columns=None, encoding='utf-8', indent=2, compression='infer', **json_kwargs)[source]

Initialize JSON writer.

Parameters:
  • data – Cursor object or list of records

  • file (str | Path | None) – Output file. If None, writes to stdout

  • columns (List[str] | None) – Column names for list-of-lists data (optional for other types)

  • encoding (str) – File encoding

  • indent (int | None) – JSON indentation - defaults to 2 (pretty-print), 0 or None for compact

  • compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.

  • **json_kwargs – Additional arguments passed to json.dump

to_string(obj)[source]

Convert object to string. For JSON just convert dates and times.

class dbtk.writers.json.NDJSONWriter(data=None, file=None, columns=None, encoding='utf-8', compression='infer', **json_kwargs)[source]

Bases: BatchWriter

NDJSON (newline-delimited JSON) writer.

__init__(data=None, file=None, columns=None, encoding='utf-8', compression='infer', **json_kwargs)[source]

Initialize NDJSON writer.

Parameters:
  • data – Cursor object or list of records

  • file (str | Path | None) – Output file. If None, writes to stdout

  • columns (List[str] | None) – Column names for list-of-lists data (optional for other types)

  • encoding (str) – File encoding

  • compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.

  • **json_kwargs – Additional arguments passed to json.dumps

to_string(obj)[source]

Convert object to string. For JSON just convert dates and times.

dbtk.writers.json.to_json(data, file=None, encoding='utf-8', indent=2, compression='infer', **json_kwargs)[source]

Export cursor or result set to JSON file as an array of dictionaries.

Parameters:
  • data – Cursor object or list of records

  • file (str | Path | None) – Output file. If None, writes to stdout

  • encoding (str) – File encoding

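  • indent (int | None) – JSON indentation - defaults to 2 (pretty-print), 0 or None for compact

  • compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.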

  • **json_kwargs – Additional arguments passed to json.dump

Example

# Write to file as JSON array
to_json(cursor, 'users.json')

# Write to stdout
to_json(cursor)

# Compact format
to_json(cursor, 'data.json', indent=None)
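The writer's to_string hook converts dates and times before serialization. The same idea can be sketched with the standard library alone. The helper name `json_default` is hypothetical, and ISO 8601 output is an assumption; the format dbtk actually emits is not stated here.

```python
import datetime
import json


def json_default(obj):
    """Fallback for values json cannot serialize natively (assumed ISO 8601)."""
    if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
        return obj.isoformat()
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


rows = [
    {"id": 1, "name": "Ada", "joined": datetime.date(2024, 1, 15)},
    {"id": 2, "name": "Grace", "joined": datetime.date(2024, 2, 1)},
]

# Array of objects, indented two spaces per level
pretty = json.dumps(rows, indent=2, default=json_default)

# Same data on a single line
compact = json.dumps(rows, indent=None, default=json_default)
```

Both strings decode to the same list; only the whitespace differs.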

dbtk.writers.json.to_ndjson(data, file=None, encoding='utf-8', compression='infer', **json_kwargs)[source]

Export cursor or result set to NDJSON (newline-delimited JSON) file.

Parameters:
  • data – Cursor object or list of records

  • file (str | Path | None) – Output file. If None, writes to stdout

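  • encoding (str) – File encoding

  • compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.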

  • **json_kwargs – Additional arguments passed to json.dumps

Example

# Write to file as NDJSON
to_ndjson(cursor, 'users.ndjson')

# Write to stdout
to_ndjson(cursor)
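NDJSON output combined with extension-based compression might look roughly like the sketch below, which uses only the standard library. `OPENERS`, `open_for_text`, and `write_ndjson` are hypothetical names, and the suffix-to-codec mapping is an assumption drawn from the extensions listed for compression='infer'.

```python
import bz2
import gzip
import json
import lzma
import tempfile
from pathlib import Path

# Assumed mapping from file suffix to opener; dbtk's internal table is not shown here.
OPENERS = {".gz": gzip.open, ".bz2": bz2.open, ".xz": lzma.open}


def open_for_text(path, encoding="utf-8"):
    """Open path for text writing, compressing when the suffix calls for it."""
    opener = OPENERS.get(Path(path).suffix.lower(), open)
    return opener(path, "wt", encoding=encoding)


def write_ndjson(rows, path):
    """Write one JSON object per line; return the number of rows written."""
    count = 0
    with open_for_text(path) as fp:
        for row in rows:
            fp.write(json.dumps(row) + "\n")
            count += 1
    return count


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "users.ndjson.gz"
    written = write_ndjson([{"id": 1}, {"id": 2}], target)
    # Read the file back through gzip to confirm it was compressed
    with gzip.open(target, "rt", encoding="utf-8") as fp:
        round_trip = [json.loads(line) for line in fp]
```

Because each line is a complete JSON document, a consumer can process the file one record at a time without loading the whole array.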

XML writer for database results using lxml.

class dbtk.writers.xml.XMLStreamer(data=None, file=None, columns=None, encoding='utf-8', root_element='data', record_element='record')[source]

Bases: BatchWriter

Streaming XML writer that writes records incrementally.

Memory-efficient for large datasets. Writes XML elements as they arrive without building the entire tree in memory.

Parameters:
  • data (Iterable[RecordLike], optional) – Initial data. For streaming mode, use data=None.

  • file (str, Path, or BinaryIO, optional) – Output file or binary file handle. Must be binary mode for streaming.

  • columns (List[str], optional) – Column names for list-of-lists data

  • encoding (str, default 'utf-8') – XML encoding declaration

  • root_element (str, default 'data') – Name of the root XML element

  • record_element (str, default 'record') – Name of each record element

Examples

Streaming mode:

with open('output.xml', 'wb') as f:
    with XMLStreamer(data=None, file=f, root_element='data') as writer:
        for batch in surge.batched(records):
            writer.write_batch(batch)

Single-shot mode:

XMLStreamer(data=records, file='output.xml').write()

Notes

  • Requires lxml library

  • File must be opened in binary mode (‘wb’) for streaming

  • No pretty-printing (streaming writes compact XML)

  • More memory-efficient than XMLWriter for large datasets
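Incremental XML writing can be illustrated with the standard library. XMLStreamer itself uses lxml; this sketch swaps in `xml.sax.saxutils.XMLGenerator` so that it runs without extra dependencies. The function name `stream_xml` is hypothetical, and each record is assumed to be a dict.

```python
import io
from xml.sax.saxutils import XMLGenerator


def stream_xml(batches, out, root_element="data", record_element="record"):
    """Write records to a binary stream as they arrive; return the record count."""
    gen = XMLGenerator(out, encoding="utf-8")
    gen.startDocument()
    gen.startElement(root_element, {})
    count = 0
    for batch in batches:
        for record in batch:
            gen.startElement(record_element, {})
            for name, value in record.items():
                gen.startElement(name, {})
                # characters() escapes &, < and > in the text
                gen.characters("" if value is None else str(value))
                gen.endElement(name)
            gen.endElement(record_element)
            count += 1
    gen.endElement(root_element)
    gen.endDocument()
    return count


buffer = io.BytesIO()  # stands in for a file opened with 'wb'
batches = [[{"id": 1, "name": "Ada"}], [{"id": 2, "name": "R&D"}]]
written = stream_xml(batches, buffer)
xml_bytes = buffer.getvalue()
```

Only the element currently being written is held in memory, which is the property that makes streaming suitable for large result sets. The output is compact, with no indentation.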

__init__(data=None, file=None, columns=None, encoding='utf-8', root_element='data', record_element='record')[source]

Initialize streaming XML writer.

class dbtk.writers.xml.XMLWriter(data=None, file=None, columns=None, encoding='utf-8', root_element='data', record_element='record', pretty=True)[source]

Bases: BaseWriter

XML writer that builds complete XML tree in memory.

Best for small to medium datasets. For large datasets that don’t fit in memory, use XMLStreamer instead.

Parameters:
  • data (Iterable[RecordLike]) – Data to write

  • file (str, Path, TextIO, or BinaryIO, optional) – Output file or file handle. If None, writes to stdout.

  • columns (List[str], optional) – Column names for list-of-lists data

  • encoding (str, default 'utf-8') – XML encoding declaration

  • root_element (str, default 'data') – Name of the root XML element

  • record_element (str, default 'record') – Name of each record element

  • pretty (bool, default True) – Whether to format with indentation

Examples

>>> to_xml(cursor, 'users.xml')
>>> to_xml(records, 'output.xml', root_element='users', record_element='user')
__init__(data=None, file=None, columns=None, encoding='utf-8', root_element='data', record_element='record', pretty=True)[source]

Initialize XML writer.

preserve_types = False
dbtk.writers.xml.check_dependencies()[source]

Check for optional dependencies and issue warnings if missing.

dbtk.writers.xml.to_xml(data, file=None, encoding='utf-8', root_element='data', record_element='record', stream=False, pretty=None)[source]

Export cursor or result set to XML file.

Parameters:
  • data (Iterable[RecordLike]) – Cursor object or list of records

  • file (str or Path, optional) – Output file. If None, writes to stdout (limited to 20 rows)

  • encoding (str, default 'utf-8') – XML encoding declaration

  • root_element (str, default 'data') – Name of the root XML element

  • record_element (str, default 'record') – Name of each record element

  • stream (bool, default False) – Whether to use streaming mode (reduces memory usage for large datasets)

  • pretty (bool, optional) – Whether to format with indentation. Defaults to True for tree mode, False for streaming mode.

Examples

Write to file:

to_xml(cursor, 'users.xml')

Write to stdout (limited to 20 rows):

to_xml(cursor)

Custom element names with streaming:

to_xml(cursor, 'active_users.xml',
       root_element='users',
       record_element='user',
       stream=True)

Notes

  • Tree mode (stream=False): Builds complete XML tree in memory, supports pretty printing

  • Streaming mode (stream=True): Memory-efficient, writes incrementally, no pretty printing

  • For large datasets (>100K rows), use stream=True

Utilities

Utility functions for dbtk.

class dbtk.utils.ErrorDetail(message, field=None, code=None)[source]

Bases: object

Structured error record for ETL and database operations.

Captures a single error with optional field attribution and driver-specific error code. Used by dbtk.etl.table.Table (last_error) and dbtk.etl.managers.IdentityManager (per-entity _errors list), and round-trips cleanly through JSON via save_state / load_state.

message

Human-readable description of the error.

Type:

str

field

Name of the source or target field the error is associated with. None when the error is not specific to a single field.

Type:

str, optional

code

Database- or application-level error code (e.g. pgcode from psycopg2, an ORA- number, or a custom application string). None when no structured code is available.

Type:

str, optional

__init__(message, field=None, code=None)[source]

Create an ErrorDetail.

Parameters:
  • message (str) – Human-readable description of the error.

  • field (str, optional) – Field the error relates to, or None.

  • code (str, optional) – Structured error code, or None.

code
field
message
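The JSON round trip mentioned above can be sketched with a plain dataclass. `ErrorRecord` is a hypothetical stand-in that mirrors ErrorDetail's three attributes; it is not the library class, and the sample values are made up for illustration.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ErrorRecord:
    """Hypothetical stand-in mirroring ErrorDetail's three attributes."""
    message: str
    field: Optional[str] = None
    code: Optional[str] = None


original = ErrorRecord("value too long for column", field="email", code="22001")

# Serialize to JSON text, then rebuild an equal object from it
payload = json.dumps(asdict(original))
restored = ErrorRecord(**json.loads(payload))
```

Keeping every attribute a string or None is what lets the record survive serialization unchanged.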
class dbtk.utils.FixedColumn(name, start_pos, end_pos=None, column_type='text', comment=None, align=None, pad_char=None, width=None)[source]

Bases: object

Column definition for fixed width files

__init__(name, start_pos, end_pos=None, column_type='text', comment=None, align=None, pad_char=None, width=None)[source]
Parameters:
  • name (str) – database column name

  • start_pos (int) – start position of field, first position is 1 not 0

  • end_pos (int) – end position of field (mutually exclusive with width)

  • column_type (str) – text, int, float, date

  • comment (str) – description for column usage/options

  • align (str) – override alignment (left, right, center)

  • pad_char (str) – override pad character

  • width (int) – field width in characters (mutually exclusive with end_pos)

FixedColumn('birthdate', 25, 35, 'date')
FixedColumn('birthdate', 25, width=11, column_type='date')

property width: int
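The relationship between start_pos, end_pos, and width can be worked through in a few lines. The helpers `resolve_span` and `slice_field` are hypothetical. The sketch assumes positions are 1-based and inclusive, as the parameter descriptions state, and additionally assumes that exactly one of end_pos or width must be supplied.

```python
def resolve_span(start_pos, end_pos=None, width=None):
    """Return (start_pos, end_pos, width) for a 1-based, inclusive column span."""
    # Assumption: exactly one of end_pos / width is required
    if (end_pos is None) == (width is None):
        raise ValueError("give exactly one of end_pos or width")
    if width is None:
        width = end_pos - start_pos + 1
    else:
        end_pos = start_pos + width - 1
    return start_pos, end_pos, width


def slice_field(line, start_pos, end_pos):
    """Extract a field using 1-based inclusive positions."""
    return line[start_pos - 1:end_pos]


by_end = resolve_span(25, end_pos=35)
by_width = resolve_span(25, width=11)

# 24 filler characters, then an 11-character field at positions 25-35
line = " " * 24 + "1990-04-17 " + "tail"
birthdate = slice_field(line, by_end[0], by_end[1])
```

Both calls describe the same 11-character span, which is why the two FixedColumn definitions above are equivalent.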
class dbtk.utils.ParamStyle[source]

Bases: object

SQL parameter placeholder styles for different database drivers.

Different database drivers use different parameter placeholder formats. This class provides constants and utilities for working with these formats:

  • QMARK: Question mark placeholders (?, ?) - SQLite, ODBC

  • NUMERIC: Numeric placeholders (:1, :2) - Oracle

  • NAMED: Named placeholders (:name, :email) - Oracle, psycopg2

  • FORMAT: Printf-style (%s, %s) - MySQL (MySQLdb)

  • PYFORMAT: Python format (%(name)s) - psycopg2, pymysql

values()[source]

Get all available parameter styles

positional_styles()[source]

Get styles that require tuples

named_styles()[source]

Get styles that require dicts

get_placeholder(paramstyle)[source]

Get placeholder string for a style

Example

>>> ParamStyle.get_placeholder('qmark')
'?'
>>> ParamStyle.get_placeholder('named')
':1'
DEFAULT = 'named'
FORMAT = 'format'
NAMED = 'named'
NUMERIC = 'numeric'
PYFORMAT = 'pyformat'
QMARK = 'qmark'
classmethod get_placeholder(paramstyle)[source]
classmethod get_positional_style(paramstyle)[source]

Return a positional paramstyle, mapping named style to the corresponding positional style if needed.

classmethod named_styles()[source]

Parameter styles where parameters must be in dict instead of tuple

classmethod positional_styles()[source]

Parameter styles where parameters must be in properly ordered tuple instead of dict

classmethod values()[source]
class dbtk.utils.QueryLogger(level=20)[source]

Bases: object

Simple query logger.

__init__(level=20)[source]

Initialize query logger.

Parameters:

level (int) – Logging level

dbtk.utils.batch_iterable(iterable, batch_size)[source]

Batch an iterable into chunks of specified size.

Parameters:
  • iterable (Iterable[Any]) – The iterable to batch

  • batch_size (int) – Size of each batch

Yields:

Lists of items up to batch_size length
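A generator with this behaviour can be written with itertools.islice. The sketch below is an illustrative equivalent under the hypothetical name `batched`, not the library source.

```python
from itertools import islice


def batched(iterable, batch_size):
    """Yield lists of up to batch_size items until the iterable is exhausted."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


batches = list(batched(range(7), 3))
```

The final batch holds the remainder, so seven items in batches of three yield lists of three, three, and one item.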

dbtk.utils.identifier_needs_quoting(identifier)[source]

Check if identifier needs quoting.

dbtk.utils.normalize_field_name(name)[source]

Normalize field name for attribute access.

Converts to lowercase, replaces non-alphanumeric characters with underscores, collapses consecutive underscores, and strips leading/trailing underscores. A single leading underscore is preserved only if the original name explicitly started with one — underscores synthesized from other leading characters (e.g. $, #) are stripped.

Parameters:

name (str) – Original field name

Returns:

Normalized field name suitable for Python attribute access

Return type:

str

Examples

>>> normalize_field_name('Start Year')
'start_year'
>>> normalize_field_name('Start Year!')
'start_year'
>>> normalize_field_name('!Status')
'status'
>>> normalize_field_name('_Secret_Code!')
'_secret_code'
>>> normalize_field_name('$Secret_Code!')
'secret_code'
>>> normalize_field_name('__id__')
'_id'
>>> normalize_field_name('_row_num')
'_row_num'
>>> normalize_field_name('#Term Code')
'term_code'
>>> normalize_field_name('2025 Sales')
'n2025_sales'
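The rules above can be expressed compactly with one regular expression. This is a sketch that reproduces the documented examples, not the library source. The function name `normalize` is hypothetical, and the "n" prefix for names that start with a digit is inferred from the last example.

```python
import re


def normalize(name):
    """Approximate the documented normalization rules."""
    original = str(name)
    # Lowercase, then turn each run of non-alphanumerics (underscores included)
    # into a single "_" and strip the ends
    result = re.sub(r"[^a-z0-9]+", "_", original.lower()).strip("_")
    if original.startswith("_"):
        # Restore one leading underscore only when the caller supplied one
        result = "_" + result
    elif result[:1].isdigit():
        # Inferred rule: identifiers cannot start with a digit
        result = "n" + result
    return result


examples = {
    "Start Year": normalize("Start Year"),
    "__id__": normalize("__id__"),
    "2025 Sales": normalize("2025 Sales"),
}
```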
dbtk.utils.process_sql_parameters(sql, paramstyle)[source]

Process SQL parameters according to the specified paramstyle.

Supports SQL input in either ‘named’ (:param) or ‘pyformat’ (%(param)s) format. Auto-detects the input format and converts to the target paramstyle.

Parameters:
  • sql (str) – SQL query with named (:name) or pyformat (%(name)s) parameters

  • paramstyle (str) – The desired parameter style for the resulting SQL string

Returns:

A tuple containing the processed SQL query string and a tuple of all named parameters extracted in the order in which they appear in the original query.

Raises:

ValueError – If the SQL contains mixed parameter formats or unsupported paramstyle.

Return type:

Tuple[str, Tuple[str, …]]

Examples

>>> # Named input, convert to pyformat
>>> process_sql_parameters("SELECT * FROM users WHERE id = :user_id", "pyformat")
("SELECT * FROM users WHERE id = %(user_id)s", ('user_id',))
>>> # Pyformat input, convert to qmark
>>> process_sql_parameters("SELECT * FROM users WHERE id = %(user_id)s", "qmark")
("SELECT * FROM users WHERE id = ?", ('user_id',))
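The conversion can be approximated with two regular expressions. The sketch below is deliberately simplified and is not the library implementation: `convert_params`, `NAMED`, `PYFORMAT`, and `RENDER` are hypothetical names, and the patterns do not skip string literals or SQL comments.

```python
import re

# Simplified patterns: they do not skip string literals or SQL comments.
# The lookbehind keeps PostgreSQL-style casts such as x::int from matching.
NAMED = re.compile(r"(?<![:\w]):([A-Za-z_]\w*)")
PYFORMAT = re.compile(r"%\(([A-Za-z_]\w*)\)s")

# One renderer per target style; pos is the 1-based position of the parameter
RENDER = {
    "qmark": lambda name, pos: "?",
    "format": lambda name, pos: "%s",
    "numeric": lambda name, pos: f":{pos}",
    "named": lambda name, pos: f":{name}",
    "pyformat": lambda name, pos: f"%({name})s",
}


def convert_params(sql, paramstyle):
    """Return (converted SQL, parameter names in order of appearance)."""
    if paramstyle not in RENDER:
        raise ValueError(f"unsupported paramstyle: {paramstyle!r}")
    has_named = NAMED.search(sql) is not None
    has_pyformat = PYFORMAT.search(sql) is not None
    if has_named and has_pyformat:
        raise ValueError("SQL mixes named and pyformat parameters")
    pattern = PYFORMAT if has_pyformat else NAMED
    names = []

    def substitute(match):
        names.append(match.group(1))
        return RENDER[paramstyle](match.group(1), len(names))

    return pattern.sub(substitute, sql), tuple(names)


to_qmark = convert_params("SELECT * FROM users WHERE id = :user_id", "qmark")
to_numeric = convert_params("UPDATE t SET a = :a WHERE b = :b", "numeric")
```

The returned name tuple is what lets a caller reorder a dict of bind values into the positional tuple that qmark, numeric, and format drivers expect.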
dbtk.utils.quote_identifier(identifier)[source]

Quote identifier, handling qualified names by splitting on dots.

dbtk.utils.reset_format_cache()[source]

Clear format cache to force rebuilding on next call.

dbtk.utils.sanitize_identifier(name, idx=0)[source]

Sanitize an identifier/column name.

dbtk.utils.to_string(obj)[source]

Convert a value to string representation.

Parameters:

obj (Any) – Value to convert

Returns:

String representation

Return type:

str

dbtk.utils.validate_identifier(identifier, max_length=64, allow_temp=False)[source]

Validate that an identifier is safe for use (even if it needs quoting). Returns the identifier if valid, raises ValueError if invalid.

Parameters:
  • identifier (str) – The identifier to validate

  • max_length (int) – Maximum length for identifier

  • allow_temp (bool) – If True, allow underscore or hash prefix for temp tables

dbtk.utils.wrap_at_comma(text)[source]

Wrap text at commas, avoiding breaks inside parentheses.

ETL

ETL (Extract, Transform, Load) operations and utilities.

This module provides tools for managing database tables, resolving identities, and performing bulk operations:

  • Table: Schema-aware table operations with automatic SQL generation, field mapping, transformations, and per-operation error tracking via last_error.

  • DataSurge: Row-oriented bulk INSERT, UPDATE, DELETE, and MERGE using executemany. Works with all drivers; supports db_expr columns.

  • BulkSurge: High-throughput bulk loading via native database mechanisms (PostgreSQL COPY, Oracle direct-path, SQL Server bcp, etc.). Memory-efficient streaming; does not support db_expr columns.

  • IdentityManager: Resumable source-to-target identity resolution with per-entity status, error, and message tracking. State can be saved/loaded as JSON.

  • ValidationCollector: Callable collector for fn-pipelines that enriches and accumulates unique codes from source data.

  • TableLookup: Cached SQL-backed lookup transform for use in fn-pipelines and as a resolver for IdentityManager.

  • column_defs_from_db(): Generate Table column definitions by introspecting a live database table.

Example

from dbtk.etl import Table, DataSurge, BulkSurge, IdentityManager

# Define table structure
table = Table('users', columns={
    'id': {'field': 'id', 'key': True},
    'name': {'field': 'full_name', 'nullable': False},
    'email': {'field': 'email', 'fn': 'email'}
}, cursor=cursor)

# Row-oriented operations (INSERT + UPDATE/MERGE in same run, or db_expr columns)
surge = DataSurge(table)
surge.upsert(new_records)      # INSERT new, UPDATE existing
surge.delete(stale_records)

# High-throughput load (no db_expr columns, INSERT only)
bulk = BulkSurge(table)
bulk.insert(records)           # Streams via COPY / direct-path / bcp

# Identity resolution
stmt = cursor.prepare_file('sql/resolve_user.sql')
im = IdentityManager('source_id', 'user_id', resolver=stmt)
for row in reader:
    entity = im.resolve(row)
class dbtk.etl.BulkSurge(table, batch_size=10000, pass_through=False)[source]

Bases: BaseSurge

Lightning-fast bulk loading using native database tools and streaming.

BulkSurge provides high-performance data loading by leveraging database-specific bulk loading mechanisms. It supports both direct streaming (zero temp files) and external tool-based loading (bcp, SQL*Loader) depending on the database and method chosen.

Supported Databases

  • PostgreSQL/Redshift: COPY FROM STDIN (streaming, no temp files)

  • Oracle: direct_path_load (streaming) or SQL*Loader (external tool)

  • MySQL/MariaDB: LOAD DATA LOCAL INFILE via temp file (external only)

  • SQL Server: bcp utility (external tool, requires named connection)

Loading Methods

  • direct (default): Uses native Python drivers for streaming bulk loads
    • Postgres: COPY protocol with background writer thread

    • Oracle: direct_path_load API (requires python-oracledb 3.4+)

  • external: Dumps CSV then loads via command-line tool or SQL command
    • Oracle: SQL*Loader (sqlldr) with auto-generated control file

    • MySQL/MariaDB: LOAD DATA LOCAL INFILE from temp CSV file

    • SQL Server: bcp utility (only external method available)

Performance Notes

  • BulkSurge is memory-efficient: uses batching and streaming to handle large datasets

  • Direct methods are typically faster and require no temp files

  • External tools require credentials from config file (see connection_name)

  • Tables with db_expr columns are incompatible (use DataSurge instead)

Parameters:
  • table (Table) – Table instance with column definitions and cursor

  • batch_size (int, optional) – Number of records per batch (default: 10,000)

  • pass_through (bool, optional) –

    Skip transformation and validation, using source data directly (default: False).

    When to use:

    • Database-to-database copies with identical schemas

    • Pre-transformed data from upstream pipelines (already validated)

    • Cursor-to-cursor streaming (maximum throughput)

    • Raw positional tuples pre-ordered for binding parameters

    What’s skipped: Field mapping, type coercion, default values, null value handling, required field validation, and Table.set_values() overhead.

    Warning: Do NOT use if records might have missing required fields, mismatched field names, need type transformations, or data quality is uncertain. All database constraints (primary keys, foreign keys) still apply.

total_read

Total rows read from source. 1-based (first row = 1). Includes both loaded and skipped rows.

Type:

int

total_loaded

Total rows successfully loaded.

Type:

int

skipped

Total rows skipped due to missing required fields.

Type:

int

skip_details

Skip tracking grouped by reason. Key is a frozenset of missing required field names. Value is a dict with:

  • count: total rows skipped for this reason

  • sample: list of up to 20 1-based row numbers (for debugging)

Example:

{frozenset({'primary_name'}): {'count': 5, 'sample': [937887, 957847, ...]}}
Type:

dict
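The shape of skip_details is easiest to see by building one. This sketch uses a hypothetical helper, `record_skip`, and a made-up required field list to show how counts and capped samples might accumulate per reason; it is not the library's own bookkeeping code.

```python
def record_skip(skip_details, row_number, missing_fields, sample_size=20):
    """Group a skipped row under the frozenset of its missing required fields."""
    reason = frozenset(missing_fields)
    entry = skip_details.setdefault(reason, {"count": 0, "sample": []})
    entry["count"] += 1
    # Keep only the first few row numbers so memory stays bounded
    if len(entry["sample"]) < sample_size:
        entry["sample"].append(row_number)


REQUIRED = ("primary_name",)  # illustrative required-field list
rows = [
    {"primary_name": "Ada"},
    {"primary_name": None},
    {},
    {"primary_name": "Grace"},
]

skip_details = {}
for row_number, row in enumerate(rows, start=1):  # 1-based, like total_read
    missing = [name for name in REQUIRED if row.get(name) in (None, "")]
    if missing:
        record_skip(skip_details, row_number, missing)
```

Using a frozenset as the key means rows missing the same combination of fields share one entry regardless of field order.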

dump_path

Path of the last file written by dump(). Set after each dump() call.

Type:

Path or None

Examples

PostgreSQL streaming (zero temp files):

with BulkSurge(table) as surge:
    surge.load(reader)  # Uses COPY FROM STDIN

Oracle with SQL*Loader:

surge = BulkSurge(table)
surge.load(reader, method='external')  # Uses sqlldr

MySQL with custom dump location:

surge = BulkSurge(table)
surge.load(reader, method='external', dump_path='/data/staging')

SQL Server with bcp (requires config):

db = dbtk.connect('prod_mssql')  # Named connection required
surge = BulkSurge(table)
surge.load(reader)  # Uses bcp

Fast database-to-database copy (matching schemas):

# Source and destination schemas match exactly
surge = BulkSurge(dest_table, pass_through=True)
surge.load(source_cursor.fetchall())

Pre-transformed data (already validated):

# Data already transformed and validated by upstream process
surge = BulkSurge(table, pass_through=True)
surge.load(validated_records)

See also

DataSurge

Standard bulk operations using executemany

Table

Table definition and schema management

__init__(table, batch_size=10000, pass_through=False)[source]
dump(records, filename=None, write_headers=True, delimiter=',', encoding='utf-8', **csv_args)[source]

Export records to a delimited file.

Resolves the output path, writes all records via CSVWriter, and sets self.dump_path to the resolved path for callers that need it. For Oracle connections, automatically generates a SQL*Loader control file alongside the data file.

Parameters:
  • records (Iterable[Record]) – Records to export

  • filename (Union[str, Path], optional) – Target file path (directory or full path). See _resolve_file_path for resolution priority.

  • write_headers (bool, optional) – Include column headers (default: True)

  • delimiter (str, optional) – Field delimiter character (default: ‘,’). Extension is inferred: ‘\t’ → .tsv, anything else → .csv

  • encoding (str, optional) – File encoding (default: ‘utf-8’)

  • **csv_args (optional) – Additional keyword arguments passed to csv.writer (e.g. quoting, quotechar, escapechar)

Returns:

Number of records written. As a side effect, sets self.dump_path to the resolved output Path.

Return type:

int

Notes

Oracle Auto-generation: When connected to Oracle, automatically generates a SQL*Loader control file (.ctl) alongside the data file and logs the sqlldr command to run.

Examples

Export with auto-generated Oracle control file:

db = dbtk.connect('oracle_prod')
surge = BulkSurge(table)
surge.dump(reader, '/staging/export.csv')
# Creates: /staging/export.csv + /staging/export_a1b2c3d4.ctl
# Logs sqlldr command, e.g.:
#   sqlldr userid=USER/PASS@DB control=... data=...

Custom delimiter with no quoting (e.g. for bcp):

surge.dump(reader, '/staging/data.csv', delimiter='\x1f',
           quoting=csv.QUOTE_NONE, escapechar=None)
load(records, method='direct', dump_path=None)[source]

Bulk load records using database-specific mechanisms.

Automatically selects the appropriate loading strategy based on database type and method parameter. Direct methods use streaming with zero temp files when possible. External methods use command-line tools and require a named connection.

Parameters:
  • records (Iterable[Record]) – Iterator of Record objects to load

  • method (str, optional) – Loading method to use (default: ‘direct’)
    • ‘direct’: Stream data using native drivers (Postgres COPY, Oracle direct_path_load)

    • ‘external’: Dump CSV then load (Oracle sqlldr, SQL Server bcp, MySQL LOAD DATA LOCAL INFILE)

  • dump_path (str or Path, optional) – Path for temp CSV files (only used by external methods)
    • If file path: use exactly as specified

    • If directory: generate timestamped file in that directory

    • If None: use settings['data_dump_dir'] or temp directory

Returns:

Number of records successfully loaded

Return type:

int

Raises:
  • RuntimeError – If external method requires credentials but connection_name is not set

  • NotImplementedError – If database type is not supported or required driver features unavailable
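The dump_path rules for external loads can be sketched as a small resolver. Everything here is illustrative: `resolve_dump_path` is a hypothetical name, the timestamp format is an assumption, the settings lookup is omitted so None falls straight through to the system temp directory, and the extension rule is borrowed from the dump() delimiter description.

```python
import tempfile
from datetime import datetime
from pathlib import Path


def resolve_dump_path(dump_path=None, table_name="data", delimiter=","):
    """Resolve a file path, a directory, or None to a concrete output file."""
    extension = ".tsv" if delimiter == "\t" else ".csv"
    if dump_path is None:
        # The settings['data_dump_dir'] lookup is left out of this sketch
        directory = Path(tempfile.gettempdir())
    else:
        candidate = Path(dump_path)
        if not candidate.is_dir():
            return candidate  # treated as a full file path, used as given
        directory = candidate
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")  # assumed naming scheme
    return directory / f"{table_name}_{stamp}{extension}"


explicit = resolve_dump_path("/staging/export.csv")
fallback = resolve_dump_path(None, table_name="orders")
```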

Notes

PostgreSQL/Redshift:

  • Only uses direct method (COPY FROM STDIN)

  • Streaming with background writer thread, no temp files

  • If you need to use psql copy, use BulkSurge.dump() to generate a transformed CSV file

Oracle:

  • Direct: Uses direct_path_load (requires python-oracledb 3.4+)

  • External: Uses SQL*Loader (sqlldr) with auto-generated control file

  • External method requires named connection from config

MySQL/MariaDB:

  • Only external method supported (direct raises NotImplementedError)

  • Dumps CSV then executes LOAD DATA LOCAL INFILE

  • Requires local_infile=1 on server

SQL Server:

  • Only external method (uses bcp utility)

  • Requires named connection from config for credentials

  • Supports integrated auth (Windows) if no user/password in config

Examples

Direct streaming (default):

surge = BulkSurge(table)
surge.load(reader)  # Streams data, zero temp files

Oracle SQL*Loader:

surge = BulkSurge(table)
surge.load(reader, method='external', dump_path='/staging')

SQL Server with bcp (requires config connection):

db = dbtk.connect('prod_mssql')  # Must use named connection
table = Table('dbo.orders', columns=..., cursor=db.cursor())
surge = BulkSurge(table)
surge.load(reader)  # Uses bcp with credentials from config

See also

dump

Export records to CSV file

class dbtk.etl.DataSurge(table, batch_size=None, use_transaction=False, pass_through=False)[source]

Bases: BaseSurge

Handles bulk ETL operations by delegating to a stateful Table instance.

Note: The Table instance’s state (self.values) is modified during processing. Ensure the Table is not used concurrently by other operations or threads.

Parameters:
  • table (Table) – Table instance with column definitions and cursor

  • batch_size (int, optional) – Number of records per batch (default: cursor.batch_size or 1000)

  • use_transaction (bool, optional) – Wrap all operations in a transaction (default: False)

  • pass_through (bool, optional) –

    Skip transformation and validation, using source data directly (default: False). Only compatible with inserts. Not compatible with columns that use database expressions (db_expr).

    When to use:

    • Database-to-database copies with identical schemas

    • Pre-transformed data from upstream pipelines (already validated)

    • Raw positional tuples pre-ordered for binding parameters

    What’s skipped: Field mapping, type coercion, default values, null value handling, required field validation, and Table.set_values() overhead.

    Warning: Do NOT use if records might have missing required fields, mismatched field names, need type transformations, or data quality is uncertain. All database constraints (primary keys, foreign keys) still apply.

total_read

Total rows read from source. 1-based (first row = 1). Includes both loaded and skipped rows.

Type:

int

total_loaded

Total rows successfully loaded.

Type:

int

skipped

Total rows skipped due to missing required fields.

Type:

int

skip_details

Skip tracking grouped by reason. Key is a frozenset of missing required field names. Value is a dict with:

  • count: total rows skipped for this reason

  • sample: list of up to 20 1-based row numbers (for debugging)

Example:

{frozenset({'primary_name'}): {'count': 5, 'sample': [937887, 957847, ...]}}
Type:

dict

Examples

Standard ETL with transformation:

table = Table(..., cursor=cursor)
surge = DataSurge(table, batch_size=1000, use_transaction=True)
errors = surge.insert(records, raise_error=False)

Fast database-to-database copy (matching schemas):

# Source and destination schemas match exactly
surge = DataSurge(dest_table, batch_size=5000, pass_through=True)
surge.insert(source_cursor)

Pre-transformed data (already validated):

# Data already transformed and validated by upstream process
surge = DataSurge(table, pass_through=True)
surge.insert(validated_records)
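The batching that underlies executemany-based loading can be demonstrated with sqlite3 from the standard library. This is a generic sketch of the technique, not DataSurge's code: `insert_in_batches` is a hypothetical helper, and the single commit at the end stands in for use_transaction=True.

```python
import sqlite3
from itertools import islice


def insert_in_batches(conn, sql, rows, batch_size=1000):
    """Send rows to executemany in fixed-size batches; return the row count."""
    cursor = conn.cursor()
    iterator = iter(rows)
    total = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        cursor.executemany(sql, batch)
        total += len(batch)
    conn.commit()  # one commit for the whole load
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

# A generator keeps only one batch of rows in memory at a time
rows = ({"id": i, "name": f"user{i}"} for i in range(1, 2501))
loaded = insert_in_batches(
    conn, "INSERT INTO users (id, name) VALUES (:id, :name)", rows
)
row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

Larger batches reduce round trips at the cost of memory per batch, which is the trade-off batch_size controls.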
__init__(table, batch_size=None, use_transaction=False, pass_through=False)[source]

Initialize DataSurge for bulk operations.

Parameters:
  • table – Table instance with schema metadata

  • batch_size (int | None) – Number of records per batch

  • use_transaction (bool) – Use transaction for all operations (default: False)

  • pass_through (bool) – Skip transformation/validation for trusted data (default: False)

delete(records, raise_error=True)[source]

Perform bulk DELETE on records.

get_sql(operation)[source]

Get SQL for operation, checking local modifications first.

insert(records, raise_error=True)[source]

Perform bulk INSERT on records.

load(records, operation=None, raise_error=True)[source]

Core bulk execution using executemany() — shared path for insert/update/delete/merge.

merge(records, raise_error=True)[source]

Perform bulk MERGE using either direct upsert or temporary table strategy.

update(records, raise_error=True)[source]

Perform bulk UPDATE on records.

dbtk.etl.Lookup(table, key_cols, return_cols, *, cache=1, missing=None)[source]

One-liner database lookup for Table column configs.

class dbtk.etl.QueryLookup(query=None, filename=None, return_col=None, missing=None)[source]

Bases: object

Deferred PreparedStatement lookup for use as a Table column transform.

Accepts a SQL query string or file path. Cursor binding is deferred until the Table is initialized with a cursor. Use with field='*' to pass the full source row as bind variables — PreparedStatement ignores extra keys.

Parameters:
  • query (str, optional) – Inline SQL string.

  • filename (str or Path, optional) – Path to a SQL file.

  • return_col (str, optional) – Column name to extract from the result row. By default, the first column will be returned. Set to ‘*’ if you want to return multiple values to the next stage of the pipeline.

  • missing (any, optional) – Value to return when the query returns no rows. Default None.

Example

id_lookup_query = '''
    SELECT p.id, p.last_name, p.first_name
    FROM people p
    LEFT JOIN employees e ON e.id = p.id
    WHERE (p.email = :email
      OR e.tax_id = :tax_id)
'''

emp_cols = {
  'id': {'field': '*', 'primary_key': True, 'fn': dbtk.etl.QueryLookup(query=id_lookup_query)},
  'first_name': {'field': 'first_name', 'required': True},
  ...
}
__init__(query=None, filename=None, return_col=None, missing=None)[source]
bind(cursor)[source]
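
The deferred-binding pattern described above can be sketched with sqlite3. DeferredLookup is a hypothetical stand-in, not the QueryLookup class itself; it only shows a query being stored first, bound to a cursor later, and called with a full row whose extra keys are ignored.

```python
import sqlite3


class DeferredLookup:
    """Hypothetical stand-in: holds a query until a cursor is bound."""

    def __init__(self, query, missing=None):
        self.query = query
        self.missing = missing
        self._cursor = None

    def bind(self, cursor):
        self._cursor = cursor

    def __call__(self, row):
        if self._cursor is None:
            raise RuntimeError("bind(cursor) must be called before use")
        # sqlite3 only looks up the names used in the query,
        # so extra keys in the row are harmless.
        found = self._cursor.execute(self.query, row).fetchone()
        return found[0] if found else self.missing


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE people (id INTEGER, email TEXT)")
cur.execute("INSERT INTO people VALUES (1, 'zuko@example.com')")

lookup = DeferredLookup("SELECT id FROM people WHERE email = :email", missing=-1)
lookup.bind(cur)  # assumption: dbtk performs this step when the Table gets its cursor
row = {"email": "zuko@example.com", "first_name": "Zuko"}  # first_name is unused
person_id = lookup(row)
```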
class dbtk.etl.Table(name, columns, cursor, null_values=('', 'NULL', '<null>', '\\N'), is_temp=False)[source]

Bases: object

Stateful table class for ETL operations with schema-aware SQL generation.

The Table class provides a high-level interface for database operations by maintaining table metadata and current record state. It automatically generates parameterized SQL statements and handles field mapping, data transformations, and requirement validation.

Key features:
  • Field mapping: Map source record fields to database columns

  • Transformations: Apply functions to clean/transform data before database operations

  • Database functions: Use database-side functions (e.g., CURRENT_TIMESTAMP)

  • Default values: Provide constant values for columns

  • Requirement validation: Track required/nullable columns and validate before operations

  • Automatic SQL generation: Generate INSERT, UPDATE, SELECT, DELETE, MERGE statements

  • Operation tracking: Count successful operations and incomplete records via self.counts

Column Configuration

Each column in the columns dict is configured with a dict containing the options listed below. Shorthand: an empty dict {} defaults the field name to the column name. For example, 'email': {} is equivalent to 'email': {'field': 'email'}.

  • field (str or list of str or ‘*’): Source field name(s) from input records. If list, extracts multiple fields as a list value. If ‘*’, passes the entire record to the transformation function instead of a single field value. If omitted, column is populated via ‘default’ or ‘db_expr’.

  • default (any or callable, optional): Default value for this column. Applied when the source field is missing, empty, or None. If callable (e.g. a zero-argument lambda), it is called at set_values() time so the value is resolved on each record rather than at column-definition time. Useful for values that come from runtime context (CLI args, job IDs, etc.) that aren’t available when columns are defined:

    conf_vars = {}  # populated later after table is initialized
    columns = {
        'user_id': {'default': lambda: conf_vars['user_id']},
    }
    
  • fn (Union[callable, List[callable], str], optional): Transformation function(s) to apply to the source value. callable: applied directly; list: functions applied in order (pipeline); str: magic shorthand (e.g. ‘int’, ‘maxlen:255’, ‘lookup:table:col:val’). See dbtk.etl.transforms.core.fn_resolver() for full shorthand reference.

  • db_expr (str, optional): Database-side function call (e.g., ‘CURRENT_TIMESTAMP’, ‘UPPER(#)’). Use ‘#’ as placeholder for the bind parameter. If specified without ‘#’, no bind parameter is created (useful for CURRENT_TIMESTAMP, etc.).

  • primary_key (bool, optional, default False): Marks column as primary key. Automatically implies required=True. Alias: key.

  • key (bool, optional, default False): Alias for primary_key. Either may be used interchangeably.

  • auto_key (bool, optional, default False): Convenience flag: sets both primary_key=True and auto_gen=True. Ideal for typical auto-increment primary keys.

  • nullable (bool, optional, default True): Controls whether the column must have a value for INSERT/UPDATE/MERGE. nullable=False is the anti-alias of required=True — both mark the column as required.

  • required (bool, optional, default False): Explicitly marks column as required. Anti-alias of nullable=False.

  • auto_gen (bool, optional, default False): If True, the column is omitted from INSERT statements. The database is expected to provide the value (e.g. AUTO_INCREMENT, DEFAULT CURRENT_TIMESTAMP, GENERATED ALWAYS, etc.). The column remains fully included in all other operations.

  • no_update (bool, optional, default False): If True, excludes column from UPDATE and MERGE operations.

  • bind_name (str, auto-generated): Sanitized parameter name for SQL bind variables. Automatically created from column name. Cannot be specified in the column definition.
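
To make the db_expr and auto_gen rules concrete, here is a minimal sketch of how an INSERT statement could be assembled from such a column dict. The function build_insert is hypothetical and is not how Table generates SQL; it assumes the named paramstyle and uses the column name directly as the bind name rather than a sanitized one.

```python
def build_insert(table, columns):
    """Build a named-paramstyle INSERT from a simplified column config."""
    names, values = [], []
    for col, cfg in columns.items():
        if cfg.get("auto_gen"):
            continue  # the database supplies this value, so omit the column
        names.append(col)
        db_expr = cfg.get("db_expr")
        if db_expr is None:
            values.append(f":{col}")
        else:
            # '#' marks where the bind parameter goes; without '#',
            # the expression is used as-is and no bind parameter is created.
            values.append(db_expr.replace("#", f":{col}"))
    return f"INSERT INTO {table} ({', '.join(names)}) VALUES ({', '.join(values)})"


sql = build_insert("fire_nation_army", {
    "soldier_id": {"field": "recruit_id", "auto_gen": True},
    "combat_name": {"field": "full_name", "db_expr": "generate_callsign(#)"},
    "created_at": {"db_expr": "CURRENT_TIMESTAMP"},
    "rank": {},
})
print(sql)
```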

Example

import dbtk
from dbtk.etl import Table

with dbtk.connect('fire_nation_db') as db:
    cursor = db.cursor()

    soldiers = Table('fire_nation_army', {
        # Primary key from source 'recruit_id' field
        'soldier_id': {
            'field': 'recruit_id',
            'primary_key': True
        },

        # Required field with transformation
        'enlistment_date': {
            'field': 'join_date',
            'fn': 'date',
            'nullable': False
        },

        # Optional field with chained transformations
        'firebending_level': {
            'field': 'flame_skill',
            'fn': [str.strip, 'int']  # Clean then convert
        },

        # Constant value for all records
        'status': {
            'default': 'active'
        },

        # Database-side function with parameter
        'combat_name': {
            'field': 'full_name',
            'db_expr': 'generate_callsign(#)'
        },

        # Database-side function, no parameter
        'created_at': {
            'db_expr': 'CURRENT_TIMESTAMP'
        },

        # Multiple source fields as list
        'contact_methods': {
            'field': ['email', 'phone', 'pigeon']
        },

        # Empty dict shorthand - field name matches column name
        'rank': {},  # Equivalent to {'field': 'rank'}
        'division': {},

        # Whole record access for multi-field decisions
        'vip_status': {
            'field': '*',
            'fn': lambda record: 'VIP' if record.get('years_service', 0) > 10 else 'Regular'
        }
    }, cursor=cursor)

    # Set values from source record
    soldiers.set_values({
        'recruit_id': 'FN001',
        'join_date': '2024-03-15',
        'flame_skill': '  7  ',
        'full_name': 'Zuko'
    })

    # Execute operations
    soldiers.execute('insert')  # Automatically validates requirements
    print(soldiers.counts)  # {'insert': 1, 'update': 0, ...}
values

Current record values keyed by bind name (populated by set_values()).

Type:

dict

counts

Operation counters with keys: insert, update, delete, select, merge, records, incomplete.

Type:

dict

last_error

The error detail from the most recent execute() call. Set to None on success, or a dbtk.utils.ErrorDetail on DatabaseError (when raise_error=False). Cleared on every successful execution and on cursor() reassignment.

Type:

ErrorDetail or None

OPERATIONS = ('insert', 'select', 'update', 'delete', 'merge')
__init__(name, columns, cursor, null_values=('', 'NULL', '<null>', '\\N'), is_temp=False)[source]

Initialize Table with schema configuration and database cursor.

Creates a Table instance that manages the mapping between source data fields and database columns, along with all metadata needed for SQL generation and data validation.

Parameters:
  • name (str) – Database table name. Must be a valid SQL identifier.

  • columns (Dict[str, Dict[str, Any]]) – Dictionary mapping database column names to their configuration. Each column is configured with a dict containing options like ‘field’, ‘fn’, ‘default’, ‘db_expr’, ‘primary_key’, ‘nullable’, etc. See class docstring for complete column configuration options.

  • cursor (Cursor) – Database cursor instance. Provides connection to database and determines SQL parameter style (qmark, named, format, pyformat, numeric).

  • null_values (Tuple[str, ...]) – Tuple of string values that should be treated as NULL. When set_values() encounters these strings, they are converted to None. Default: (‘’, ‘NULL’, ‘<null>’, ‘\N’)

  • is_temp (bool) – If True, allows underscore or hash prefix for temporary table names. Default: False

Raises:

ValueError – If table name or column names are invalid SQL identifiers.

Example

cursor = db.cursor()

table = Table('users', {
    'user_id': {'field': 'id', 'primary_key': True},
    'email': {'field': 'email_address', 'nullable': False},
    'created': {'db_expr': 'CURRENT_TIMESTAMP'}
}, cursor=cursor)
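
The null_values handling can be pictured with a small stand-alone sketch. normalize_nulls is a hypothetical helper, not part of dbtk, and it assumes an exact string comparison; whether set_values() trims whitespace before comparing is not stated here.

```python
# Mirrors the documented default; "\\N" is a backslash followed by N.
NULL_VALUES = ("", "NULL", "<null>", "\\N")


def normalize_nulls(record, null_values=NULL_VALUES):
    """Return a copy of record with null-marker strings replaced by None."""
    return {
        key: (None if isinstance(value, str) and value in null_values else value)
        for key, value in record.items()
    }


cleaned = normalize_nulls({"email": "NULL", "phone": "\\N", "age": 0, "name": "Zuko"})
```

Only strings are compared, so falsy non-string values such as 0 pass through untouched.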
bind_name_column(bind_name)[source]
calc_update_excludes(record_fields=None)[source]
property columns: dict
property cursor: Cursor
db_expr_cols()[source]

Return list of all columns that use database expressions. Having db_expr on columns will make the table incompatible with some features such as all BulkSurge operations and DataSurge operations in pass_through mode.

execute(operation, raise_error=False)[source]

Execute the specified database operation using current record values.

Parameters:
  • operation (str) – One of 'insert', 'select', 'update', 'delete', 'merge'.

  • raise_error (bool, default False) – If True, re-raise DatabaseError instead of swallowing it. If False, the error is captured in last_error and 1 is returned.

Returns:

0 on success; 1 when requirements are unmet (incomplete record) or when a DatabaseError occurs and raise_error=False.

Side Effects:
  • counts[operation] is incremented on success.

  • counts['incomplete'] is incremented when requirements are unmet.

  • last_error is set to None on success, or to a dbtk.utils.ErrorDetail on database error.

Raises:
  • ValueError – If operation is not valid, or required key columns are missing.

  • DatabaseError – If the underlying execute fails and raise_error=True.

Return type:

int

fetch()[source]
force_positional()[source]

If the cursor's paramstyle is a named style, switch to the corresponding positional style and then rebuild all SQL.
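
What a named-to-positional switch involves can be sketched for the named → qmark case. The helper named_to_qmark is hypothetical and is not dbtk's implementation; it ignores string literals and comments, so treat it as an illustration only.

```python
import re

# Matches :name but not the second colon of a '::' cast.
_NAMED = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def named_to_qmark(sql):
    """Return (sql_with_qmarks, parameter_names_in_order)."""
    names = []

    def repl(match):
        names.append(match.group(1))
        return "?"

    return _NAMED.sub(repl, sql), names


sql, order = named_to_qmark(
    "UPDATE users SET email = :email WHERE user_id = :user_id"
)
params = {"user_id": 7, "email": "a@example.com"}
positional = tuple(params[name] for name in order)  # values in placeholder order
```

The ordered name list is what lets a dict of values be turned into the tuple a positional paramstyle expects.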

get_bind_params(operation, mode=None)[source]
get_column_definitions(all_cols=False)[source]

Introspect database table columns to get type information.

Executes a SELECT * query against the database table and returns type information for columns defined in this Table object. Validates that all Table columns exist in the database.

Parameters:

all_cols (bool) – If True, return all columns from the database table. If False, return only columns on the Table object.

Returns:

(column_name, type_obj, internal_size, precision, scale, sql_type_def) where sql_type_def is the SQL type string like ‘VARCHAR(100)’ or ‘NUMBER(10,2)’

Return type:

List of tuples

Raises:

ValueError – If a column defined in this Table doesn’t exist in the database

Example

>>> table = Table('users', {'id': {}, 'email': {}}, cursor=cursor)
>>> col_defs = table.get_column_definitions()
>>> for name, type_obj, size, prec, scale, sql_type in col_defs:
...     print(f"{name}: {sql_type}")
get_sql(operation)[source]

Returns SQL for an operation. Automatically generates SQL if it hasn’t been created yet (lazy).

is_ready(operation)[source]

Fast O(1) check if the current record is ready for the given operation.

property key_cols: Tuple[str]
property name: str
property param_config: Dict[str, Tuple[str, ...]]
property paramstyle: str
refresh_readiness()[source]

Re-evaluate which operations can be executed based on current values.

property req_cols: Tuple[str]
reqs_met(operation)[source]
reqs_missing(operation)[source]
property row_count: int
set_values(record)[source]
class dbtk.etl.TableLookup(cursor, table, key_cols, return_cols=None, cache=1)[source]

Bases: object

Database table lookup with configurable caching for ETL transformations.

Performs lookups against database tables or views using PreparedStatement for efficient repeated queries. Supports three caching strategies:

  • CACHE_NONE (0): No caching, always query database

  • CACHE_LAZY (1): Cache results as encountered (default)

  • CACHE_PRELOAD (2): Preload entire table into memory upfront

Can operate in three modes:
  • Validator: No return_cols specified, returns bool indicating existence

  • Single lookup: One return column, returns scalar value

  • Multi lookup: Multiple return columns, returns row object (type depends on cursor)

Examples

Validator mode:

temple_validator = TableLookup(cursor, 'temples', key_cols='temple_id')
is_valid = temple_validator({'temple_id': 'eastern_air_temple'})  # True/False

Single value lookup:

state_lookup = TableLookup(cursor, 'states',
                           key_cols='name',
                           return_cols='abbreviation',
                           cache=TableLookup.CACHE_PRELOAD)  # Small table, preload it
abbrev = state_lookup({'name': 'California'})  # 'CA'

Multi-value lookup:

address_lookup = TableLookup(cursor, 'addresses',
                             key_cols='address_id',
                             return_cols=['street', 'city', 'state', 'zip'])
address = address_lookup({'address_id': 123})  # Record/dict/namedtuple

Multi-key lookup:

person_lookup = TableLookup(cursor, 'people',
                            key_cols=['first_name', 'last_name'],
                            return_cols='person_id',
                            cache=TableLookup.CACHE_NONE)  # Large table, don't cache
person_id = person_lookup({'first_name': 'Aang', 'last_name': 'Avatar'})

Use in Table config:

table = Table('citizens', {
    'state_abbrev': {'field': 'state_name', 'fn': state_lookup}
}, cursor=cursor)
CACHE_LAZY = 1
CACHE_NONE = 0
CACHE_PRELOAD = 2
__init__(cursor, table, key_cols, return_cols=None, cache=1)[source]

Initialize TableLookup with table schema and caching strategy.

Parameters:
  • cursor (Cursor) – Database cursor for executing queries

  • table (str) – Table or view name to query

  • key_cols (str or list of str) – Column name(s) to use in WHERE clause. Cannot be empty.

  • return_cols (str, list of str, or None) – Column(s) to return. If None, operates as validator (returns bool)

  • cache (int, default 1 (CACHE_LAZY)) – Caching strategy: 0 (CACHE_NONE) never caches and always queries the database; 1 (CACHE_LAZY) caches results as encountered; 2 (CACHE_PRELOAD) preloads the entire table into memory.

Raises:

ValueError – If key_cols is empty or cache value is invalid
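
The difference between the three caching strategies can be seen in a reduced sketch built on sqlite3. MiniLookup is hypothetical: it handles a single key column and a single return column, counts queries for demonstration, and is not the TableLookup implementation.

```python
import sqlite3

CACHE_NONE, CACHE_LAZY, CACHE_PRELOAD = 0, 1, 2


class MiniLookup:
    """Hypothetical single-key lookup illustrating the three cache modes."""

    def __init__(self, cursor, table, key_col, return_col, cache=CACHE_LAZY):
        self.cursor = cursor
        self.cache_mode = cache
        self.queries = 0
        self._cache = {}
        # Identifiers are interpolated directly; only pass trusted names.
        self._sql = f"SELECT {return_col} FROM {table} WHERE {key_col} = ?"
        if cache == CACHE_PRELOAD:
            self.queries += 1
            rows = cursor.execute(f"SELECT {key_col}, {return_col} FROM {table}")
            self._cache = dict(rows.fetchall())

    def __call__(self, key):
        if self.cache_mode != CACHE_NONE and key in self._cache:
            return self._cache[key]
        if self.cache_mode == CACHE_PRELOAD:
            return None  # the whole table is in memory, so a miss is final
        self.queries += 1
        row = self.cursor.execute(self._sql, (key,)).fetchone()
        value = row[0] if row else None
        if self.cache_mode == CACHE_LAZY:
            self._cache[key] = value
        return value


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE states (name TEXT, abbreviation TEXT)")
cur.executemany("INSERT INTO states VALUES (?, ?)",
                [("California", "CA"), ("Oregon", "OR")])

lazy = MiniLookup(cur, "states", "name", "abbreviation", cache=CACHE_LAZY)
lazy("California")
lazy("California")  # second call is served from the cache
preloaded = MiniLookup(cur, "states", "name", "abbreviation", cache=CACHE_PRELOAD)
```

Preloading trades one full-table read for zero per-row queries, which suits small reference tables; lazy caching suits large tables where only a fraction of keys appear.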

property cursor
dbtk.etl.Validate(table, key_cols, *, cache=1, on_fail='warn')[source]

One-liner validation — returns original value if key exists in table.

dbtk.etl.column_defs_from_db(cursor, table_name, add_comments=False)[source]

Generate column definitions from database table schema.

Inspects the database table structure and returns a Python dictionary string representation of column configurations ready to use with the Table class.

Parameters:
  • cursor – Database cursor (any cursor type)

  • table_name (str) – Name of table to analyze (supports schema.table format)

  • add_comments (bool) – Include table/column comments from database metadata

Returns:

String containing Python dict of column definitions

Return type:

str

Example

>>> print(column_defs_from_db(cursor, 'users'))
{
    'id': {'field': 'id', 'primary_key': True},
    'name': {'field': 'name', 'nullable': False},
    'email': {'field': 'email'},
    'created_at': {'db_expr': 'CURRENT_TIMESTAMP'}
}
>>> # Copy output into your code:
>>> table = Table('users', columns={
...     'id': {'field': 'id', 'primary_key': True},
...     'name': {'field': 'name', 'nullable': False},
...     'email': {'field': 'email'},
...     'created_at': {'db_expr': 'CURRENT_TIMESTAMP'}
... }, cursor=cursor)

ETL Managers

Orchestration tools for multi-stage, resumable ETL processes.

IdentityManager provides lightweight, incremental identity resolution for imports whose source data contains a reliable primary key.

class dbtk.etl.managers.EntityStatus[source]

Bases: object

Status constants for the entity resolution lifecycle.

PENDING

Entity has been registered but resolution has not yet been attempted.

Type:

str

RESOLVED

Entity was successfully matched; target_key is populated.

Type:

str

STAGED

Entity exists in a staging table but has not yet been matched to the target system (e.g. an ERP record not yet confirmed).

Type:

str

ERROR

An error occurred while creating or updating the entity, possibly impacting downstream processing.

Type:

str

SKIPPED

Resolution was intentionally bypassed for this entity.

Type:

str

NOT_FOUND

Resolution was attempted but no matching record was found in the target.

Type:

str

ERROR = 'error'
NOT_FOUND = 'not_found'
PENDING = 'pending'
RESOLVED = 'resolved'
SKIPPED = 'skipped'
STAGED = 'staged'
VALUES = ('pending', 'resolved', 'staged', 'error', 'skipped', 'not_found')
class dbtk.etl.managers.IdentityManager(source_key, target_key, resolver=None, alternate_keys=None)[source]

Bases: object

Lightweight, resumable identity-resolution cache for ETL imports.

Maps source-system primary keys to target-system identifiers using a SQL resolver query. Resolved entities are stored as dbtk.record.Record instances keyed by source_key and enriched with status, messages, errors, and any configured alternate_keys.

State can be persisted to JSON between runs with save_state() and restored with load_state(), allowing long-running and multi-stage imports to be resumed without re-querying already-resolved entities.

Parameters:
  • source_key (str) – Field name for the source-system primary key (e.g. 'student_id').

  • target_key (str) – Field name for the target-system primary key that the resolver returns (e.g. 'erp_person_id').

  • resolver (PreparedStatement or TableLookup, optional) – Query used to look up a target_key from a source_key. Can be set or replaced later via the resolver property.

  • alternate_keys (list of str, optional) – Additional key fields to track per entity (e.g. ['staging_id', 'erp_vendor_id']). These are persisted alongside target_key in saved state.

entities

Mapping of source_key value → resolved dbtk.record.Record. Each Record contains all resolver columns plus _status, _errors, _messages, and any alternate_keys.

Type:

dict

Example

stmt = cursor.prepare_file('sql/resolve_student.sql')
im = IdentityManager('student_id', 'erp_person_id', resolver=stmt,
                     alternate_keys=['banner_id'])

for row in reader:
    entity = im.resolve(row)
    if entity['_status'] == EntityStatus.RESOLVED:
        table.set_values(row)
        if table.execute('insert'):          # returns 1 on DB error
            im.add_error(row['student_id'], table.last_error)
    else:
        im.add_error(row['student_id'],
                     ErrorDetail('Not found', field='student_id'))

im.save_state('state/students.json')

# to initialize from saved state
em = IdentityManager.load_state('state/students.json', resolver=stmt)
em.batch_resolve([EntityStatus.STAGED])
__init__(source_key, target_key, resolver=None, alternate_keys=None)[source]

Initialize IdentityManager.

Parameters:
  • source_key (str) – Field name of the source-system primary key.

  • target_key (str) – Field name of the target-system primary key. Must be returned by the resolver. Set equal to source_key to skip ID resolution.

  • resolver (PreparedStatement or TableLookup, optional) – Resolution query. Accepts either type; TableLookup is unwrapped to its underlying PreparedStatement.

  • alternate_keys (list of str, optional) – Additional key fields to persist and track per entity.

add_error(source_id, error)[source]

Append a dbtk.utils.ErrorDetail to an entity’s _errors list.

Parameters:
  • source_id (str) – Source-system key identifying the entity (must already be cached).

  • error (ErrorDetail) – Structured error to attach to the entity.

add_message(source_id, message)[source]

Append an informational message to an entity’s _messages list.

Parameters:
  • source_id (str) – Source-system key identifying the entity (must already be cached).

  • message (str) – Message text to append.

batch_resolve(additional_statuses=None)[source]

Re-run the resolver for all entities whose status is PENDING or NOT_FOUND.

Useful after bulk-loading staging data when some entities could not be resolved on first pass. Initializes the record factory from a dry-run resolver call if it has not yet been set up.

Parameters:

additional_statuses (optional list of str) – Additional statuses to resolve in addition to EntityStatus.NOT_FOUND and EntityStatus.PENDING
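
The status filtering described above can be sketched over plain dicts. This batch_resolve function is a simplified, hypothetical stand-in for the method: entities are dicts rather than Records, the resolver is any callable returning a target key or None, and the returned count is an addition for illustration.

```python
def batch_resolve(entities, resolver, additional_statuses=None):
    """Re-run resolver for pending/not_found entities; return newly resolved count."""
    statuses = {"pending", "not_found", *(additional_statuses or [])}
    resolved = 0
    for source_id, entity in entities.items():
        if entity["_status"] not in statuses:
            continue  # leave resolved, skipped, etc. untouched
        target = resolver(source_id)
        if target is None:
            entity["_status"] = "not_found"
        else:
            entity["target_key"] = target
            entity["_status"] = "resolved"
            resolved += 1
    return resolved


entities = {
    "S1": {"_status": "pending"},
    "S2": {"_status": "not_found"},
    "S3": {"_status": "staged"},
}
known = {"S1": 10, "S3": 30}  # stand-in for a resolver query
first_pass = batch_resolve(entities, known.get)
second_pass = batch_resolve(entities, known.get, additional_statuses=["staged"])
```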

calc_stats()[source]

Count entities by status.

Returns:

Mapping of each EntityStatus value to the number of entities currently at that status.

Return type:

dict

Example

stats = im.calc_stats()
print(stats)
# {'pending': 0, 'resolved': 142, 'staged': 5, 'error': 3, ...}
entities: Dict[Any, Record]
get_id(source_id, id_type)[source]

Retrieve a target or alternate key value for a cached entity.

Parameters:
  • source_id (str) – Source-system key identifying the entity (must already be cached).

  • id_type (str) – Either target_key or one of alternate_keys.

Returns:

The stored identifier value, or None if not yet set.

Return type:

str or None

Raises:

ValueError – If id_type is not the target_key or a registered alternate_key.

classmethod load_state(path, resolver=None)[source]

Restore an IdentityManager from a previously saved JSON file.

Re-creates the entity Record factory from field_order stored in the file. Deserializes _errors lists back to dbtk.utils.ErrorDetail instances.

Parameters:
  • path (str or Path) – Path to the JSON file written by save_state().

  • resolver (PreparedStatement or TableLookup, optional) – Resolver to attach to the restored instance. If the saved file has no field_order, the resolver is used as a fallback to initialize the record factory.

Returns:

Fully restored instance with all entities re-hydrated.

Return type:

IdentityManager

resolve(value)[source]

Resolve a source key to a target entity, caching the result.

Parameters:

value (scalar, dict, or Record) –

  • scalar – treated as the raw source_key value. The resolver is called with {source_key: value} and the returned entity is cached but the caller’s record is not mutated.

  • dict or Record – source_key is extracted from the mapping. On a successful resolution the target_key is written back into the caller’s record.

Returns:

The cached/resolved entity Record, or None if source_key cannot be found in value.

Return type:

Record or None

Raises:

ValueError – If the resolved target_key conflicts with a value already present in the caller’s record.
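
The scalar-versus-mapping behavior and the conflict check can be sketched as a free function. Everything here is a simplified assumption: entities are plain dicts, the cache is passed in explicitly, the resolver is a callable returning a target key or None, and the default key names are taken from the class example.

```python
from collections.abc import Mapping


def resolve(value, cache, resolver,
            source_key="student_id", target_key="erp_person_id"):
    """Resolve a scalar or mapping to a cached entity dict (hypothetical sketch)."""
    is_mapping = isinstance(value, Mapping)
    source_id = value.get(source_key) if is_mapping else value
    if source_id is None:
        return None  # source_key could not be found in value
    if source_id not in cache:
        target = resolver(source_id)
        cache[source_id] = {
            source_key: source_id,
            target_key: target,
            "_status": "resolved" if target is not None else "not_found",
        }
    entity = cache[source_id]
    target = entity[target_key]
    if is_mapping and target is not None:
        existing = value.get(target_key)
        if existing is not None and existing != target:
            raise ValueError(f"{target_key} conflict for {source_id!r}")
        value[target_key] = target  # write back into the caller's record
    return entity


cache = {}
known = {"S1": 900}  # stand-in for a resolver query
row = {"student_id": "S1"}
entity = resolve(row, cache, known.get)
```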

property resolver: PreparedStatement | None

The active PreparedStatement used for resolution queries.

save_state(path)[source]

Persist the current entity cache to a JSON file.

The file captures source_key, target_key, alternate_keys, field_order (for factory reconstruction), summary stats, and the full entity dict. dbtk.utils.ErrorDetail objects are serialized to {"message": ..., "field": ..., "code": ...} dicts.

Parameters:

path (str or Path) – Destination file path. Parent directory must exist.
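
The error serialization round trip can be sketched with the json module. ErrorInfo is a hypothetical stand-in for dbtk.utils.ErrorDetail, and the file layout below covers only the entity dict, not the source_key, field_order, or stats entries that the real file also captures.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional


@dataclass
class ErrorInfo:
    """Hypothetical stand-in for dbtk.utils.ErrorDetail."""
    message: str
    field: Optional[str] = None
    code: Optional[str] = None


def save_state(path, entities):
    """Write entities to JSON, turning error objects into plain dicts."""
    payload = {
        sid: {**entity, "_errors": [asdict(err) for err in entity["_errors"]]}
        for sid, entity in entities.items()
    }
    Path(path).write_text(json.dumps({"entities": payload}, indent=2))


def load_state(path):
    """Read entities back, rebuilding error objects from their dicts."""
    raw = json.loads(Path(path).read_text())["entities"]
    return {
        sid: {**entity, "_errors": [ErrorInfo(**err) for err in entity["_errors"]]}
        for sid, entity in raw.items()
    }


entities = {
    "S1": {"erp_person_id": "900", "_status": "error",
           "_errors": [ErrorInfo("Not found", field="student_id")]},
}
with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "students.json"
    save_state(state_file, entities)
    restored = load_state(state_file)
```

JSON object keys are always strings, so non-string source keys would need an explicit conversion step on load.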

set_id(source_id, id_type, value)[source]

Store a target or alternate key value for a cached entity.

Parameters:
  • source_id (str) – Source-system key identifying the entity (must already be cached).

  • id_type (str) – Either target_key or one of alternate_keys.

  • value (str) – The identifier value to store.

Raises:

ValueError – If id_type is not the target_key or a registered alternate_key.

class dbtk.etl.managers.ValidationCollector(lookup=None, return_col=None)[source]

Bases: object

Callable collector/enricher for fn pipelines.

During row-wise processing:
  • Collects unique codes

  • Optionally enriches them with descriptions using TableLookup

  • Can return a specific field from the lookup result instead of the raw code

Supports:
  • Preload mode: instant enrichment, perfect valid/new split

  • Lazy mode: enrich on first encounter

  • No lookup: pure collection

Set return_col to the field name you want returned; None (default) returns the raw code.

__init__(lookup=None, return_col=None)[source]
added: Dict[Any, Any]
collect_new(code, **fields)[source]

Attach extra fields to a newly-encountered code for later bulk insertion.

No-ops immediately when the preceding __call__ did not add a new code (_recently_added is False), so it is safe to call unconditionally on every record. First annotation wins — subsequent calls for the same code are ignored.

Parameters:
  • code (Any) – The code value that was passed to the validator (used as a cross-check and as the key into added).

  • **fields – Extra columns to store, e.g. stvcipc_desc=record.cip_discipline.

Example

cip_validator = ValidationCollector(lookup=cip_lookup)
for record in reader:
    stvmajr.set_values(record)      # triggers cip_validator(record.cip_code)
    cip_validator.collect_new(record.cip_code, stvcipc_desc=record.cip_discipline)
existing: Dict[Any, Any]
get_all()[source]

Get all codes (existing + added) as a set.

Useful for filtering with tools like polars that need a set/list of valid values rather than a callable.

Returns:

Union of existing codes and added codes

Return type:

set

Example

# Collect valid titles
title_collector = ValidationCollector()
for record in titles:
    title_collector(record['tconst'])

# Use with polars filtering
all_titles = title_collector.get_all()
df = pl.scan_csv('principals.tsv.gz').filter(
    pl.col('tconst').is_in(all_titles)
)

# Or with dbtk reader filtering
reader.add_filter(lambda r: r.tconst in title_collector)  # Uses __contains__
get_all_mapping()[source]
get_valid_mapping()[source]

ETL Transforms

Data transformation and parsing functions.

This package provides utilities for:
  • Date/time parsing with timezone support (transforms.datetime)

  • Phone number validation and formatting (transforms.phone)

  • Email validation and cleaning (transforms.email)

  • Database-backed code validation and lookups (transforms.database)

  • Basic data type conversions and text manipulation (transforms.core, exported here)

Core functions are available directly from this module for convenience. Specialized functionality requires explicit submodule imports.

Usage:

# Core functions - direct import
from dbtk.etl.transforms import get_int, coalesce, capitalize

# Specialized functions - submodule import
from dbtk.etl.transforms.datetime import parse_date, parse_datetime
from dbtk.etl.transforms.phone import Phone, PhoneFormat
from dbtk.etl.transforms.email import email_validate
from dbtk.etl.transforms.database import CodeValidator

Optional Dependencies:
phonenumbers - For robust international phone number support

pip install phonenumbers

dateutil - For additional date/time parsing flexibility

pip install python-dateutil

usaddress - For parsing and cleaning address strings

pip install usaddress

dbtk.etl.transforms.Lookup(table, key_cols, return_cols, *, cache=1, missing=None)[source]

One-liner database lookup for Table column configs.

class dbtk.etl.transforms.TableLookup(cursor, table, key_cols, return_cols=None, cache=1)[source]

Bases: object

Database table lookup with configurable caching for ETL transformations.

Performs lookups against database tables or views using PreparedStatement for efficient repeated queries. Supports three caching strategies:

  • CACHE_NONE (0): No caching, always query database

  • CACHE_LAZY (1): Cache results as encountered (default)

  • CACHE_PRELOAD (2): Preload entire table into memory upfront

Can operate in three modes:
  • Validator: No return_cols specified, returns bool indicating existence

  • Single lookup: One return column, returns scalar value

  • Multi lookup: Multiple return columns, returns row object (type depends on cursor)

Examples

Validator mode:

temple_validator = TableLookup(cursor, 'temples', key_cols='temple_id')
is_valid = temple_validator({'temple_id': 'eastern_air_temple'})  # True/False

Single value lookup:

state_lookup = TableLookup(cursor, 'states',
                           key_cols='name',
                           return_cols='abbreviation',
                           cache=TableLookup.CACHE_PRELOAD)  # Small table, preload it
abbrev = state_lookup({'name': 'California'})  # 'CA'

Multi-value lookup:

address_lookup = TableLookup(cursor, 'addresses',
                             key_cols='address_id',
                             return_cols=['street', 'city', 'state', 'zip'])
address = address_lookup({'address_id': 123})  # Record/dict/namedtuple

Multi-key lookup:

person_lookup = TableLookup(cursor, 'people',
                            key_cols=['first_name', 'last_name'],
                            return_cols='person_id',
                            cache=TableLookup.CACHE_NONE)  # Large table, don't cache
person_id = person_lookup({'first_name': 'Aang', 'last_name': 'Avatar'})

Use in Table config:

table = Table('citizens', {
    'state_abbrev': {'field': 'state_name', 'fn': state_lookup}
}, cursor=cursor)
CACHE_LAZY = 1
CACHE_NONE = 0
CACHE_PRELOAD = 2
__init__(cursor, table, key_cols, return_cols=None, cache=1)[source]

Initialize TableLookup with table schema and caching strategy.

Parameters:
  • cursor (Cursor) – Database cursor for executing queries

  • table (str) – Table or view name to query

  • key_cols (str or list of str) – Column name(s) to use in WHERE clause. Cannot be empty.

  • return_cols (str, list of str, or None) – Column(s) to return. If None, operates as validator (returns bool)

  • cache (int, default 1 (CACHE_LAZY)) – Caching strategy: 0 (CACHE_NONE) never caches and always queries the database; 1 (CACHE_LAZY) caches results as encountered; 2 (CACHE_PRELOAD) preloads the entire table into memory.

Raises:

ValueError – If key_cols is empty or cache value is invalid

property cursor
exhaustive: bool
dbtk.etl.transforms.Validate(table, key_cols, *, cache=1, on_fail='warn')[source]

One-liner validation — returns original value if key exists in table.

dbtk.etl.transforms.capitalize(val)[source]

Title case a string, but only if it’s already in all uppercase or all lowercase.

Parameters:

val (Any) – String value to capitalize

Returns:

Title-cased string if applicable, otherwise original value

Return type:

Any

Example

capitalize("HELLO WORLD")  # "Hello World"
capitalize("hello world")  # "Hello World"
capitalize("Hello World")  # "Hello World" (unchanged)
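
The rule "title-case only when the input is uniformly upper or lower case" can be written in a few lines. This is a sketch of the documented behavior, not the library source; it relies on str.title(), which has its own quirks around apostrophes.

```python
def capitalize(val):
    """Title-case val only if it is entirely upper case or entirely lower case."""
    if isinstance(val, str) and (val.isupper() or val.islower()):
        return val.title()
    return val  # mixed case and non-strings pass through unchanged


results = [capitalize(s) for s in ("HELLO WORLD", "hello world", "McDonald")]
```

Leaving mixed-case input alone preserves names such as "McDonald" that a blanket title() call would flatten to "Mcdonald".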

dbtk.etl.transforms.coalesce(vals)[source]

Returns the first non-empty and non-None value from a list or tuple.

Parameters:

vals (List[Any] | tuple) – List or tuple of values to check

Returns:

First non-empty, non-None value, or the original input if not a list/tuple

Return type:

Any

Example

coalesce([None, '', 'first', 'second'])  # "first"
coalesce([0, 1, 2])                      # 0
coalesce([None, None])                   # None
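
A minimal sketch of the documented behavior follows; it is not the library source. The point worth noticing is that only None and the empty string count as empty, so a falsy value such as 0 is still returned.

```python
def coalesce(vals):
    """Return the first value that is neither None nor an empty string."""
    if not isinstance(vals, (list, tuple)):
        return vals  # non-sequence input is returned as-is
    for value in vals:
        if value is not None and value != "":
            return value
    return None


first = coalesce([None, "", "first", "second"])
zero = coalesce([0, 1, 2])
```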

dbtk.etl.transforms.email_clean(val)[source]

Clean and normalize email address.

Parameters:

val (str) – Email address string

Returns:

Cleaned email address (lowercase, stripped) or empty string if invalid

Return type:

str

Example

email_clean(" User@Example.COM ")  # "user@example.com"
email_clean("invalid.email")       # ""
email_clean("user@domain.co")      # "user@domain.co"

dbtk.etl.transforms.email_validate(val)[source]

Validate email address format.

Uses a practical regex that catches most valid emails and rejects obviously invalid ones. Not RFC 5322 compliant but good enough for most data validation needs.

Parameters:

val (str) – Email address string to validate

Returns:

True if email appears valid, False otherwise

Return type:

bool

Example

email_validate("user@example.com")     # True
email_validate("user.name@domain.co")  # True
email_validate("invalid.email")        # False
email_validate("@domain.com")          # False
email_validate("")                     # False
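A "practical" pattern of the kind described above might look like the following. The regex is an assumption chosen to reproduce the documented examples; DBTK's actual pattern is not shown in this reference and may differ. Both function names are hypothetical.

```python
import re

# Assumed pattern for illustration only: local part, "@", one or more
# dot-separated domain labels, and an alphabetic TLD of 2+ characters.
_EMAIL_RE = re.compile(
    r"[A-Za-z0-9._%+-]+@[A-Za-z0-9-]+(?:\.[A-Za-z0-9-]+)*\.[A-Za-z]{2,}"
)


def email_validate_sketch(val) -> bool:
    """Return True when val looks like an email address."""
    if not isinstance(val, str):
        return False
    return _EMAIL_RE.fullmatch(val.strip()) is not None


def email_clean_sketch(val) -> str:
    """Strip and lowercase a valid address; return '' for anything else."""
    if not email_validate_sketch(val):
        return ""
    return val.strip().lower()


print(email_clean_sketch(" User@Example.COM "))
print(email_validate_sketch("@domain.com"))
```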

dbtk.etl.transforms.fn_resolver(shorthand)[source]

Convert a concise string shorthand into a transformation function.

Syntax

Shorthands take one of three forms:

name                 simple name, no arguments          e.g. 'int'
name:arg1:arg2       name with colon-separated args     e.g. 'maxlen:200'
type.method:arg1     cast to type, call method          e.g. 'str.rjust:+9:0'

Parameters passed after the first : are strings by default. Prefix an integer with + for non-negative or - for negative to force it to int (required where the called function expects an integer, e.g. str.rjust). True, False, and None are parsed to their Python equivalents.

Names

Type conversion — None-safe (empty string or None returns None):

int, float, bool
digits               strip non-digit characters
number               parse formatted numbers (e.g. '1,234.56')

Date / time:

date, datetime, time, timestamp

String:

maxlen:50                 truncate to 50 characters
split_and_get:0           first comma-delimited field
split_and_get:1:[TAB]     second tab-delimited field ([TAB] stands for a literal tab character)
str.method                all string methods (capitalize, upper, lower, split) are available using the cast and call format, see below

List / Tuple:

nth:0                first element of a sequence (nth:-1 → last)
coalesce             first non-empty, non-None value

Boolean indicators:

indicator            True → 'Y', False/None → None
indicator:Y:N        True → 'Y', False → 'N'
indicator:N:Y        True → 'N', False → 'Y' (inverted)
indicator:None:Y     True → None, False → 'Y' (active-flag inversion)
indicator:1:0        True → '1', False → '0'

Cast and call (type.method):

str.upper                     str(val).upper()
str.strip:="                  strip = and " chars (e.g. Excel ="val")
str.lstrip:0                  strip leading zeros
str.split:,                   split on comma → list
str.rjust:+9:0                right-justify to width 9, padded with '0'
str.ljust:+10:[SPACE]         left-justify to width 10, padded with space ([SPACE] stands for a single space character)
datetime.strftime:%Y-%m-%d    format a parsed datetime
int.to_bytes:+4:big           int(val).to_bytes(4, 'big')
float.hex                     float(val).hex()

Supported cast types: int, float, str, datetime.

Database:

lookup:table:key_col:val_col    look up val_col from table by key_col
validate:table:key_col          assert key_col exists in table

Returns:

A single-argument function, or a _DeferredTransform for lookup: and validate: shorthands, which must be bound to a cursor before use.

Return type:

callable or _DeferredTransform

Raises:

ValueError – If the shorthand is not recognized.

Examples

>>> fn_resolver('int')('123')
123
>>> fn_resolver('maxlen:10')('supercalifragilistic')
'supercalif'
>>> fn_resolver('indicator:N:Y')(True)
'N'
>>> fn_resolver('str.rjust:+9:0')('123')
'000000123'
>>> fn_resolver('str.split:,')('a,b,c')
['a', 'b', 'c']
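The argument rules above (strings by default, a leading + or - forces int, and True/False/None become Python literals) can be sketched as a tiny parser. This is not fn_resolver's source: parse_arg, parse_shorthand, and cast_and_call are hypothetical names, only the str, int, and float casts are handled, and the naive split on ":" is an assumption that would not cope with arguments that themselves contain a colon.

```python
import re
from typing import Any, Callable, List, Tuple

_LITERALS = {"True": True, "False": False, "None": None}
_CASTS = {"int": int, "float": float, "str": str}


def parse_arg(token: str) -> Any:
    """Convert one colon-separated argument to its Python value."""
    if token in _LITERALS:
        return _LITERALS[token]
    if re.fullmatch(r"[+-]\d+", token):  # explicit sign forces int
        return int(token)
    return token  # everything else stays a string


def parse_shorthand(shorthand: str) -> Tuple[str, List[Any]]:
    """Split 'name:arg1:arg2' into the name and its typed arguments."""
    name, *raw_args = shorthand.split(":")
    return name, [parse_arg(a) for a in raw_args]


def cast_and_call(shorthand: str) -> Callable[[Any], Any]:
    """Build a function for the 'type.method:args' form."""
    name, args = parse_shorthand(shorthand)
    type_name, _, method = name.partition(".")
    if type_name not in _CASTS or not method:
        raise ValueError(f"not a cast-and-call shorthand: {shorthand!r}")
    cast = _CASTS[type_name]
    return lambda val: getattr(cast(val), method)(*args)


print(parse_shorthand("indicator:None:Y"))
print(cast_and_call("str.rjust:+9:0")("123"))
```

The + prefix matters in the last call: without it the width would reach str.rjust as the string '9', and Python would raise a TypeError.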
dbtk.etl.transforms.format_digits(val, pattern)[source]

Format a number string according to a pattern.

Extracts digits from the value and applies the pattern if the number of digits matches the number of ‘#’ characters in the pattern.

Parameters:
  • val (Any) – Number value to format (string or numeric)

  • pattern (str) – Format pattern using ‘#’ for digit positions

Returns:

Formatted string if digit count matches, otherwise original string

Return type:

str

Example

format_digits('8001234567', '(###) ###-####')    # "(800) 123-4567"
format_digits('012345678', '###-##-####')        # "012-34-5678"
format_digits('(800) 123-4567', '###.###.####')  # "800.123.4567"
format_digits('12345', '###-##-####')            # "12345" (wrong length)
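The matching rule (apply the pattern only when the digit count equals the number of '#' placeholders) can be sketched as follows. format_digits_sketch is a hypothetical name, and treating None as an empty string is an assumption.

```python
import re


def format_digits_sketch(val, pattern: str) -> str:
    """Pour the digits of val into the '#' slots of pattern."""
    text = "" if val is None else str(val)
    digits = re.sub(r"\D", "", text)  # keep digits only
    if len(digits) != pattern.count("#"):
        return text  # wrong length: return the input string untouched
    remaining = iter(digits)
    # Each '#' consumes the next digit; other characters are copied as-is.
    return "".join(next(remaining) if ch == "#" else ch for ch in pattern)


print(format_digits_sketch("8001234567", "(###) ###-####"))
print(format_digits_sketch("12345", "###-##-####"))
```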

dbtk.etl.transforms.get_bool(val)[source]

Parse a value as a boolean.

Parameters:

val (Any) – Value to parse

Returns:

True for truthy values, False for falsy values, None for None

Return type:

bool | None

Truthy: True, 'T', 'TRUE', 'YES', 'Y', '1', 1, 1.0, non-zero numbers
Falsy: False, 'F', 'FALSE', 'NO', 'N', '0', 0, 0.0, empty string
None: None

Example

get_bool(True)   # True
get_bool('yes')  # True
get_bool('Y')    # True
get_bool(1)      # True
get_bool(1.0)    # True
get_bool(False)  # False
get_bool('no')   # False
get_bool(0)      # False
get_bool(None)   # None
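The truthy and falsy lists above suggest a case-insensitive table lookup for strings and ordinary truthiness for numbers. The sketch below follows that reading. get_bool_sketch is a hypothetical name, and two behaviors are assumptions not stated in this reference: surrounding whitespace is ignored, and unrecognized strings yield None.

```python
from typing import Any, Optional

_TRUTHY = {"T", "TRUE", "YES", "Y", "1"}
_FALSY = {"F", "FALSE", "NO", "N", "0", ""}


def get_bool_sketch(val: Any) -> Optional[bool]:
    """Three-valued boolean parse: True, False, or None."""
    if val is None:
        return None
    if isinstance(val, str):
        key = val.strip().upper()  # assumption: case and padding are ignored
        if key in _TRUTHY:
            return True
        if key in _FALSY:
            return False
        return None  # assumption: unrecognized strings are treated as unknown
    return bool(val)  # numbers: zero is False, anything else True


print(get_bool_sketch("yes"), get_bool_sketch(0), get_bool_sketch(None))
```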

dbtk.etl.transforms.get_digits(val)[source]

Extract only numeric digits from a value, preserving leading zeros.

Removes all non-digit characters except leading +/- signs. Returns a string to preserve leading zeros.

Parameters:

val (Any) – Value to extract digits from

Returns:

String containing only digits (and possibly leading +/-), or empty string if no value

Return type:

str

Example

get_digits("(800) 123-4567")   # "8001234567"
get_digits("012-34-5678")      # "012345678"
get_digits("$-42.35")          # "4235"
get_digits("+1-555-123-4567")  # "15551234567"

dbtk.etl.transforms.get_float(val)[source]

Convert value to float.

Handles currency symbols and formatting by using to_number().

Parameters:

val (Any) – Value to convert

Returns:

Float value or None if val is falsy or cannot be converted

Return type:

float | None

Example

get_float("123.45")   # 123.45
get_float("$123.45")  # 123.45
get_float("123")      # 123.0
get_float(None)       # None

dbtk.etl.transforms.get_int(val)[source]

Convert value to integer.

Handles string numbers with decimals and currency symbols by using to_number().

Parameters:

val (Any) – Value to convert

Returns:

Integer value or None if val is falsy or cannot be converted

Return type:

int | None

Example

get_int("123")      # 123
get_int("123.45")   # 123
get_int("$123.45")  # 123
get_int(123.45)     # 123
get_int(None)       # None

dbtk.etl.transforms.indicator(val, true_val='Y', false_val=None)[source]

Convert a value to a boolean indicator.

Uses get_bool() for consistent boolean interpretation.

Parameters:
  • val (Any) – Value to test for truthiness

  • true_val (str) – Value to return if val is truthy (default: ‘Y’)

  • false_val (Any) – Value to return if val is falsy (default: None)

Returns:

true_val if val is truthy, false_val otherwise, or None if val is None

Return type:

Any

Example

indicator(True)         # "Y"
indicator('yes')        # "Y"
indicator(1)            # "Y"
indicator(False)        # None
indicator(0, 'T', 'F')  # "F"
indicator(None)         # None

dbtk.etl.transforms.normalize_whitespace(val)[source]

Normalize whitespace in a string.

  • Strips leading and trailing whitespace

  • Collapses multiple spaces/tabs/newlines into single spaces

Parameters:

val (Any) – Value to normalize

Returns:

String with normalized whitespace, or empty string if no value

Return type:

str

Example

normalize_whitespace("  hello   world  ")  # "hello world"
normalize_whitespace("hello\n\nworld")     # "hello world"
normalize_whitespace("hello\t\tworld")     # "hello world"
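Both steps above (strip the ends, collapse internal runs) fall out of Python's argument-less str.split, which splits on any whitespace run and discards empty pieces. A minimal sketch, with the hypothetical name normalize_whitespace_sketch and the assumption that only None counts as "no value":

```python
def normalize_whitespace_sketch(val) -> str:
    """Strip the ends and collapse whitespace runs to single spaces."""
    if val is None:
        return ""
    # split() with no argument handles spaces, tabs and newlines alike.
    return " ".join(str(val).split())


print(repr(normalize_whitespace_sketch("  hello \t\n world  ")))
```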

dbtk.etl.transforms.parse_date(val, default_tz=None)[source]

Parse various date formats to date object.

Parameters:
  • val (Any) – Date string, datetime object, or other value

  • default_tz (str | None) – Default timezone (not used for dates, kept for consistency)

Returns:

date object or None if parsing fails

Return type:

date | None

Example

parse_date("2024-01-15")   # -> date(2024, 1, 15)
parse_date("01/15/2024")   # -> date(2024, 1, 15)
parse_date("15 Jan 2024")  # -> date(2024, 1, 15)

dbtk.etl.transforms.parse_datetime(val)[source]

Parse various datetime formats to datetime object.

Preserves timezone if present in the input string, otherwise returns naive datetime. Use parse_datetimetz() to automatically apply default timezone from settings.

Parameters:

val (Any) – Datetime string, date object, or other value

Returns:

datetime object or None if parsing fails

Return type:

datetime | None

Example

parse_datetime("2024-01-15 14:30:00")     # -> naive datetime
parse_datetime("2024-01-15T14:30:00Z")    # -> datetime with UTC
parse_datetime("01/15/2024 2:30 PM EST")  # -> datetime with EST

dbtk.etl.transforms.phone_clean(val, country=None)[source]

Clean and format phone number.

Parameters:
  • val (Any) – Phone number value to clean

  • country (str | None) – ISO country code (default from settings)

Returns:

Formatted phone number or empty string if invalid

Return type:

str

Example

phone_clean("555-123-4567")      # "(555) 123-4567"
phone_clean(" (555) 123-4567 ")  # "(555) 123-4567"
phone_clean("invalid")           # ""

dbtk.etl.transforms.phone_validate(val, country=None)[source]

Validate phone number.

Parameters:
  • val (Any) – Phone number value to validate

  • country (str | None) – ISO country code (default from settings)

Returns:

True if valid phone number, False otherwise

Return type:

bool

Example

phone_validate("(555) 123-4567")  # True
phone_validate("555-1234")        # False
phone_validate("invalid")         # False

dbtk.etl.transforms.split_and_get(val, index, delimiter=',')[source]

Get an item from a delimited string list.

Parameters:
  • val (str) – Delimited string

  • index (int) – Zero-based index of item to retrieve

  • delimiter (str) – Delimiter character (default: ‘,’)

Returns:

Item at specified index (stripped), or None if index out of range

Return type:

str | None

Example

split_and_get("a,b,c", 1)       # "b"
split_and_get("a|b|c", 0, "|")  # "a"
split_and_get("a,b", 5)         # None
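The out-of-range behavior above is the part that differs from a plain val.split(delimiter)[index]. A short sketch of it follows; split_and_get_sketch is a hypothetical name and returning None for empty input is an assumption.

```python
from typing import Optional


def split_and_get_sketch(val, index: int, delimiter: str = ",") -> Optional[str]:
    """Return the stripped item at index, or None when it does not exist."""
    if not val:
        return None  # assumption: empty or missing input has no items
    parts = str(val).split(delimiter)
    try:
        return parts[index].strip()
    except IndexError:
        return None  # index beyond the last item


print(split_and_get_sketch("a, b ,c", 1))
print(split_and_get_sketch("a,b", 5))
```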

dbtk.etl.transforms.standardize_address(address, style='standard')[source]

Standardize address formatting.

Parameters:
  • address (str) – Address string to standardize

  • style (str) – Format style (‘standard’, ‘single_line’, ‘multiline’)

Returns:

Standardized address string

Return type:

str

Example

standardize_address("123 main street, springfield il 62701")
# "123 Main St, Springfield, IL 62701"

dbtk.etl.transforms.to_number(val)[source]

Convert a value to a number by extracting digits and converting to float.

Strips out all non-numeric characters (except leading +/- and decimal point).

Parameters:

val (Any) – Value to convert to number

Returns:

Float value or None if no valid number can be extracted

Return type:

float | None

Example

to_number("$42.35")    # 42.35
to_number("$-42.35")   # -42.35
to_number("1,234.56")  # 1234.56
to_number("N/A")       # None
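Since get_int() and get_float() are documented as building on to_number(), the layering can be sketched in one place. This is an approximation under stated assumptions, not DBTK's code: the _sketch names are hypothetical, the character filter is a simple regex, and get_int_sketch truncates toward zero as the get_int examples suggest.

```python
import re
from typing import Any, Optional


def to_number_sketch(val: Any) -> Optional[float]:
    """Drop everything except digits, sign and decimal point, then parse."""
    if val is None:
        return None
    cleaned = re.sub(r"[^0-9.+-]", "", str(val))
    try:
        return float(cleaned)
    except ValueError:
        return None  # nothing numeric left, or a malformed remainder


def get_int_sketch(val: Any) -> Optional[int]:
    """Integer view of to_number_sketch; truncates the fractional part."""
    number = to_number_sketch(val)
    return None if number is None else int(number)


print(to_number_sketch("$-42.35"))
print(get_int_sketch("$123.45"))
print(to_number_sketch("N/A"))
```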

dbtk.etl.transforms.validate_us_address(address)[source]

Validate US address format and components.

Parameters:

address (str) – Address string to validate

Returns:

True if valid US address, False otherwise

Return type:

bool

Example

validate_us_address("123 Main St, Springfield, IL 62701")  # True
validate_us_address("123 Main St, Springfield, XX 12345")  # False

Logging Utilities

Integration script logging with timestamped files and error tracking:

Logging utilities for integration scripts.

Provides convenient logging setup for ETL and integration scripts that follow the pattern of creating timestamped log files like script_name_YYYYMMDD_HHMMSS.log

class dbtk.logging_utils.ErrorCountHandler(error_log_path=None, formatter=None)[source]

Bases: Handler

Custom handler that counts ERROR and CRITICAL level messages and lazily creates error log.

__init__(error_log_path=None, formatter=None)[source]
emit(record)[source]

Count errors and lazily create error log file on first error.
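The "count and create lazily" behavior can be approximated with the standard logging module alone. The class below is a sketch, not ErrorCountHandler itself: the name ErrorCountSketch and the attributes error_count and _file_handler are hypothetical, chosen only to show why no error file appears on a clean run.

```python
import logging
import os
import tempfile


class ErrorCountSketch(logging.Handler):
    """Counts ERROR/CRITICAL records; opens the error log on first use."""

    def __init__(self, error_log_path=None, formatter=None):
        super().__init__(level=logging.ERROR)  # ignore anything below ERROR
        self.error_log_path = error_log_path
        self.error_count = 0
        self._file_handler = None
        if formatter is not None:
            self.setFormatter(formatter)

    def emit(self, record):
        self.error_count += 1
        if self.error_log_path is None:
            return  # counting only
        if self._file_handler is None:
            # First error: only now is the file created on disk.
            self._file_handler = logging.FileHandler(self.error_log_path)
            self._file_handler.setFormatter(self.formatter)
        self._file_handler.emit(record)

    def close(self):
        if self._file_handler is not None:
            self._file_handler.close()
        super().close()


log_path = os.path.join(tempfile.mkdtemp(), "demo_error.log")
logger = logging.getLogger("error_count_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False
handler = ErrorCountSketch(log_path)
logger.addHandler(handler)

logger.info("starting")                  # below ERROR: not counted
exists_before = os.path.exists(log_path)
logger.error("boom")
logger.critical("worse")
handler.close()
print(exists_before, handler.error_count, os.path.exists(log_path))
```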

dbtk.logging_utils.cleanup_old_logs(log_dir=None, retention_days=None, pattern='*.log', dry_run=False)[source]

Remove log files older than retention period.

Parameters:
  • log_dir (str | None) – Directory to clean (defaults to config setting or ‘./logs’)

  • retention_days (int | None) – Keep logs newer than this many days (defaults to config or 30)

  • pattern (str) – Glob pattern for log files (default: '*.log')

  • dry_run (bool) – If True, only report what would be deleted without actually deleting

Returns:

List of deleted (or would-be-deleted if dry_run) file paths

Return type:

List[str]

Example

import dbtk

# Clean logs older than 30 days (from config)
deleted = dbtk.cleanup_old_logs()
print(f"Deleted {len(deleted)} old log files")

# Custom retention
deleted = dbtk.cleanup_old_logs(retention_days=7)

# Dry run to see what would be deleted
would_delete = dbtk.cleanup_old_logs(dry_run=True)
print(f"Would delete: {would_delete}")

# Clean specific pattern
deleted = dbtk.cleanup_old_logs(pattern="error_*.log")
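For readers who want to see what a retention sweep of this kind involves, here is a standalone sketch using only the standard library. It is not the DBTK implementation: cleanup_sketch is a hypothetical name, file age is judged by modification time (an assumption), and the config-file defaults are replaced by plain keyword defaults.

```python
import os
import tempfile
import time
from pathlib import Path
from typing import List


def cleanup_sketch(log_dir: str, retention_days: int = 30,
                   pattern: str = "*.log", dry_run: bool = False) -> List[str]:
    """Delete (or just list, when dry_run) files older than retention_days."""
    cutoff = time.time() - retention_days * 86400
    matched = []
    for path in sorted(Path(log_dir).glob(pattern)):
        if path.is_file() and path.stat().st_mtime < cutoff:
            if not dry_run:
                path.unlink()
            matched.append(str(path))
    return matched


# Demo in a throwaway directory: one file aged 40 days, one fresh.
demo_dir = Path(tempfile.mkdtemp())
old_log, new_log = demo_dir / "old.log", demo_dir / "new.log"
old_log.write_text("old")
new_log.write_text("new")
aged = time.time() - 40 * 86400
os.utime(old_log, (aged, aged))

preview = cleanup_sketch(str(demo_dir), dry_run=True)  # reports, deletes nothing
still_there = old_log.exists()
deleted = cleanup_sketch(str(demo_dir))                # actually deletes
print(preview == deleted, still_there, old_log.exists())
```

Running with dry_run=True first, as in the demo, is a cheap way to confirm the pattern and retention window before anything is removed.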
dbtk.logging_utils.errors_logged()[source]

Check if any ERROR or CRITICAL messages were logged during this run.

Returns the path to the log file containing errors if any were logged, None otherwise. This allows integration scripts to easily detect if errors occurred and take action (e.g., send notification emails).

Returns:

Path to error log (if split_errors=True) or main log (if split_errors=False) when errors were logged. Returns None if no errors were logged or setup_logging() was not called.

Return type:

str or None

Example

import dbtk
import logging

dbtk.setup_logging('my_integration')

# ... do ETL work ...
try:
    process_data()
except Exception as e:
    logging.error(f"Processing failed: {e}")

# Check if errors occurred
error_log = dbtk.errors_logged()
if error_log:
    print(f"Errors detected! See: {error_log}")
    # send_notification_email(subject="Integration errors", attachment=error_log)
else:
    print("Integration completed successfully")
dbtk.logging_utils.setup_logging(script_name=None, log_dir=None, level=None, split_errors=None, console=None)[source]

Configure logging for integration scripts.

Creates log files with the pattern {script_name}_{datetime}.log and, optionally, a separate error log named {script_name}_{datetime}_error.log.

Parameters:
  • script_name (str | None) – Base name for log files (defaults to script filename without extension)

  • log_dir (str | None) – Directory for log files (defaults to config setting or ‘./logs’)

  • level (str | None) – Logging level string - DEBUG, INFO, WARNING, ERROR (defaults to config or ‘INFO’)

  • split_errors (bool | None) – Create separate error log file (defaults to config or True)

  • console (bool | None) – Also log to console/stdout (defaults to config or True)

Returns:

Tuple of (log_file_path, error_log_path or None)

Return type:

Tuple[str, str | None]

Example

import dbtk

# Simple - uses defaults from config
dbtk.setup_logging('fire_nation_etl')

# Custom settings
dbtk.setup_logging('my_script', log_dir='/var/log/etl', level='DEBUG')

# Single log file per day (set filename_format in config to '%Y%m%d')
dbtk.setup_logging('daily_job')

# Single rolling log file (set filename_format in config to '')
dbtk.setup_logging('rolling_log')

Note

To customize filename patterns, set 'logging.filename_format' in dbtk.yml:

  • '%Y%m%d_%H%M%S' – One log per run with date and time (default)

  • '%Y%m%d' – One log per day

  • '' – Single rolling log file (overwrites)
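How the filename_format setting shapes the resulting file names can be sketched with strftime. This is an illustration only: build_log_paths is a hypothetical helper, and the exact name used when the format is empty (here simply script_name.log) is an assumption, since this reference does not spell it out.

```python
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple


def build_log_paths(script_name: str, log_dir: str = "./logs",
                    filename_format: str = "%Y%m%d_%H%M%S",
                    split_errors: bool = True,
                    now: Optional[datetime] = None) -> Tuple[str, Optional[str]]:
    """Return (log_file_path, error_log_path or None) for one run."""
    now = now or datetime.now()
    stamp = now.strftime(filename_format) if filename_format else ""
    # Assumption: an empty format drops the timestamp suffix entirely.
    base = f"{script_name}_{stamp}" if stamp else script_name
    log_file = str(Path(log_dir) / f"{base}.log")
    error_file = str(Path(log_dir) / f"{base}_error.log") if split_errors else None
    return log_file, error_file


run_time = datetime(2024, 1, 15, 14, 30, 0)
for fmt in ("%Y%m%d_%H%M%S", "%Y%m%d", ""):
    print(repr(fmt), build_log_paths("daily_job", filename_format=fmt, now=run_time))
```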

Command Line Interface

DBTK provides command-line tools for managing encryption keys and configuration files.

# Check dependencies, drivers, and configuration
dbtk checkup

# Interactive configuration setup wizard
dbtk config-setup

# Generate encryption key
dbtk generate-key

# Store encryption key in system keyring
dbtk store-key [key] [--force]

# Encrypt passwords in config file
dbtk encrypt-config [config_file]

# Encrypt a single password
dbtk encrypt-password [password]

# Migrate config to new encryption key
dbtk migrate-config old_file new_file [--new-key KEY]

The CLI is implemented in the dbtk.cli module:

dbtk.cli.checkup()[source]

Check which optional dependencies are installed.

dbtk.cli.main()[source]

Formats

Pre-defined column layouts for common multi-record fixed-width EDI-like formats.

Use with EDIReader or EDIWriter:

from dbtk.formats.edi import ACH_COLUMNS
from dbtk.readers.fixed_width import EDIReader
from dbtk.writers.fixed_width import EDIWriter

with open('in.ach') as fp, EDIWriter('out.ach', ACH_COLUMNS) as w:
    w.write_batch(EDIReader(fp, ACH_COLUMNS))