API Reference
Configuration
Configuration management for database connections. Supports YAML configuration files with optional password encryption and global settings.
- class dbtk.config.ConfigManager(config_file=None)
Bases: object
Manage DBTK configuration from YAML files.
ConfigManager handles loading and parsing YAML configuration files that define database connections, encrypted passwords, and global settings. It searches for configuration files in standard locations, validates the structure, and provides methods for accessing connections and passwords.
The manager supports encrypted passwords using Fernet symmetric encryption, environment variable substitution, and automatic sample config generation for new users.
Configuration File Structure
    # dbtk.yml
    settings:
      default_timezone: UTC
      default_country: US
      default_paramstyle: named
    connections:
      my_db:
        type: postgres
        host: localhost
        database: myapp
        user: admin
        encrypted_password: gAAAAABh...
    passwords:
      api_key:
        encrypted_password: gAAAAABh...
        description: API key for external service
Configuration Locations
ConfigManager searches for configuration files in this order:
File specified in the config_file parameter
./dbtk.yml (current directory)
./dbtk.yaml (current directory)
~/.config/dbtk.yml (user config directory)
~/.config/dbtk.yaml (user config directory)
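The search order above amounts to "first existing file wins". The following is a minimal sketch of that lookup, not ConfigManager's actual code; find_config is a hypothetical helper, and its cwd/home parameters exist only so the sketch can be exercised without touching real directories.

```python
from pathlib import Path
from typing import Optional, Union

# File names in the documented search order (hypothetical constant).
SEARCH_NAMES = ("dbtk.yml", "dbtk.yaml")


def find_config(config_file: Optional[Union[str, Path]] = None,
                cwd: Optional[Path] = None,
                home: Optional[Path] = None) -> Optional[Path]:
    """Return the first existing config file, or None if nothing is found."""
    if config_file is not None:
        # An explicit path always wins; checking that it exists is left to the caller.
        return Path(config_file)
    cwd = Path.cwd() if cwd is None else Path(cwd)
    home = Path.home() if home is None else Path(home)
    # Current directory first, then the user config directory.
    candidates = [cwd / name for name in SEARCH_NAMES]
    candidates += [home / ".config" / name for name in SEARCH_NAMES]
    for path in candidates:
        if path.is_file():
            return path
    return None
```

Returning None (rather than raising) leaves the caller free to decide whether to raise FileNotFoundError or generate a sample config.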
If no config is found, a sample config is created at ~/.config/dbtk_sample.yml.
- Parameters:
config_file (str or Path, optional) – Path to YAML config file. If None, searches the standard locations.
- config_file
Path to the loaded configuration file
- Type:
Path
- config
Parsed configuration dictionary
- Type:
dict
Example
    from dbtk.config import ConfigManager

    # Load from default location
    config_mgr = ConfigManager()

    # Access connection settings
    conn_params = config_mgr.get_connection('production_db')

    # Get a stored password (returned decrypted)
    api_key = config_mgr.get_password('external_api')

    # Load specific config file
    config_mgr = ConfigManager('/path/to/custom.yml')
See also
dbtk.connect – Connect to database using config
generate_encryption_key – Create encryption key for passwords
encrypt_config_file – Encrypt passwords in config file
Notes
YAML files must have .yml or .yaml extension
Connections require ‘type’ field (postgres, oracle, mysql, etc.)
Encrypted passwords require an encryption key, supplied via the DBTK_ENCRYPTION_KEY environment variable or stored in the system keyring (see generate_encryption_key)
Environment variables can be used with ${VAR_NAME} syntax
Sample config is created at ~/.config/dbtk_sample.yml on first run if no config exists
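As a rough illustration of the ${VAR_NAME} rule, here is a sketch that expands references recursively through parsed YAML data. expand_env is a hypothetical helper; the exact variable-name pattern and the choice to raise KeyError for unset variables are assumptions, not documented dbtk behavior.

```python
import os
import re
from typing import Any, Mapping, Optional

# Matches ${VAR_NAME}; the exact pattern dbtk accepts is an assumption.
_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: Any, env: Optional[Mapping[str, str]] = None) -> Any:
    """Recursively replace ${VAR} references in strings inside dicts and lists."""
    env = os.environ if env is None else env
    if isinstance(value, str):
        def _sub(match: re.Match) -> str:
            name = match.group(1)
            if name not in env:
                # Assumption: fail loudly instead of silently leaving the text as-is.
                raise KeyError(f"environment variable {name!r} is not set")
            return env[name]
        return _ENV_PATTERN.sub(_sub, value)
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    # Numbers, booleans, None, etc. pass through unchanged.
    return value
```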
- __init__(config_file=None)
Initialize config manager and load configuration.
- Parameters:
config_file (str or Path, optional) –
Path to YAML config file. If None, searches for:
dbtk.yml in current directory
dbtk.yaml in current directory
~/.config/dbtk.yml
~/.config/dbtk.yaml
- Raises:
FileNotFoundError – If no config file found in any search location
ValueError – If config file is invalid or malformed
Example
    # Use default config location
    config = ConfigManager()

    # Use specific config file
    config = ConfigManager('/etc/dbtk/production.yml')

    # Config auto-creates a sample if none exists
    config = ConfigManager()  # Creates ~/.config/dbtk_sample.yml if needed
- add_password(name, password, description=None, encrypt=True)
Add or update a password entry.
- Parameters:
name (str) – Password name/key
password (str) – Password value
description (str) – Optional description
encrypt (bool) – Whether to encrypt the password (default: True)
- get_password(name)
Get a stored password by name.
- Parameters:
name (str) – Password name/key
- Returns:
Decrypted password string
- Raises:
ValueError – If password not found or decryption fails
- Return type:
str
- get_setting(key, default=None)
Get a setting value from the config.
- Parameters:
key (str) – Setting key (supports dot notation like ‘database.timeout’)
default (Any) – Default value if key not found
- Returns:
Setting value or default
- Return type:
Any
Example
    timeout = config.get_setting('database.timeout', 30)
    tz = config.get_setting('default_timezone', 'UTC')
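Dot notation like 'database.timeout' can be read as a walk through nested dictionaries. The sketch below assumes the lookup runs against the parsed settings mapping; get_dotted is a hypothetical name, and a sentinel object is used so that a stored None is returned as None rather than being replaced by the default.

```python
from typing import Any, Mapping

_MISSING = object()  # distinguishes "key absent" from "value is None"


def get_dotted(config: Mapping[str, Any], key: str, default: Any = None) -> Any:
    """Walk nested mappings using 'a.b.c' keys; return default on any miss."""
    node: Any = config
    for part in key.split("."):
        if not isinstance(node, Mapping):
            # Tried to descend into a scalar such as a string or number.
            return default
        node = node.get(part, _MISSING)
        if node is _MISSING:
            return default
    return node
```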
- dbtk.config.connect(name, password=None, config_file=None)
Connect to a named database from configuration.
- Parameters:
name (str) – Connection name from config file
password (str) – Optional password if not stored in config
config_file (str | Path | None) – Optional path to config file
- Returns:
Database connection instance
- Return type:
Database
Example
    db = connect('prod_warehouse')
    cursor = db.cursor()
    cursor.execute("SELECT * FROM users")
- dbtk.config.diagnose_config(config_file=None)
Full config health check using a real ConfigManager instance.
- dbtk.config.encrypt_config_file(filename=None)
CLI utility to encrypt all passwords in a config file.
- dbtk.config.encrypt_password(password=None, encryption_key=None)
CLI utility function to encrypt a password.
- Parameters:
password (str) – Password to encrypt (if None, prompts for input)
encryption_key (str) – Optional encryption key. If None, uses DBTK_ENCRYPTION_KEY env var
- Returns:
Encrypted password
- Return type:
str
- dbtk.config.generate_encryption_key()
Generate a random encryption key.
This function generates a random encryption key that can be used to encrypt and decrypt data securely. The key is returned as a string and should be stored either in the DBTK_ENCRYPTION_KEY environment variable or in the system keyring by running: dbtk store-key [your key]
- Returns:
A randomly generated encryption key.
- Return type:
str
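The class overview states that passwords are encrypted with Fernet. A Fernet key is 32 random bytes encoded as URL-safe base64, which is the format cryptography's Fernet.generate_key() returns; whether generate_encryption_key() delegates to it is an assumption. A stdlib-only sketch of the same key format (make_fernet_style_key is a hypothetical name):

```python
import base64
import os


def make_fernet_style_key() -> str:
    # 32 bytes from the OS CSPRNG, URL-safe base64 encoded (44 characters with padding).
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")
```

The resulting string is the kind of value you would export as DBTK_ENCRYPTION_KEY or pass to dbtk store-key.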
- dbtk.config.get_password(name, config_file=None)
Get a stored password from configuration.
- Parameters:
name (str) – Password name from config file
config_file (str | Path | None) – Optional path to config file
- Returns:
Decrypted password string
- Return type:
str
Example
    api_key = get_password('openai_api_key')
    secret = get_password('jwt_secret')
- dbtk.config.get_setting(key, default=None, config_file=None)
Get a setting value from configuration.
- Parameters:
key (str) – Setting key (supports dot notation like ‘database.timeout’)
default (Any) – Default value if key not found
config_file (str | Path | None) – Optional path to config file
- Returns:
Setting value or default
- Return type:
Any
Example
    timeout = get_setting('database.timeout', 30)
    tz = get_setting('default_timezone', 'UTC')
- dbtk.config.migrate_config(source_file, target_file, new_encryption_key)
Migrate config file with new encryption key.
- dbtk.config.setup_config()
Interactive setup wizard for DBTK configuration.
This command guides you through:
- Choosing config file location (project vs user)
- Creating a config file from dbtk_sample.yml
- Setting up encryption (keyring or environment variable)
- Adding database connections
Example
    # Interactive setup
    dbtk config-setup
Database Connections
Database connection wrapper that provides a uniform interface to different database adapters.
- class dbtk.database.Database(connection, driver, database_name=None, connection_name=None, cursor_settings=None)
Bases: object
Database connection wrapper providing a uniform interface across database adapters.
The Database class wraps database-specific connection objects and provides a consistent API regardless of which database driver is being used (psycopg2, oracledb, mysqlclient, etc.). It handles parameter style conversions, manages cursors, and delegates attribute access to the underlying connection for driver-specific functionality.
Key features:
Unified interface - Same API for PostgreSQL, Oracle, MySQL, SQL Server, SQLite
Cursor factory - Create cursors that return Record objects (dict, attribute, and index access)
Transaction management - Context managers for safe transactions
Attribute delegation - Access underlying driver features when needed
Parameter style abstraction - Automatic handling of different bind parameter formats
- driver
The database adapter module (e.g., psycopg2, oracledb)
- database_type
Database type: ‘postgres’, ‘oracle’, ‘mysql’, ‘sqlserver’, or ‘sqlite’
- Type:
str
- database_name
Name of the connected database
- Type:
str
- placeholder
Parameter placeholder for this database’s parameter style
- Type:
str
Example
    import dbtk

    # Create connection
    db = dbtk.database.postgres(user='admin', password='secret', database='mydb')

    # Or from configuration
    db = dbtk.connect('production_db')

    # Use as context manager
    with db:
        cursor = db.cursor()
        cursor.execute("SELECT * FROM users WHERE status = :status", {'status': 'active'})
        users = cursor.fetchall()

    # Manual connection management
    db = dbtk.connect('production_db')
    cursor = db.cursor()  # Record rows also support dict-style access
    cursor.execute("SELECT * FROM orders")
    db.commit()
    db.close()
See also
dbtk.connect – Connect to database from configuration
Database.cursor – Create a cursor for executing queries
Database.transaction – Context manager for transactions
- __init__(connection, driver, database_name=None, connection_name=None, cursor_settings=None)
Initialize Database wrapper around an existing connection.
This is typically called by connection factory functions rather than directly. Use dbtk.database.postgres(), dbtk.connect(), etc. instead.
- Parameters:
connection – Underlying database connection object from the adapter
driver – Database adapter module (psycopg2, oracledb, mysqlclient, etc.)
database_name (str, optional) – Name of the database. If None, attempts to extract from connection.
connection_name (str, optional) – Name/alias from config file (e.g., ‘imdb’, ‘prod_db’). None if not from config.
cursor_settings (dict, optional) – Values passed to the cursor constructor. e.g. {‘batch_size’: 2000}
Example
    import psycopg2
    import dbtk
    from dbtk.database import Database

    # Direct instantiation (not typical)
    conn = psycopg2.connect(dbname='mydb', user='admin', password='secret')
    db = Database(conn, psycopg2, 'mydb')

    # Typical usage via factory functions
    db = dbtk.database.postgres(user='admin', password='secret', database='mydb')
- classmethod create(db_type, driver=None, connection_name=None, cursor_settings=None, **kwargs)
Factory method to create database connections.
- Parameters:
db_type (str) – Database type (‘postgres’, ‘oracle’, ‘mysql’, etc.)
driver (str | None) – Specific driver to use (optional)
connection_name (str | None) – Config file connection name/alias (optional)
cursor_settings (dict | None) – Defaults to use when creating cursors.
**kwargs – Connection parameters
- Returns:
Database instance
- Return type:
Database
- cursor(**kwargs)
Create a cursor for executing database queries.
Returns a cursor that yields Record objects, providing flexible access to query results through attribute, dictionary, and index notation.
- Parameters:
**kwargs –
Optional cursor configuration:
batch_size (int) - Rows to process at once in bulk operations
debug (bool) - Enable debug output showing queries and bind variables
return_cursor (bool) - If True, execute() returns cursor for method chaining
- Returns:
Cursor instance that returns Record objects
- Return type:
Cursor
Example
    # Create cursor - returns Records
    cursor = db.cursor()
    cursor.execute("SELECT id, name, email FROM users WHERE status = :status", {'status': 'active'})

    # Record supports multiple access patterns
    for row in cursor:
        print(row['name'])              # Dictionary access
        print(row.name)                 # Attribute access
        print(row[1])                   # Index access
        print(row.get('phone', 'N/A'))  # Safe access with default

    # With configuration options
    cursor = db.cursor(debug=True)
See also
Record – Flexible row object with dict, attribute, and index access
Database.transaction – Context manager for safe transactions
- property dialect
Database dialect instance providing SQL generation and schema introspection.
- transaction()
Context manager for safe database transactions.
Automatically commits the transaction on successful completion or rolls back if an exception occurs. This ensures transactions are properly cleaned up even if errors occur.
- Yields:
Database – This database connection instance
- Raises:
Exception – Re-raises any exception that occurs within the transaction block after rolling back the transaction
Example
    import logging

    logger = logging.getLogger(__name__)

    with db.transaction():
        cursor = db.cursor()
        cursor.execute("INSERT INTO orders (customer, amount) VALUES (:c, :a)", {'c': 'Aang', 'a': 100})
        cursor.execute("UPDATE inventory SET stock = stock - 1 WHERE item_id = :id", {'id': 42})
        # Automatically commits on success

    # If exception occurs, transaction is automatically rolled back
    try:
        with db.transaction():
            cursor = db.cursor()
            cursor.execute("INSERT INTO invalid_table ...")  # Raises error
            # Rollback happens automatically
    except Exception as e:
        logger.error(f"Transaction failed: {e}")
See also
Database.commit – Manually commit a transaction
Database.rollback – Manually roll back a transaction
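The commit-or-rollback behavior described for transaction() follows a standard pattern. This sketch reproduces it with the standard library's sqlite3 module and contextlib.contextmanager; it illustrates the pattern and is not the Database.transaction source (transaction here is a hypothetical free function).

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn: sqlite3.Connection):
    """Commit on success; roll back and re-raise on any exception."""
    try:
        yield conn
        conn.commit()
    except BaseException:
        # BaseException so that KeyboardInterrupt also triggers a rollback.
        conn.rollback()
        raise
```

Usage mirrors the example above: statements inside the with block are committed together, or not at all.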
- dbtk.database.mysql(user, password=None, database='mysql', host='localhost', port=3306, driver=None, **kwargs)
Create a MySQL/MariaDB database connection.
Automatically selects the best available MySQL driver (mysqlclient, mysql.connector, pymysql, or MySQLdb).
- Parameters:
user (str) – Database username
password (str | None) – Database password
database (str) – Database name (default: ‘mysql’)
host (str) – Server hostname or IP (default: ‘localhost’)
port (int) – Server port (default: 3306)
driver (str) – Specific driver to use (‘mysqlclient’, ‘mysql.connector’, ‘pymysql’, ‘MySQLdb’)
**kwargs – Additional driver-specific parameters (charset, ssl, etc.)
- Returns:
Database connection object with context manager support
- Return type:
Database
Example
    >>> from dbtk.database import mysql
    >>> with mysql(user='root', password='pass', database='myapp') as db:
    ...     cursor = db.cursor()
    ...     cursor.execute("SELECT * FROM users")
See also
Database.create() for more connection options
- dbtk.database.oracle(user=None, password=None, database=None, host=None, port=1521, driver=None, **kwargs)
Create an Oracle database connection.
Supports both DSN and connection string formats. Automatically selects the best available Oracle driver (oracledb or cx_Oracle).
- Parameters:
user (str | None) – Database username. Not required when using OCI cloud native authentication via extra_auth_params.
password (str | None) – Database password
database (str) – Service name or SID
host (str | None) – Server hostname or IP (required if not using dsn)
port (int) – Server port (default: 1521)
driver (str) – Specific driver to use (‘oracledb’, ‘cx_Oracle’)
**kwargs – Additional driver-specific parameters (dsn, mode, extra_auth_params, etc.)
- Returns:
Database connection object with context manager support
- Return type:
Database
Example
    >>> from dbtk.database import oracle
    >>> # Using service name
    >>> db = oracle(user='scott', password='tiger',
    ...             host='oracle.example.com', database='ORCL')
    >>>
    >>> # Using DSN directly
    >>> db = oracle(user='scott', password='tiger',
    ...             dsn='oracle.example.com:1521/ORCL')
    >>>
    >>> # OCI Resource Principal (no credentials needed)
    >>> db = oracle(dsn='mydb.adb.us-phoenix-1.oraclecloud.com:1521/myservice',
    ...             extra_auth_params={'auth_type': 'ResourcePrincipal'})
See also
Database.create() for more connection options
- dbtk.database.postgres(user, password=None, database='postgres', host='localhost', port=5432, driver=None, **kwargs)
Create a PostgreSQL database connection.
Automatically selects the best available PostgreSQL driver: psycopg2, psycopg (psycopg 3), or pgdb. You can request a specific driver with the driver parameter.
- Parameters:
user (str) – Database username
password (str | None) – Database password
database (str) – Database name (default: ‘postgres’)
host (str) – Server hostname or IP (default: ‘localhost’)
port (int) – Server port (default: 5432)
driver (str) – Specific driver to use (‘psycopg2’, ‘psycopg’, ‘pgdb’)
**kwargs – Additional driver-specific connection parameters
- Returns:
Database connection object with context manager support
- Return type:
Database
Example
    >>> from dbtk.database import postgres
    >>> with postgres(user='user', password='pass', database='mydb') as db:
    ...     cursor = db.cursor()
    ...     cursor.execute("SELECT * FROM users")
See also
Database.create() for more connection options
- dbtk.database.sqlite(database, **kwargs)
Create a SQLite database connection.
SQLite is a serverless, file-based database. Use ‘:memory:’ for an in-memory database.
- Parameters:
database (str) – Path to database file or ‘:memory:’ for in-memory database
**kwargs – Additional sqlite3.connect() parameters (timeout, isolation_level, etc.)
- Returns:
Database connection object with context manager support
- Return type:
Database
Example
    >>> from dbtk.database import sqlite
    >>> # File-based database
    >>> with sqlite('app.db') as db:
    ...     cursor = db.cursor()
    ...     cursor.execute("CREATE TABLE users (id INTEGER, name TEXT)")
    >>>
    >>> # In-memory database (useful for testing)
    >>> db = sqlite(':memory:')
See also
Database.create() for more connection options
sqlite3 module documentation for additional parameters
- dbtk.database.sqlserver(user, password=None, database=None, host='localhost', port=1433, **kwargs)
Create a Microsoft SQL Server database connection.
Automatically selects the best available SQL Server driver (pyodbc or pymssql).
- Parameters:
user (str) – Database username
password (str | None) – Database password
database (str) – Database name
host (str) – Server hostname or IP (default: ‘localhost’)
port (int) – Server port (default: 1433)
**kwargs – Additional driver-specific parameters (driver, encrypt, etc.)
- Returns:
Database connection object with context manager support
- Return type:
Database
Example
    >>> from dbtk.database import sqlserver
    >>> db = sqlserver(user='sa', password='pass',
    ...                database='AdventureWorks', host='sqlserver.local')
    >>> cursor = db.cursor()
Note
When using pyodbc, you may need to specify the ODBC driver: sqlserver(..., driver='ODBC Driver 17 for SQL Server')
See also
Database.create() for more connection options
Cursors
Cursor classes that wrap database cursors and provide different return types. All cursors delegate to the underlying database cursor stored in _cursor.
- class dbtk.cursors.Cursor(connection, batch_size=None, debug=False, return_cursor=False, **kwargs)
Bases: object
Cursor that returns query results as Records.
It wraps database-specific cursor objects and provides a consistent interface plus additional functionality such as SQL file execution, parameter conversion, and prepared statements. It also maintains a clean reference hierarchy to the connection (cursor.connection) and to the driver (cursor.connection.driver).
Cursor returns Record objects, which provide flexible access via dictionary keys, attributes, or integer indices.
- paramstyle
Parameter style of the underlying database (‘qmark’, ‘named’, etc.)
- Type:
str
- placeholder
Placeholder string for bind parameters (e.g., ‘?’, ‘:1’, etc.)
- Type:
str
- description
Column metadata from the last query (delegated to underlying cursor)
Note
Cursors delegate attribute access to the underlying database-specific cursor, so all native cursor functionality is available.
Example
    import dbtk

    db = dbtk.connect('prod_ods')
    cursor = db.cursor()
    cursor.execute("SELECT id, name, email FROM users WHERE status = :status", {'status': 'active'})
    for row in cursor:
        user_id, name, email = row  # Records unpack like lists
        print(f"{user_id}: {name} ({email})")
See also
Record – Flexible data structure supporting dict, attribute, and index access
- WRAPPER_SETTINGS = ('batch_size', 'debug', 'return_cursor', 'fast_executemany')
- __init__(connection, batch_size=None, debug=False, return_cursor=False, **kwargs)
Initialize a cursor for database operations.
- Parameters:
connection (Database) – Database connection object
batch_size (int, optional) – How many rows to process at a time when using executemany() or bulk operations in DataSurge
debug (bool, default False) – Enable debug output showing queries and bind variables
return_cursor (bool, default False) – If True, execute() returns the cursor for method chaining
**kwargs – Additional arguments passed to the underlying database cursor
Example
    # Typically created via Database.cursor()
    cursor = db.cursor()

    # With debug enabled
    cursor = db.cursor(debug=True)

    # With method chaining
    cursor = db.cursor(return_cursor=True)
    results = cursor.execute("SELECT * FROM users").fetchall()
- columns(normalized=False)
Return list of column names.
- Parameters:
normalized (bool, default False) – If True, return normalized column names (sanitized for Python attributes). If False, return original column names from database.
- Returns:
Column names in order
- Return type:
List[str]
Example
    cursor.execute('SELECT "First Name", "User ID" FROM ...')
    cursor.columns()                 # ['First Name', 'User ID']
    cursor.columns(normalized=True)  # ['first_name', 'user_id']
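The normalized names above follow the rules listed under Record (lowercase, non-alphanumerics to underscores, runs collapsed, digit-leading names prefixed with n). A minimal sketch of those rules, assuming ASCII identifiers; normalize_name is hypothetical, and stripping leading underscores matches the '#term_code' to 'term_code' example.

```python
import re


def normalize_name(name: str) -> str:
    """Approximate the documented rules for turning a column name into an identifier."""
    # Each run of non-alphanumeric characters (including "_") becomes a single "_".
    text = re.sub(r"[^0-9a-z]+", "_", name.lower())
    # Drop underscores at both ends ('#term_code' -> '_term_code' -> 'term_code').
    text = text.strip("_")
    if text[:1].isdigit():
        text = "n" + text  # digit-leading names get an "n" prefix
    return text
```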
- execute(query, bind_vars=(), convert_params=False)
Execute a database query.
Pass convert_params=True to have the query rewritten to the cursor’s paramstyle and parameters handled automatically (same as PreparedStatement and execute_file).
- Parameters:
query (str) – SQL statement to execute
bind_vars (tuple or dict, default ()) –
Bind parameters to pass to the database.
When convert_params is False (the default), passed directly to the underlying cursor and must already be in the format required by the cursor’s paramstyle.
When convert_params is True, must be a dict (or Record).
convert_params (bool, default False) – If True, parameter order will be extracted from the query and the query will be rewritten to match the cursor’s paramstyle. Missing parameters will be defaulted to None and extra parameters will be ignored.
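For a positional driver, convert_params=True boils down to two steps: rewrite each :name placeholder to the driver's marker while recording the order of names, then build the bind tuple from a dict, defaulting missing names to None and ignoring extras. A minimal sketch for the qmark style; to_qmark and bind are hypothetical helpers, not dbtk.utils functions. Unlike a production implementation, it does not skip string literals or comments, though it does leave PostgreSQL :: casts alone.

```python
import re
from typing import Any, List, Mapping, Tuple

# ":name" not preceded by another ":" so that casts like "x::int" are untouched.
_NAMED = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def to_qmark(query: str) -> Tuple[str, List[str]]:
    """Rewrite :name placeholders as ? and return the names in order of appearance."""
    names: List[str] = []

    def _swap(match: re.Match) -> str:
        names.append(match.group(1))  # re.sub visits matches left to right
        return "?"

    return _NAMED.sub(_swap, query), names


def bind(names: List[str], bind_vars: Mapping[str, Any]) -> Tuple[Any, ...]:
    """Order values for a positional paramstyle: missing -> None, extras ignored."""
    return tuple(bind_vars.get(name) for name in names)
```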
- execute_file(filename, bind_vars=None, **kwargs)
Execute SQL query from a file with named parameter substitution.
This is a convenience method for one-off queries. For queries that will be executed multiple times, use prepare_file() instead for better performance.
- Parameters:
filename (str | Path) – Path to SQL file. Resolved relative to CWD first; if not found there, falls back to the directory of the calling script.
bind_vars (dict | None) – Dictionary of named parameters
**kwargs – encoding: File encoding (default: utf-8-sig)
- Returns:
Cursor if return_cursor=True, else None
- Return type:
Any
Example
    cursor.execute_file('queries/get_user.sql', {'user_id': 123})
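The lookup rule for filename (CWD first, then the calling script's directory) can be sketched with pathlib. resolve_sql_path is hypothetical; a real implementation would have to discover the caller's directory itself, for example by inspecting the call stack, whereas this sketch takes it as an argument so it stays testable.

```python
from pathlib import Path
from typing import Optional, Union


def resolve_sql_path(filename: Union[str, Path],
                     caller_dir: Union[str, Path],
                     cwd: Optional[Union[str, Path]] = None) -> Path:
    """Try the current working directory first, then the caller's directory."""
    cwd = Path.cwd() if cwd is None else Path(cwd)
    path = Path(filename)
    first = path if path.is_absolute() else cwd / path
    if first.is_file():
        return first
    fallback = Path(caller_dir) / path  # an absolute path stays absolute here
    if fallback.is_file():
        return fallback
    raise FileNotFoundError(f"{filename} not found in {cwd} or {caller_dir}")
```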
- prepare_file(filename, encoding='utf-8-sig')
Prepare a SQL statement from a file for repeated execution.
The SQL file is read once and parameter conversion is performed once. The returned PreparedStatement can be executed multiple times efficiently.
- Parameters:
filename (str | Path) – Path to SQL file. Resolved relative to CWD first; if not found there, falls back to the directory of the calling script.
encoding (str) – File encoding (default: utf-8-sig)
- Returns:
PreparedStatement object
- Return type:
PreparedStatement
Example
    stmt = cursor.prepare_file('queries/insert_user.sql')
    for user in users:
        stmt.execute({'user_id': user.id, 'name': user.name})
- prepare_params(param_names, bind_vars, paramstyle=None)
Convert named parameters to the format required by the cursor’s paramstyle. Used automatically by PreparedStatement, Table and DataSurge classes.
- Parameters:
param_names (list of str) – Ordered list of parameter names as they appear in the SQL statement. Missing names default to None with a debug-level log message.
bind_vars (dict) – Dictionary of named parameter values keyed by parameter name.
paramstyle (str, optional) – Override the cursor’s native paramstyle for this call. Must be one of the values in dbtk.utils.ParamStyle. Ignored if not a recognised style.
- Returns:
tuple – For positional styles (qmark, numeric, format): values ordered to match param_names.
dict – For named styles (named, pyformat): subset of bind_vars containing only the required parameters.
- Return type:
Any
Example
    from dbtk.utils import ParamStyle, process_sql_parameters

    # query with :named or %(pyformat)s parameters
    sql = "SELECT * FROM warriors WHERE nation = :nation AND rank = COALESCE(:rank, rank)"

    # query rewritten in cursor's parameter style and parameter names in order they appear
    query, params_names = process_sql_parameters(sql, ParamStyle.get_positional_style(cur.paramstyle))

    # missing parameter defaults to None, extra parameters are ignored
    cur.prepare_params(params_names, {'nation': 'Fire Nation', 'nick_name': 'Sparky'})
    # -> ('Fire Nation', None)
- prepare_query(query)
Prepare a SQL statement from a query string for repeated execution.
The parameter conversion is performed once. The returned PreparedStatement can be executed multiple times efficiently.
- Parameters:
query (str) – SQL query string
- Returns:
PreparedStatement object
- Return type:
PreparedStatement
Example
    stmt = cursor.prepare_query('SELECT * FROM users WHERE user_id = :user_id')
    for user in users:
        stmt.execute({'user_id': user.id})
- class dbtk.cursors.PreparedStatement(cursor, query=None, filename=None, encoding='utf-8-sig')
Bases: object
A prepared SQL statement, loaded from a query string or a file, with cached parameter mapping.
The statement is read from file once and SQL parameter conversion is performed once based on the cursor’s paramstyle. The prepared statement can then be executed multiple times efficiently. It retains a reference to the cursor, so it can be used in the same way as a regular cursor (fetchone(), fetchmany(), etc.).
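The "convert once, execute many" idea can be sketched on top of the standard library's sqlite3 cursor (used only because it ships with Python). MiniPrepared is a hypothetical class, not dbtk's PreparedStatement: it rewrites :name placeholders to qmark once in the constructor and delegates fetch methods to the wrapped cursor, as the description above says PreparedStatement does.

```python
import re
import sqlite3
from typing import Any, Mapping

# ":name" not preceded by another ":" (leaves "::" casts alone).
_NAMED = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


class MiniPrepared:
    """Convert :name placeholders to ? once, then execute many times."""

    def __init__(self, cursor: sqlite3.Cursor, query: str) -> None:
        self.cursor = cursor
        self.names = _NAMED.findall(query)  # parameter order, captured once
        self.query = _NAMED.sub("?", query)  # rewritten SQL, computed once

    def execute(self, bind_vars: Mapping[str, Any]) -> sqlite3.Cursor:
        # Missing names become None; extra keys are ignored.
        params = tuple(bind_vars.get(name) for name in self.names)
        return self.cursor.execute(self.query, params)

    def __getattr__(self, name: str) -> Any:
        # Only reached for attributes not defined above, e.g. fetchone() or fetchall().
        return getattr(self.cursor, name)
```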
- __init__(cursor, query=None, filename=None, encoding='utf-8-sig')
Create a prepared statement from a SQL query string or file.
- Parameters:
cursor – The cursor that will execute this statement
query (str | None) – SQL query string (optional)
filename (str | Path | None) – Path to SQL file (relative to CWD)
encoding (str | None) – File encoding (default: utf-8-sig)
Record
Record classes for database result sets.
- class dbtk.record.FixedWidthRecord(*args, **kwargs)
Bases: Record
A Record subclass optimized for fixed-width data parsing and reconstruction.
Instances represent a single row from a fixed-width file, with values accessible by name, attribute, or index. The class retains the original List[FixedColumn] definitions so that to_line() can reconstruct the exact source line by splicing each formatted value into its correct byte position.
Designed for use with FixedReader or EDIReader, where each record type gets its own dynamic subclass with the appropriate column definitions.
- Class attributes (set automatically by set_fields):
- _columns (List[FixedColumn])
Full column definitions in definition order, including name, position, type, alignment, pad character, and comment.
- _line_len (int)
Total line width in characters (max end_pos across all columns).
Example usage:
    # In reader factory
    RecordClass.set_fields(columns)  # columns is List[FixedColumn]
    record = RecordClass(*row_values)
    original_line = record.to_line()  # reconstructs formatted string

    # Parse a raw fixed-width string directly
    record = RecordClass.from_line(raw_line)  # corollary to to_line()
- classmethod from_line(line, auto_trim=True)
Parse a fixed-width string and return a new instance of this record type.
The corollary to to_line(): slice each field from its declared position, apply type conversion based on column_type, and construct the record. Mirrors the parsing logic in FixedReader so that records created here behave identically to those produced by the reader.
- Parameters:
line (str) – A fixed-width string. Should be at least _line_len characters; shorter strings are handled gracefully (missing positions return an empty string).
auto_trim (bool) – If True (default), strip leading/trailing whitespace from text fields before storing. Set to False to preserve the raw padded value exactly as it appears in the source line.
- Returns:
A new instance of this FixedWidthRecord subclass with values populated from the parsed string.
- Raises:
TypeError – If line is not a string.
- Return type:
FixedWidthRecord
Example:
    MyRecord = fixed_record_factory([
        ('record_type', 1),
        FixedColumn('amount', 2, 11, 'int'),
    ])
    record = MyRecord.from_line('6 42')
    assert record.record_type == '6'
    assert record.amount == 42

    # Round-trip
    assert record.to_line() == MyRecord.from_line(record.to_line()).to_line()
- pprint(normalized=False, add_comments=False)
Pretty-print the record with aligned columns.
- Parameters:
normalized (bool) – If True, use normalized field names.
add_comments (bool) – If True, append each column’s comment (from the FixedColumn definition) after the value. Columns without a comment are left blank in that position. Has no effect when there are no _columns defined.
- classmethod set_fields(fields)
Set field names and column definitions from a list of FixedColumn objects.
Stores the column list as-is on the class (preserving position, type, alignment, pad character, and comment for introspection) and pre-computes _line_len as the rightmost end_pos across all columns.
- Parameters:
fields (List[FixedColumn]) – FixedColumn definitions for this record type. Definition order determines value order on instances; to_line() places each value by start_idx so out-of-position-order definitions work correctly.
- to_line(truncate_overflow=False)
Reconstruct the original fixed-width line from this record’s values.
Builds a space-filled buffer of _line_len characters and splices each field value into its position using start_idx. Column order in the definition does not matter; gaps between columns remain as spaces. Iterates only the column fields (stops before _row_num or any other appended fields). Missing values are treated as empty strings.
- Parameters:
truncate_overflow (bool) – If False (default), raise ValueError when a value exceeds its column width. If True, silently truncate.
- Returns:
A string exactly matching the fixed-width format for this record type.
- Raises:
ValueError – If truncate_overflow=False and any value exceeds its width.
Example
    record.to_line()  # -> '1234567890ABC 0000012345'
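A sketch of the parse/reconstruct round trip described for from_line() and to_line(), using plain dicts instead of Record subclasses. Col is a hypothetical stand-in for FixedColumn; treating start and end as 1-based inclusive positions (so FixedColumn('amount', 2, 11, 'int') is 10 characters wide) and right-justifying numbers with spaces are assumptions, since the real alignment and pad character come from each column definition.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Col:
    """Hypothetical stand-in for FixedColumn: 1-based, inclusive positions."""
    name: str
    start: int
    end: int
    kind: str = "text"  # "text" or "int" in this sketch


def from_line(cols: List[Col], line: str, auto_trim: bool = True) -> Dict[str, Any]:
    values: Dict[str, Any] = {}
    for col in cols:
        raw = line[col.start - 1:col.end]  # short lines yield "" for missing positions
        if col.kind == "int":
            raw = raw.strip()
            values[col.name] = int(raw) if raw else None
        else:
            values[col.name] = raw.strip() if auto_trim else raw
    return values


def to_line(cols: List[Col], values: Dict[str, Any], truncate_overflow: bool = False) -> str:
    buf = [" "] * max(col.end for col in cols)  # space-filled, so gaps stay blank
    for col in cols:
        width = col.end - col.start + 1
        value = values.get(col.name)
        text = "" if value is None else str(value)  # missing values -> empty string
        if len(text) > width:
            if not truncate_overflow:
                raise ValueError(f"{col.name!r}: {text!r} exceeds width {width}")
            text = text[:width]
        # Assumed alignment: numbers right-justified, text left-justified.
        text = text.rjust(width) if col.kind == "int" else text.ljust(width)
        buf[col.start - 1:col.end] = text  # splice by position, not definition order
    return "".join(buf)
```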
- visualize()
Return a diagnostic string showing column boundaries over the record value.
Output format (4 lines):
             1         2 ...
    1234567890123456789012345...   ← position ruler
    || | | |...                    ← '|' at each column start
    101 123456789 87654321...      ← to_line() output
Returns a string; call print(record.visualize()) to display. Consistent with FixedReader.visualize(), which also returns a string.
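The four-line layout of visualize() can be approximated as follows. visualize here is a hypothetical free function that takes 1-based column start positions and an already formatted line; it is not FixedWidthRecord.visualize(), and the exact spacing dbtk produces may differ.

```python
from typing import Sequence


def visualize(starts: Sequence[int], line: str) -> str:
    """Tens ruler, ones ruler, '|' at each 1-based column start, then the line."""
    width = len(line)
    positions = range(1, width + 1)
    # Tens digit at every tenth position (10 -> "1", 20 -> "2", ...), blanks elsewhere.
    tens = "".join(str(p // 10 % 10) if p % 10 == 0 else " " for p in positions)
    ones = "".join(str(p % 10) for p in positions)
    marks = [" "] * width
    for start in starts:
        if 1 <= start <= width:
            marks[start - 1] = "|"
    return "\n".join([tens, ones, "".join(marks), line])
```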
- class dbtk.record.Record(*args, **kwargs)
Bases: list
A flexible, lightweight row type that strikes a balance between the memory efficiency of a list and the functionality of dicts and objects.
Record extends list to provide a rich interface for accessing query result rows. It supports attribute access, dictionary-style key access, integer indexing, and slicing - all on the same object. This makes it a very flexible and memory efficient return type for both cursors and readers.
Access Patterns
Dictionary-style: row['column_name'] - Safe with .get() method
Attribute access: row.column_name - Clean, readable syntax
Integer index: row[3] - Positional access
Slicing: row[1:4] - Get multiple columns at once
Iteration: for value in row - Iterate over values
Containment: 'column_name' in row - Check if column exists
Key Methods
get(key, default=None) - Safe dictionary-style access with default
keys() - Get list of column names
values() - Get list of column values
items() - Get (column, value) pairs
copy() - Create a shallow copy of the record
update(dict) - Update multiple columns from a dictionary
coalesce(dict) - Update only missing values from a dictionary
pprint() - Pretty-print the record
Column Names: Original vs Normalized
Every Record stores two parallel lists of names for each column:
_fields— the original names exactly as returned by the database (e.g.'First Name','User ID','#term_code')._fields_normalized— Python-safe versions used for attribute access (e.g.'first_name','user_id','term_code').
Both lists are set once by
set_fields()when the cursor executes its first query. Normalization converts field to be suitable for attribute access. It lowercases, replaces non-alphanumeric characters with underscores, collapses runs, strips trailing underscores, and prefixes digit-leading names withn.Which to use:
Use original names (row['First Name'], row.keys(), row.to_dict()) when round-tripping data back to the database or to a CSV, where column names must match the schema exactly.
Use normalized names (row.first_name, row['first_name'], row.keys(normalized=True), row.to_dict(normalized=True)) in application code where Pythonic attribute access is preferred, or where case- and whitespace-insensitive matching is beneficial.
Both forms work interchangeably for item get/set and for membership (in) checks, so row['First Name'] and row['first_name'] return the same value.
Note
Record is dynamically subclassed when a cursor executes a query. Each unique set of column names gets its own Record subclass with those names set as class attributes. This enables attribute access while maintaining the list base class for compatibility.
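The dynamic-subclass idea can be illustrated on top of a plain list. This is a simplified sketch, not DBTK's Record: MiniRecord and make_record_class are hypothetical names, normalization is reduced to lowercasing and replacing non-alphanumeric runs with underscores, and runtime or deleted fields are ignored. It shows how class-level field lists plus one property per column give attribute, key, index, and slice access on the same list object.

```python
import re


class MiniRecord(list):
    """List subclass whose field names live on the class, not each instance."""

    _fields: list = []
    _fields_normalized: list = []

    def _index(self, key):
        # Accept either the original or the normalized name.
        if key in self._fields:
            return self._fields.index(key)
        if key in self._fields_normalized:
            return self._fields_normalized.index(key)
        raise KeyError(key)

    def __getitem__(self, key):
        if isinstance(key, (int, slice)):
            return super().__getitem__(key)  # positional access and slicing
        return super().__getitem__(self._index(key))

    def __contains__(self, key):
        # Containment checks column names, as described above.
        return key in self._fields or key in self._fields_normalized


def _normalize(name):
    # Simplified rule for this sketch only.
    return re.sub(r"[^0-9a-z]+", "_", name.lower()).strip("_")


def make_record_class(fields, name="Record"):
    """Build one subclass per unique column set, with a property per column."""
    normalized = [_normalize(f) for f in fields]
    attrs = {"_fields": list(fields), "_fields_normalized": normalized}
    for i, attr in enumerate(normalized):
        # Bind i at definition time so each property reads its own position.
        attrs[attr] = property(lambda self, i=i: list.__getitem__(self, i))
    return type(name, (MiniRecord,), attrs)
```

Because the names live on the generated class, each row only carries its values, which is where the memory savings over per-row dicts come from.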
Example
cursor = db.cursor()
cursor.execute("SELECT id, name, email, created FROM users WHERE id = :id", {'id': 42})
user = cursor.fetchone()

# All these access patterns work on the same object:
print(user['name'])     # Dictionary-style: 'Aang'
print(user.name)        # Attribute access: 'Aang'
print(user[1])          # Index access: 'Aang'
print(user[1:3])        # Slicing: ['Aang', 'aang@avatar.com']

# Safe access with default
print(user.get('phone', 'N/A'))  # 'N/A' if no phone column

# Dictionary methods
for col, val in user.items():
    print(f"{col}: {val}")

# List compatibility
user_id, name, email, created = user     # Unpack like a tuple
print(' | '.join(str(v) for v in user))  # Join like a list (id and created are not strings)

# Update columns
user['email'] = 'newemail@avatar.com'
user.name = 'Avatar Aang'
See also
Cursor - Database cursor that returns Record objects
- coalesce(other=None, **kwargs)[source]
Fill in missing or empty fields from another dict, Record, or keyword arguments.
Updates the current Record by copying values from other (or **kwargs) only for fields that are currently None or an empty string (‘’). Existing non-empty values are preserved.
This is a non-destructive “fill gaps” operation — it will never overwrite valid data.
- Parameters:
other – Optional dict or Record containing values to coalesce. If provided, its items are processed first.
**kwargs – Additional key-value pairs to coalesce (overrides keys in other if both provide a value).
- Returns:
The updated Record (for chaining).
- Return type:
Record (the same instance)
Examples
>>> Record.set_fields(['id', 'name', 'email', 'phone', 'notes'])
>>> record = Record(None, "Scott", "scott@example.com", "", None)
>>> resolved = {'id': 123, 'name': 'Scott Bailey', 'notes': 'VIP'}
>>> record.coalesce(resolved, phone="555-1234")
>>> record  # [123, "Scott", "scott@example.com", "555-1234", "VIP"]
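The fill-gaps rule can be stated in a few lines. This is a minimal sketch over a plain dict rather than a Record, assuming (as described above) that only None and '' count as missing and that keyword arguments win over other. The hypothetical coalesce() below also makes two choices the text does not spell out: it only fills keys that already exist, and it never fills a gap with another empty value.

```python
MISSING = (None, "")


def coalesce(target, other=None, **kwargs):
    """Fill None/'' entries in target; never overwrite non-empty values."""
    candidates = dict(other or {})
    # Keyword arguments win over `other` when both supply a value.
    candidates.update({k: v for k, v in kwargs.items() if v not in MISSING})
    for key, value in candidates.items():
        if value in MISSING:
            continue  # nothing useful to fill with
        if key in target and target[key] in MISSING:
            target[key] = value
    return target  # returned for chaining
```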
- copy()[source]
Return a shallow copy of the Record.
Copies the underlying list values
Copies the field metadata (_fields, _fields_normalized)
Copies the set of deleted fields
Copies any runtime-added fields (_added dict)
Preserves the same Record subclass (so attribute access works)
- Returns:
A new Record instance with the same data and state
- Return type:
Record
- items(normalized=False)[source]
Get (field_name, value) pairs.
- Parameters:
normalized (bool) – If True, use normalized field names. If False (default), use original field names.
- Yields:
Tuples of (field_name, value)
- keys(normalized=False)[source]
Get list of field names.
- Parameters:
normalized (bool) – If True, return normalized field names. If False (default), return original field names.
- Returns:
List of field names
- Return type:
List[str]
- pprint(normalized=False)[source]
Pretty-print the record with aligned columns.
- Parameters:
normalized (bool) – If True, use normalized field names. If False (default), use original field names.
- classmethod set_fields(fields)[source]
Set the field names for this Record class.
Stores original field names and generates normalized versions for attribute access. Handles collisions by appending _2, _3, etc. to duplicate normalized names.
- Parameters:
fields (List[str]) – Original field names (e.g., [‘Start Year’, ‘End Date’])
Examples
>>> Record.set_fields(['Start Year', 'End Date'])
>>> Record._fields
['Start Year', 'End Date']
>>> Record._fields_normalized
['start_year', 'end_date']
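A sketch of the normalization and collision rules described for set_fields() and in the Column Names section above. normalize_name and normalize_fields are hypothetical helpers, and two details are assumptions: leading underscores are stripped as well (to match the '#term_code' → 'term_code' example), and the digit prefix is a bare n.

```python
import re


def normalize_name(name):
    """Turn an arbitrary column header into a Python-safe attribute name."""
    norm = name.lower()
    norm = re.sub(r"[^0-9a-z_]", "_", norm)  # non-alphanumerics -> '_'
    norm = re.sub(r"_+", "_", norm)          # collapse runs of '_'
    norm = norm.strip("_")                   # drop leading/trailing '_'
    if norm[:1].isdigit():
        norm = "n" + norm                    # identifiers can't start with a digit
    return norm


def normalize_fields(fields):
    """Normalize every field, suffixing duplicates with _2, _3, ..."""
    seen = {}
    result = []
    for field in fields:
        base = normalize_name(field)
        count = seen.get(base, 0) + 1
        seen[base] = count
        result.append(base if count == 1 else f"{base}_{count}")
    return result
```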
- to_dict(normalized=False)[source]
Convert Record to dictionary.
- Parameters:
normalized (bool) – If True, use normalized field names as keys. If False (default), use original field names.
- Returns:
Dictionary representation of the record
- Return type:
dict
Examples
>>> Record.set_fields(['Start Year', 'End Year'])
>>> rec = Record(2020, 2025)
>>> rec.to_dict()
{'Start Year': 2020, 'End Year': 2025}
>>> rec.to_dict(normalized=True)
{'start_year': 2020, 'end_year': 2025}
- update(other=None, **kwargs)[source]
Update fields from another dict, Record, or keyword arguments.
Overwrites existing field values unconditionally. To preserve existing non-empty values, use coalesce() instead.
Accepts any mapping with an items() method, an iterable of (key, value) pairs, or keyword arguments. Unknown keys are added as runtime fields.
- Parameters:
other – Optional dict, Record, or iterable of (key, value) pairs.
**kwargs – Additional key-value pairs to set.
Examples
>>> Record.set_fields(['id', 'name', 'email'])
>>> record = Record(1, 'Scott', 'old@example.com')
>>> record.update({'email': 'new@example.com'}, name='Scott Bailey')
>>> record  # [1, 'Scott Bailey', 'new@example.com']
- dbtk.record.fixed_record_factory(columns, name='FixedRecord')[source]
Class factory: build a FixedWidthRecord subclass from a compact column spec.
Follows the same convention as collections.namedtuple(): returns a class, not an instance.
- Parameters:
columns (list of FixedColumn or (name, width) tuple) – May be mixed. Tuples specify (name, width) and are assigned sequential positions starting at 1 (or immediately after the previous column). FixedColumn objects are used as-is and advance the auto-position cursor to col.end_pos + 1.
name (str, optional) – Class name of the returned type. Defaults to 'FixedRecord'.
- Returns:
A FixedWidthRecord subclass with set_fields() already called.
- Return type:
type
Examples
# Tuple-only: positions calculated automatically
AchDetail = fixed_record_factory([
    ('record_type', 1),
    ('priority_code', 2),
    ('routing_number', 9),
    ('account_number', 17),
    ('amount', 10),
], name='AchDetail')
line = AchDetail('6', '22', '123456789', '00012345678', 100)
print(line.to_line())

# Mixed: use FixedColumn where you need explicit control
AchHeader = fixed_record_factory([
    FixedColumn('record_type', 1, 1),
    ('priority_code', 2),
    FixedColumn('routing_number', 4, 12, column_type='int', align='right'),
    ('filler', 39),
])
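The auto-positioning rule is plain arithmetic: a tuple of width w placed at cursor c occupies positions c through c + w - 1, and every column moves the cursor to end_pos + 1. The sketch below is not fixed_record_factory itself; Col is a hypothetical stand-in for FixedColumn with 1-based, inclusive start_pos and end_pos (only end_pos is named in the text above).

```python
from dataclasses import dataclass


@dataclass
class Col:
    """Hypothetical stand-in for FixedColumn: 1-based, inclusive positions."""
    name: str
    start_pos: int
    end_pos: int

    @property
    def width(self):
        return self.end_pos - self.start_pos + 1


def resolve_positions(columns):
    """Turn a mixed list of Col objects and (name, width) tuples into Cols."""
    cursor = 1  # next free position
    resolved = []
    for spec in columns:
        if isinstance(spec, Col):
            col = spec  # explicit positions are used as-is
        else:
            name, width = spec
            col = Col(name, cursor, cursor + width - 1)
        resolved.append(col)
        cursor = col.end_pos + 1  # advance past this column
    return resolved
```

Applied to the AchDetail spec, this places routing_number at 4-12, account_number at 13-29 and amount at 30-39; in the mixed AchHeader spec, filler starts at 13 because the explicit routing_number column ends at 12.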
Readers
Base classes and utilities for file readers.
Defines the abstract Reader interface and Clean enumeration for header normalization across all reader implementations.
- class dbtk.readers.base.Reader(add_row_num=True, skip_rows=0, n_rows=None, headers=None, null_values=None)[source]
Bases: ABC
Abstract base class for all file readers in DBTK.
Provides unified interface and common functionality for reading various file formats (CSV, Excel, JSON, XML, fixed-width). All readers support the same features regardless of file format: header cleaning, record skipping, row number tracking, and flexible return types.
Readers are designed to work as context managers and iterators, making them ideal for memory-efficient processing of large files. They automatically handle resource cleanup and support both Record objects (with multiple access patterns) and plain dictionaries as return types.
Common Features
Row number tracking - Automatic _row_num field for debugging
Record skipping - Skip header rows or bad data
Record limiting - Process only first N records
Record filtering - Filter records with custom functions (new!)
Flexible return types - Record objects or dictionaries
Context manager - Automatic resource cleanup
Iterator protocol - Memory-efficient streaming
Null value conversion - Convert specified values to None
- param add_row_num:
Add a ‘_row_num’ field to each record containing the 1-based row number
- type add_row_num:
bool, default True
- param skip_rows:
Number of data rows to skip after headers (useful for skipping preamble rows or known bad data at the start of a file)
- type skip_rows:
int, default 0
- param n_rows:
Maximum number of rows to read. None (default) reads all rows.
- type n_rows:
int, optional
- param null_values:
Values to convert to None. Can be a single string or a collection of strings. Common examples: ‘\N’, ‘NULL’, ‘NA’, ‘’ (empty string)
- type null_values:
str, list, tuple, or set, optional
Example
# Subclasses implement specific file formats
from dbtk import readers

# CSV with default settings - returns Record objects
with readers.CSVReader(open('data.csv')) as reader:
    for record in reader:
        print(record.name, record.email)        # attribute access
        print(record['name'], record['email'])  # or dict-style

# Skip first 5 data rows, read only 100 records
with readers.CSVReader(open('data.csv'), skip_rows=5, n_rows=100) as reader:
    for row in reader:
        print(row.name)

# Access fields with original or normalized names
with readers.CSVReader(open('messy.csv')) as reader:
    # Headers like "ID #", "Student Name" are preserved as originals
    # but also accessible as normalized: id, student_name
    for record in reader:
        print(record['ID #'], record['Student Name'])  # original
        print(record.id, record.student_name)          # normalized

# Filter records with custom functions
with readers.CSVReader(open('data.csv')) as reader:
    reader.add_filter(lambda r: int(r.age) >= 18)
    reader.add_filter(lambda r: r.country == 'US')
    for record in reader:
        print(record.name)  # Only US adults
See also
CSVReader - Read CSV files
JSONReader - Read JSON files
ExcelReader - Read Excel .xlsx files
XMLReader - Read XML files
FixedReader - Read fixed-width text files
Record - Flexible row objects with multiple access patterns
Notes
This is an abstract base class. Use one of the concrete implementations (CSVReader, JSONReader, etc.) for actual file reading.
Subclasses must implement:
_read_headers() - Return list of raw column names from file
_generate_rows() - Yield raw data rows as lists
Optionally override:
_cleanup() - Release resources (file handles, etc.)
- BIG_BYTE_THRESHOLD = 5242880
- BIG_ROW_THRESHOLD = 10000
- __init__(add_row_num=True, skip_rows=0, n_rows=None, headers=None, null_values=None)[source]
Initialize the reader with common options.
- Parameters:
add_row_num (bool, default True) – Add a ‘_row_num’ field to each record containing the 1-based row number
skip_rows (int, default 0) – Number of data rows to skip after headers
n_rows (int, optional) – Maximum number of rows to read, or None for all rows
headers (List[str], optional) – List of header names to use instead of reading them from the first row
null_values (str, list, tuple, or set, optional) – Values to convert to None. Can be a single string or collection of strings. Common examples: ‘\N’ (IMDB), ‘NULL’, ‘NA’, ‘’ (empty string)
Example
# In subclass implementation
class MyReader(Reader):
    def __init__(self, file_path, **kwargs):
        super().__init__(**kwargs)
        self.file = open(file_path)

    def _read_headers(self):
        return ['id', 'name', 'email']

    def _generate_rows(self):
        for line in self.file:
            yield line.strip().split(',')

    def _cleanup(self):
        # Release the file handle opened in __init__
        self.file.close()
- add_filter(func)[source]
Add a filter function to the filtering pipeline.
Filter functions are applied after skip_rows and null_values conversion, but before the n_rows limit. Multiple calls to add_filter() accumulate in a pipeline - all filters must return True for a record to be included.
The filter operates on the final Record object, after null value conversion has been applied. This allows you to filter on clean data rather than raw values.
- Parameters:
func (callable) – A function that takes a record and returns True to keep it, False to filter it out. The function should accept a single argument (the record).
- Returns:
Returns self to allow method chaining
- Return type:
Reader
Example
from dbtk import readers

# Single filter (CSV values are strings, so convert before comparing)
with readers.CSVReader(open('users.csv')) as reader:
    reader.add_filter(lambda r: int(r.age) >= 18)
    for record in reader:
        print(record.name)  # Only adults

# Multiple filters (all must pass)
with readers.CSVReader(open('users.csv')) as reader:
    reader.add_filter(lambda r: int(r.age) >= 18)
    reader.add_filter(lambda r: r.country == 'US')
    reader.add_filter(lambda r: r.active == 'true')
    for record in reader:
        print(record.name)  # US adults who are active

# Complex filter function
def valid_email(record):
    return '@' in record.email and '.' in record.email

with readers.CSVReader(open('users.csv')) as reader:
    reader.add_filter(valid_email)
    for record in reader:
        print(record.email)

# Get exactly n_rows after filtering
with readers.CSVReader(open('users.csv'), n_rows=100) as reader:
    reader.add_filter(lambda r: int(r.age) >= 18)
    data = list(reader)  # up to 100 records that pass the filter
    print(len(data))     # 100, or fewer if fewer than 100 match
Notes
Filters are lazy - they’re applied during iteration, not when add_filter() is called
Execution order: read → skip_rows → null_values → filter pipeline → n_rows
If both skip_rows and add_filter() are used, a warning is logged (skip_rows applies first)
The n_rows limit applies after filtering, so n_rows=100 returns 100 filtered records
Record._row_num field reflects the count of returned (filtered) records, not raw file rows
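The execution order in these notes maps naturally onto a chain of generators. This is a minimal sketch over plain dicts, not DBTK's code: read_pipeline is a hypothetical function that applies skip_rows, then null_values, then every filter, then the n_rows limit, and numbers only the records it actually returns.

```python
from itertools import islice


def read_pipeline(rows, skip_rows=0, null_values=None, filters=(),
                  n_rows=None, add_row_num=True):
    """Yield dict records in the order skip -> nulls -> filters -> limit."""
    # A single string is one null marker; a collection becomes a set.
    if isinstance(null_values, str):
        nulls = {null_values}
    else:
        nulls = set(null_values or ())

    stream = islice(rows, skip_rows, None)                      # skip_rows
    stream = ({k: (None if v in nulls else v) for k, v in row.items()}
              for row in stream)                                # null_values
    stream = (row for row in stream
              if all(f(row) for f in filters))                  # filter pipeline
    stream = islice(stream, n_rows)                             # n_rows
    for num, row in enumerate(stream, start=1):
        if add_row_num:
            row["_row_num"] = num  # counts returned records, not file rows
        yield row
```

Because the limit sits after the filters, n_rows=2 still yields two matching records even when several earlier rows are rejected.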
- property fieldnames: List[str]
Alias for headers to maintain compatibility with csv.DictReader.
- Returns:
List of cleaned header names
- property headers: List[str]
Get the column headers.
- Returns:
List of cleaned header names
- property row_count: int
Returns the number of rows.
This property provides access to the total number of rows, which is stored in the private attribute _row_num.
- Returns:
The total number of rows.
- Return type:
int
- property source: str
Get the filename of the source file.
For the ExcelReader and XLSReader, the source must be set manually because the Workbook objects do not keep a reference to the original file.
CSV file reader with flexible delimiter and quoting support.
- class dbtk.readers.csv.CSVReader(fp, dialect=<class 'csv.excel'>, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None, **kwargs)[source]
Bases: Reader
Read CSV (Comma-Separated Values) files with flexible formatting options.
CSVReader provides a simple, consistent interface for reading CSV files with support for various delimiters, quoting styles, and header handling. It handles messy real-world CSV files by providing automatic header cleaning, custom dialects, and the ability to override headers entirely.
The reader returns Record objects by default (supporting attribute, key, and index access) or plain dictionaries if preferred. It automatically handles tab-delimited files when delimiter='\t' is specified.
- Parameters:
fp (file-like object) – Open file pointer to CSV file (from open() or similar). For encoding detection, use get_reader('file.csv', encoding='detect') instead of opening the file directly.
dialect (csv.Dialect, default csv.excel) –
CSV dialect defining formatting rules. Common options:
csv.excel - Standard CSV format (comma delimiter, quoted strings)
csv.excel_tab - Tab-delimited format
csv.unix_dialect - Unix-style CSV (LF line endings)
headers (List[str], optional) – Custom header names to use instead of reading from first row. Useful when CSV has no header row or you want to rename columns.
add_row_num (bool, default True) – Add ‘_row_num’ field to each record with 1-based row number
skip_rows (int, default 0) – Number of data rows to skip after headers
n_rows (int, optional) – Maximum rows to read, None for all
null_values (str, list, tuple, or set, optional) – Values to convert to None (e.g., ‘\N’ for IMDB files)
**kwargs – Additional arguments passed to csv.reader() like delimiter, quotechar, etc.
Example
import csv
from dbtk import readers

# Basic CSV reading
with readers.CSVReader(open('users.csv')) as reader:
    for user in reader:
        print(f"{user.name}: {user.email}")

# Tab-delimited file
with readers.CSVReader(open('data.tsv'), delimiter='\t') as reader:
    for record in reader:
        process(record)  # process() stands for your own handling code

# Custom delimiter and quoting
with readers.CSVReader(open('data.txt'), delimiter='|', quotechar='"',
                       quoting=csv.QUOTE_MINIMAL) as reader:
    for record in reader:
        print(record)

# Provide custom headers (file has no header row)
headers = ['id', 'name', 'email', 'created']
with readers.CSVReader(open('data.csv'), headers=headers) as reader:
    for record in reader:
        print(record.id, record.name)

# Skip first 10 data rows, read only 100 rows
with readers.CSVReader(open('large.csv'), skip_rows=10, n_rows=100) as reader:
    data = list(reader)
See also
Reader - Base class with common reader features
readers.get_reader - Automatic reader selection based on file extension
Notes
Automatically converts a '\t' delimiter to the excel_tab dialect
Headers are read from first row unless custom headers provided
File pointer is automatically closed when used as context manager
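Because extra keyword arguments are passed through to the standard library's csv.reader(), the '\t' handling and a pipe-delimited override can be reproduced with the stdlib alone. pick_dialect and read_rows are hypothetical helpers sketching the documented behavior, not CSVReader's internals.

```python
import csv
import io


def pick_dialect(dialect=csv.excel, **kwargs):
    """Mirror the documented rule: delimiter='\\t' selects csv.excel_tab."""
    if kwargs.get("delimiter") == "\t":
        kwargs.pop("delimiter")
        dialect = csv.excel_tab
    return dialect, kwargs


def read_rows(text, **kwargs):
    """Parse CSV text; leftover kwargs (delimiter, quotechar, ...) go to csv.reader()."""
    dialect, fmt = pick_dialect(**kwargs)
    return list(csv.reader(io.StringIO(text), dialect=dialect, **fmt))
```

Note that a quoted field such as "a|b" survives a '|' delimiter intact, because csv.excel's default quotechar is '"'.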
- __init__(fp, dialect=<class 'csv.excel'>, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None, **kwargs)[source]
Initialize CSV reader for a file.
- Parameters:
fp (file-like object) – Open file pointer to CSV file
dialect (csv.Dialect, default csv.excel) – CSV dialect (excel, excel_tab, unix_dialect, etc.)
headers (List[str], optional) – Custom headers to use instead of reading from file
add_row_num (bool, default True) – Add _row_num field to records
skip_rows (int, default 0) – Data rows to skip after headers
n_rows (int, optional) – Maximum rows to read
null_values (str, list, tuple, or set, optional) – Values to convert to None (e.g., ‘\N’ for IMDB files)
**kwargs – Additional csv.reader() arguments (delimiter, quotechar, etc.)
Excel workbook reader supporting XLS and XLSX formats.
- class dbtk.readers.excel.ExcelReader(worksheet, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Bases: Reader
Class to iterate over an Excel spreadsheet using openpyxl.
- __init__(worksheet, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Initialize ExcelReader for reading Excel .xlsx files.
- Parameters:
worksheet – openpyxl.Worksheet object to read from.
headers (List[str] | None) – Optional list of header names to use instead of reading from row 1.
add_row_num (bool) – If True, adds a _row_num field to each record (default: True).
skip_rows (int) – Number of data rows to skip after headers (default: 0).
n_rows (int | None) – Maximum number of rows to read, or None for all (default: None).
null_values – Values to convert to None (e.g., ‘\N’, ‘NULL’, ‘NA’).
- Raises:
TypeError – If worksheet is not an openpyxl.Worksheet.
- class dbtk.readers.excel.XLSReader(worksheet, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Bases: Reader
Class to iterate over an Excel spreadsheet using xlrd.
- __init__(worksheet, headers=None, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Initialize XLSReader for reading Excel .xls files.
- Parameters:
worksheet – xlrd.Sheet object to read from.
headers (List[str] | None) – Optional list of header names to use instead of reading from row 0.
add_row_num (bool) – If True, adds a _row_num field to each record (default: True).
skip_rows (int) – Number of data rows to skip after headers (default: 0).
n_rows (int | None) – Maximum number of rows to read, or None for all (default: None).
null_values – Values to convert to None (e.g., ‘\N’, ‘NULL’, ‘NA’).
- Raises:
TypeError – If worksheet is not an xlrd.Sheet.
- dbtk.readers.excel.check_dependencies()[source]
Check for optional dependencies and issue warnings if missing.
- dbtk.readers.excel.get_sheet_by_index(wb, index)[source]
Get a worksheet from a workbook by index.
- Parameters:
wb – Workbook object (openpyxl.Workbook or xlrd.Book).
index (int) – Index of the sheet to retrieve (0-based).
- Returns:
Worksheet object (openpyxl.Worksheet or xlrd.Sheet).
- Raises:
TypeError – If workbook type is not supported.
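openpyxl workbooks expose their sheets as the worksheets list (and support wb[name]), while xlrd books provide sheet_by_index() and sheet_by_name(); both index from 0. A minimal duck-typed sketch of the dispatch follows. The DBTK functions, per the Raises section, check the workbook type instead, and sheet_by_index and sheet_by_name here are hypothetical helpers.

```python
def sheet_by_index(wb, index):
    """Return the 0-based index-th worksheet from an openpyxl or xlrd workbook."""
    if hasattr(wb, "sheet_by_index"):   # xlrd.Book
        return wb.sheet_by_index(index)
    if hasattr(wb, "worksheets"):       # openpyxl.Workbook
        return wb.worksheets[index]
    raise TypeError(f"Unsupported workbook type: {type(wb).__name__}")


def sheet_by_name(wb, name):
    """Same dispatch for lookup by sheet name."""
    if hasattr(wb, "sheet_by_name"):    # xlrd.Book
        return wb.sheet_by_name(name)
    if hasattr(wb, "sheetnames"):       # openpyxl.Workbook supports wb[name]
        return wb[name]
    raise TypeError(f"Unsupported workbook type: {type(wb).__name__}")
```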
- dbtk.readers.excel.get_sheet_by_name(wb, sheet_name)[source]
Get a worksheet from a workbook by name.
- dbtk.readers.excel.open_workbook(filename)[source]
Open an Excel workbook using openpyxl for .xlsx or xlrd for .xls.
- Parameters:
filename (str | Path) – Path to the Excel file (.xlsx or .xls).
- Returns:
Workbook object (openpyxl.Workbook or xlrd.Book).
- Raises:
ImportError – If neither openpyxl nor xlrd is available.
Fixed-width text file reader with column position specifications.
- class dbtk.readers.fixed_width.EDIReader(fp, columns, type_name_map=None, strict=False, **kwargs)[source]
Bases: FixedReader
Reader for fixed-width files containing multiple record types (EDI-like formats).
Parses files where each line’s layout is determined by a type identifier prefix (e.g., NACHA ACH files with ‘1’, ‘5’, ‘6’, ‘7’, ‘8’, ‘9’ record types). Each record type uses its own set of FixedColumn definitions, allowing different column positions and formats per type.
Record type codes must all be the same length (automatically detected from keys). The reader dispatches parsing based on the prefix of each line and returns typed Record instances (one dynamic subclass per record type).
Supports common legacy formats such as NACHA ACH, COBOL copybooks, and other multi-layout fixed-width EDI-style files. The column specifications for several common EDI-like files, including ACH, are defined in dbtk.formats.edi.
- Parameters:
fp (TextIO) – Open file pointer in text mode
columns (Dict[str, List[FixedColumn]]) – Mapping of record type codes (keys) to their column definitions. All keys must be strings of identical length.
type_name_map (Dict[str, str], optional) – Optional friendly names for record types (e.g., {‘1’: ‘File Header’}) used in logging or output fields.
strict (bool, default False) – If True, raise an error when a line's record type code is not mapped in columns; otherwise the line is skipped and logged
auto_trim (bool, default True) – Trim whitespace from field values
**kwargs – Additional arguments passed to FixedReader base class
- Raises:
ValueError – If record type keys have inconsistent lengths or columns dict is invalid
Example
>>> columns = {
...     '1': [FixedColumn('record_type', 1, 1), FixedColumn('priority_code', 2, 3), ...],
...     '5': [FixedColumn('record_type', 1, 1), FixedColumn('service_class_code', 2, 4), ...],
...     # ... other types ...
... }
>>> reader = EDIReader(open('ach_file.ach'), columns=columns)
>>> for record in reader:
...     print(record.company_name)  # fields available depend on record type
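The dispatch rule described above (one shared code length, layout chosen by line prefix, strict versus skip-and-log) can be sketched with plain tuples. In this illustration, which is not EDIReader's code, each layout entry is a hypothetical (name, start, end) tuple with 1-based, inclusive positions standing in for FixedColumn, and each parsed line becomes a (type_code, dict) pair.

```python
import logging

log = logging.getLogger(__name__)


def code_length(layouts):
    """Return the shared length of all record type codes, or raise ValueError."""
    lengths = {len(code) for code in layouts}
    if len(lengths) != 1:
        raise ValueError(f"Record type codes must share one length, got {sorted(lengths)}")
    return lengths.pop()


def parse_lines(lines, layouts, strict=False, auto_trim=True):
    """Yield (type_code, fields) for each line, choosing the layout by prefix."""
    size = code_length(layouts)
    for line_no, line in enumerate(lines, start=1):
        line = line.rstrip("\r\n")
        code = line[:size]
        if code not in layouts:
            if strict:
                raise ValueError(f"Line {line_no}: unmapped record type {code!r}")
            log.warning("Line %d: skipping unmapped record type %r", line_no, code)
            continue
        fields = {}
        for name, start, end in layouts[code]:
            value = line[start - 1:end]  # 1-based inclusive -> Python slice
            fields[name] = value.strip() if auto_trim else value
        yield code, fields
```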
- visualize()[source]
Visualize column boundaries for each record type found in the file.
Scans the entire file, emitting one block per record type the first time that type is encountered. Each block shows the rulers, column boundary markers, the raw source line, and the interpreted line. Blocks are separated by blank lines. The file pointer is saved and restored.
- Returns:
String with one visualization block per record type.
- Return type:
str
- class dbtk.readers.fixed_width.FixedReader(fp, columns, auto_trim=True, add_row_num=False, skip_rows=0, n_rows=None, null_values=None)[source]
Bases: Reader
Reader for fixed-width files.
- __init__(fp, columns, auto_trim=True, add_row_num=False, skip_rows=0, n_rows=None, null_values=None)[source]
Initializes the instance with the provided file pointer, column definitions, and processing options.
- fp
The file pointer from which data is read. For encoding detection, use get_reader('file.txt', encoding='detect') instead of opening the file directly.
- Type:
TextIO
- columns
A list of FixedColumn objects defining the structure of columns in the data.
- Type:
List[FixedColumn]
- auto_trim
Determines whether to automatically trim whitespace from field values. Default is True.
- Type:
bool
- add_row_num
Determines whether to add a _row_num field to each record. Default is False.
- Type:
bool
- skip_rows
The number of rows to skip before reading data.
- Type:
int
- n_rows
The maximum number of rows to read.
- Type:
Optional[int]
- null_values
Values to convert to None (e.g., ‘\N’, ‘NULL’, ‘NA’).
- visualize(sample_lines=2)[source]
Visualize column boundaries over sample data from the file.
Seeks to the beginning of the file, reads up to sample_lines records, then restores the file pointer. Output shows the rulers and column boundary markers once, then for each record both the raw source line and the interpreted line reconstructed via record.to_line().
- Parameters:
sample_lines (int) – Number of records to include in the preview.
- Returns:
String representation of column layout with sample data.
- Return type:
str
JSON and NDJSON (newline-delimited JSON) file readers.
- class dbtk.readers.json.JSONReader(fp, flatten=True, add_row_num=True, skip_rows=0, n_rows=None, null_values=None, **kwargs)[source]
Bases: Reader
Read JSON files containing arrays of objects.
JSONReader parses JSON files that contain an array of objects (standard JSON format for tabular data). It can optionally flatten nested objects using dot notation, making it easy to work with hierarchical JSON data in a flat, record-based format.
The reader automatically discovers the schema by scanning all objects in the array, ensuring all possible keys are available even if some objects don’t have all fields.
- Parameters:
fp (file-like object) – Open file pointer to JSON file
flatten (bool, default True) – Flatten nested objects with dot notation. For example, {"user": {"name": "Bob"}} becomes {"user.name": "Bob"}. Arrays are preserved as-is.
add_row_num (bool, default True) – Add _row_num field to each record
skip_rows (int, default 0) – Number of records to skip
n_rows (int, optional) – Maximum records to read
null_values (str, list, tuple, or set, optional) – Values to convert to None
**kwargs – Reserved for future use
Example
from dbtk import readers

# Simple JSON array
# [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
with readers.JSONReader(open('users.json')) as reader:
    for user in reader:
        print(user.id, user.name)

# Nested JSON with flattening
# [{"id": 1, "user": {"name": "Alice", "email": "a@example.com"}}]
with readers.JSONReader(open('nested.json'), flatten=True) as reader:
    for record in reader:
        print(record.id, record['user.name'], record['user.email'])

# Disable flattening to keep nested structure
with readers.JSONReader(open('nested.json'), flatten=False) as reader:
    for record in reader:
        print(record.user)  # {'name': 'Alice', 'email': 'a@example.com'}
See also
NDJSONReader - Read newline-delimited JSON files
Reader - Base reader class
writers.to_json - Write JSON files
Notes
JSON file must contain an array at the root level
All objects in array are scanned to discover complete schema
Empty arrays raise ValueError
Nested objects are flattened with dot notation by default
Arrays within objects are never flattened
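A sketch of the two behaviors these notes describe: dot-notation flattening that leaves arrays alone, and a schema built from the union of keys across every object. flatten and discover_schema are hypothetical helpers, and keeping columns in first-seen order is an assumption.

```python
import json


def flatten(obj, prefix=""):
    """Flatten nested dicts using dot notation; lists are kept as-is."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value  # scalars and arrays pass through unchanged
    return flat


def discover_schema(text):
    """Parse a JSON array; return (columns, flattened rows)."""
    data = json.loads(text)
    if not isinstance(data, list) or not data:
        raise ValueError("JSON root must be a non-empty array of objects")
    rows = [flatten(item) for item in data]
    # Union of keys across all objects, in first-seen order.
    columns = list(dict.fromkeys(key for row in rows for key in row))
    return columns, rows
```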
- __init__(fp, flatten=True, add_row_num=True, skip_rows=0, n_rows=None, null_values=None, **kwargs)[source]
- property record_count: int
- class dbtk.readers.json.NDJSONReader(fp, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Bases: Reader
Newline-delimited JSON file reader that returns Record objects or dicts.
- __init__(fp, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Initialize NDJSON reader.
- Parameters:
fp (TextIO) – File pointer to NDJSON file (one JSON object per line)
add_row_num (bool) – Add _row_num to each record
skip_rows (int) – Number of rows to skip from the beginning
n_rows (int | None) – Maximum number of rows to read (None = unlimited)
null_values – Values to convert to None (e.g., ‘\N’, ‘NULL’, ‘NA’)
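Since NDJSON holds one JSON document per line, reading it needs only json.loads() per line. A stdlib sketch of the idea: iter_ndjson is hypothetical, and skipping blank lines (without counting them toward skip_rows) and reporting bad input with its line number are assumptions, not documented NDJSONReader behavior.

```python
import json


def iter_ndjson(lines, skip_rows=0, n_rows=None):
    """Yield one parsed object per non-blank line of an NDJSON stream."""
    seen = 0     # non-blank lines encountered
    yielded = 0  # objects returned so far
    for line_no, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # tolerate blank lines, e.g. a trailing newline
        seen += 1
        if seen <= skip_rows:
            continue
        if n_rows is not None and yielded >= n_rows:
            break
        try:
            obj = json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Line {line_no}: invalid JSON ({exc.msg})") from exc
        yield obj
        yielded += 1
```

Any iterable of strings works as input, including an open text file.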
XML file reader with XPath support for element extraction.
- class dbtk.readers.xml.XMLColumn(name, xpath=None, data_type='text')[source]
Bases: object
Column definition for XML extraction.
- class dbtk.readers.xml.XMLReader(fp, record_xpath='//record', columns=None, sample_size=10, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Bases: Reader
XML file reader that returns Record objects.
- __init__(fp, record_xpath='//record', columns=None, sample_size=10, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Initialize XML reader.
- Parameters:
fp (TextIO) – File pointer to XML file
record_xpath (str) – XPath expression to find record elements
columns (List[XMLColumn] | None) – List of XMLColumn definitions for custom extraction
sample_size (int) – Number of records to sample for column discovery
add_row_num (bool) – Add _row_num to each record
skip_rows (int) – Number of data rows to skip after headers
n_rows (int | None) – Maximum number of rows to read, or None for all
null_values – Values to convert to None (e.g., ‘\N’, ‘NULL’, ‘NA’)
- property record_count: int
Return total number of records found.
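Column discovery from a sample of records can be sketched with xml.etree.ElementTree from the standard library. ElementTree supports only a subset of XPath and requires a relative path such as './/record' where the default above is '//record'. discover_columns is a hypothetical helper that treats each record's child elements as columns; columns that first appear after the sample are missed, which is the trade-off sample_size controls.

```python
import xml.etree.ElementTree as ET


def discover_columns(xml_text, record_path=".//record", sample_size=10):
    """Return (columns, rows) using child tags from the first sample_size records."""
    root = ET.fromstring(xml_text)
    records = root.findall(record_path)
    columns = []
    for elem in records[:sample_size]:
        for child in elem:
            if child.tag not in columns:
                columns.append(child.tag)  # first-seen order
    # Every record gets every discovered column; missing children become None.
    rows = [{col: elem.findtext(col) for col in columns} for elem in records]
    return columns, rows
```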
- dbtk.readers.xml.open_xml(filename, **kwargs)[source]
Open XML file for reading.
- Parameters:
filename (str | Path) – Path to XML file
**kwargs – Arguments passed to XMLReader
- Returns:
XMLReader instance
- Return type:
XMLReader
Example
with open_xml('data.xml', record_xpath='//user') as reader:
    for record in reader:
        print(record.name)
- class dbtk.readers.data_frame.DataFrameReader(df, add_row_num=True, skip_rows=0, n_rows=None, null_values=None)[source]
Bases: Reader
Read directly from pandas or polars DataFrames, with no intermediate files.
This reader accepts a pre-loaded DataFrame (from pandas or polars) and streams rows as Record objects. It supports the standard Reader options (add_row_num, skip_rows, n_rows, null_values) and provides accurate progress tracking based on the known row count.
This module does not import pandas or polars; the caller has already imported one and passes in a DataFrame.
- Parameters:
df (DataFrame) – pandas.DataFrame or polars.DataFrame containing the data
add_row_num (bool, default True) – Add ‘_row_num’ field with 1-based row number
skip_rows (int, default 0) – Number of rows to skip from the beginning
n_rows (int, optional) – Maximum number of records to yield
null_values (str, list, tuple, or set, optional) – Values to convert to None
Examples
>>> import pandas as pd
>>> df = pd.read_parquet("data.parquet")
>>> with DataFrameReader(df) as reader:
...     for row in reader:
...         print(row.id)
>>> import polars as pl
>>> df = pl.read_parquet("data.parquet")
>>> with DataFrameReader(df, add_row_num=False) as reader:
...     BulkSurge(table).load(reader)
- property total: int | None
Total rows for progress bar — known for DataFrames.
Writers
Base classes for data writers with common file handling and data extraction patterns.
- class dbtk.writers.base.BaseWriter(data, file=None, columns=None, encoding='utf-8', write_headers=True, compression='infer', **fmt_kwargs)[source]
Bases: ABC
Abstract base class for all data writers in DBTK.
Provides common functionality for writing data to various formats (CSV, JSON, Excel, XML, etc.). Writers accept data from multiple sources - cursors, lists of Records, lists of dicts, or lists of lists - and handle the conversion to the target format automatically.
All writers share common features like automatic column detection, stdout support for quick previews, configurable encodings, and optional type preservation. Writers are designed to work seamlessly with DBTK cursors and readers.
Common Features
Multiple data sources - Cursors, Records, dicts, lists
Automatic column detection - From cursors, Record objects, or dict keys
Stdout preview - Write to console with automatic row limiting
Configurable encoding - UTF-8, Latin-1, etc.
Type preservation - Optionally keep native types vs converting to strings
Consistent API - Same interface regardless of output format
- param data:
Data to write. Accepts:
Cursor objects (from database queries)
List of Record objects (from readers)
List of dictionaries
List of lists/tuples (requires columns parameter)
- type data:
Iterable[RecordLike]
- param file:
Output filename or file handle. If None, writes to stdout (limited to 20 rows for preview).
- type file:
str, Path, TextIO, or BinaryIO, optional
- param columns:
Column names for list-of-lists data. Ignored for other data types which have columns embedded.
- type columns:
List[str], optional
- param encoding:
File encoding for text-based formats
- type encoding:
str, default ‘utf-8’
- param write_headers:
If True, include header row in formats that support it.
- type write_headers:
bool, default True
- param **fmt_kwargs:
Additional format-specific arguments (passed to subclasses)
- columns
Column names detected from data or provided explicitly
- Type:
List[str]
- data_iterator
Iterator over data records
- Type:
Iterator
Examples
Concrete subclasses implement specific formats, and the convenience functions wrap them:
```python
from dbtk import readers, writers

# `cursor` is an open DBTK cursor
# Write cursor results to CSV
cursor.execute("SELECT * FROM users")
writers.to_csv(cursor, 'users.csv')

# Write a list of records to JSON
with readers.CSVReader(open('input.csv')) as reader:
    records = list(reader)
writers.to_json(records, 'output.json')

# Preview to stdout (shows the first 20 rows)
cursor.execute("SELECT * FROM large_table")
writers.to_csv(cursor, None)  # Prints to console
```
See also
to_csv: Write CSV files
to_json: Write JSON files
to_excel: Write Excel files
to_xml: Write XML files
cursor_to_cursor: Database-to-database transfer
Notes
This is an abstract base class. Use one of the concrete implementations (CSVWriter, JSONWriter, etc.) or the convenience functions (to_csv, to_json, etc.).
Subclasses must implement:
_write_data() - Perform the actual write operation
When file is None (stdout mode), output is automatically limited to 20 rows to prevent accidentally printing huge result sets to the console.
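The base-class contract described above (column detection from the data, a single abstract `_write_data()` hook, and a 20-row cap in stdout mode) can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not DBTK's implementation: `SketchWriter`, `TabWriter`, and `STDOUT_PREVIEW_ROWS` are hypothetical names, and only dict rows and list-of-lists rows (with explicit columns) are handled.

```python
import io
import sys
from abc import ABC, abstractmethod
from itertools import chain, islice

STDOUT_PREVIEW_ROWS = 20  # hypothetical constant mirroring the documented 20-row cap


class SketchWriter(ABC):
    """Hypothetical stand-in for BaseWriter (not DBTK's code)."""

    def __init__(self, data, file=None, columns=None, write_headers=True):
        rows = iter(data)
        first = next(rows, None)  # peek one row to detect columns
        if columns is None:
            if not isinstance(first, dict):
                raise ValueError("columns is required for list-of-lists data")
            columns = list(first)  # column names from the dict keys
        self.columns = list(columns)
        self.write_headers = write_headers
        self.row_count = 0
        self.file = sys.stdout if file is None else file
        rows = chain([first] if first is not None else [], rows)  # put the peeked row back
        if file is None:
            rows = islice(rows, STDOUT_PREVIEW_ROWS)  # stdout mode: preview only
        self.data_iterator = (self._values(r) for r in rows)

    def _values(self, row):
        # Normalise dicts and sequences to a list ordered like self.columns
        if isinstance(row, dict):
            return [row[c] for c in self.columns]
        return list(row)

    def write(self):
        self._write_data(self.file)
        return self.row_count

    @abstractmethod
    def _write_data(self, fh):
        """Format-specific subclasses perform the actual write here."""


class TabWriter(SketchWriter):
    """Hypothetical concrete subclass producing tab-separated text."""

    def _write_data(self, fh):
        if self.write_headers:
            fh.write("\t".join(self.columns) + "\n")
        for values in self.data_iterator:
            fh.write("\t".join("" if v is None else str(v) for v in values) + "\n")
            self.row_count += 1


buf = io.StringIO()
writer = TabWriter([{"id": 1, "name": "Ada"}, {"id": 2, "name": None}], file=buf)
writer.write()
```

Keeping the row cap inside the constructor means every subclass inherits the preview safeguard without having to remember it in its own `_write_data()`.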
- __init__(data, file=None, columns=None, encoding='utf-8', write_headers=True, compression='infer', **fmt_kwargs)[source]
Initialize the writer with data and options.
- Parameters:
data (Iterable[RecordLike]) – Data source (cursor, list of records, etc.)
file (str, Path, TextIO, or BinaryIO, optional) – Output file. None writes to stdout.
columns (List[str], optional) – Column names for list-of-lists
encoding (str, default 'utf-8') – File encoding
write_headers (bool, default True) – Include header row in output
**fmt_kwargs – Format-specific arguments
- accepts_file_handle = True
- close()[source]
Close the output file if it was opened by this writer.
Safe to call multiple times (idempotent). Automatically called when using the writer as a context manager.
- preserve_types = False
- property row_count: int
Number of rows written (updated during write operation)
- class dbtk.writers.base.BatchWriter(data=None, file=None, columns=None, headers=None, encoding='utf-8', write_headers=True, compression='infer', **fmt_kwargs)[source]
Bases: BaseWriter
Base class for writers that support incremental, batch-based output.
Unlike traditional writers that require all data up-front, BatchWriter subclasses are designed for streaming and bulk ETL workloads where data arrives in chunks (e.g., from BulkSurge, large queries, or infinite streams).
Key Features
Lazy initialization - Columns and iterator are resolved on first write
Reusable file handle - Write multiple batches without reopening
Header control - First batch includes headers, subsequent batches omit
Zero-copy compatible - Works with Record objects and generators
Dual-mode operation - Use as traditional writer or streaming writer
Usage Patterns
- Pattern 1: Traditional (single-shot)
```python
>>> writer = CSVWriter(data=all_records, file='output.csv')
>>> writer.write()
```
- Pattern 2: Pure streaming
```python
>>> with CSVWriter(data=None, file='output.csv') as writer:
...     for batch in surge.batched(records):
...         writer.write_batch(batch)
```
- Pattern 3: Hybrid
```python
>>> writer = CSVWriter(data=first_batch, file='output.csv')
>>> writer.write()                    # Process initial batch
>>> writer.write_batch(second_batch)  # Continue streaming
>>> writer.write_batch(third_batch)
```
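The lazy-initialisation idea behind these patterns (resolve columns from the first batch, write the header once, then append rows on every later call) can be sketched with Python's csv module. `SketchBatchCSV` is a hypothetical class for illustration only; it is not DBTK's CSVWriter, and it assumes dict rows.

```python
import csv
import io


class SketchBatchCSV:
    """Hypothetical batch writer: setup is deferred until the first record arrives."""

    def __init__(self, file, columns=None, write_headers=True):
        self.file = file
        self.columns = columns
        self.write_headers = write_headers
        self._writer = None  # created lazily on the first write_batch()
        self.row_count = 0

    def write_batch(self, batch):
        for record in batch:
            if self._writer is None:
                if self.columns is None:
                    self.columns = list(record)  # infer columns from the first record
                self._writer = csv.writer(self.file)
                if self.write_headers:
                    self._writer.writerow(self.columns)  # header is written only once
            self._writer.writerow([record.get(c) for c in self.columns])
            self.row_count += 1
        return self.row_count


buf = io.StringIO()
w = SketchBatchCSV(buf)
w.write_batch([{"id": 1, "city": "Oslo"}])
w.write_batch([{"id": 2, "city": "Lima"}, {"id": 3, "city": "Pune"}])
```

Because the header decision is tied to the first record rather than to the constructor, the same object works whether data arrives at construction time or batch by batch.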
- param data:
Initial data. If None, setup is deferred until first write_batch(). This enables streaming use cases where data arrives in batches.
- type data:
Iterable[RecordLike], optional
- param file:
Output destination. For streaming, pass an open file handle.
- type file:
str, Path, TextIO, or BinaryIO, optional
- param columns:
Explicit column names. If not provided, inferred from first batch.
- type columns:
List[str], optional
- param encoding:
File encoding for text-based formats
- type encoding:
str, default ‘utf-8’
- param write_headers:
Whether to write column headers on the first batch.
- type write_headers:
bool, default True
- param **fmt_kwargs:
Format-specific options passed to _write_data().
Notes
Subclasses must implement _write_data() but inherit write_batch() for free.
- Used by:
BulkSurge.dump() and .load(fallback_path=…)
Any high-performance streaming export pipeline
See also
CSVWriter: Batchable CSV writer
NDJSONWriter: Batchable newline-delimited JSON writer
XMLStreamer: Batchable streaming XML writer
- __init__(data=None, file=None, columns=None, headers=None, encoding='utf-8', write_headers=True, compression='infer', **fmt_kwargs)[source]
Initialize a batch-capable writer with optional deferred setup.
- Parameters:
data (Iterable[RecordLike], optional) – Initial data. If None, setup is deferred until first write_batch().
file (str, Path, TextIO, or BinaryIO, optional) – Output destination.
columns (List[str], optional) – Explicit column names. If not provided, inferred from data.
headers (List[str], optional) – Header row text for CSV/Excel writers. If None, checks data.description for original column names, then falls back to detected column names. Only used by writers that have header rows (CSV, Excel).
encoding (str, default 'utf-8') – File encoding
write_headers (bool, default True) – Include header row in output
**fmt_kwargs – Format-specific options
- Raises:
ValueError – If headers provided but write_headers=False
- accepts_file_handle = True
- data_iterator: Iterator | None
- preserve_types = False
- write()[source]
Write initial data provided at initialization.
- Returns:
Number of rows written
- Return type:
int
- Raises:
RuntimeError – If no initial data was provided to __init__. Use write_batch() for streaming.
- write_batch(data)[source]
Write a batch of records to the output stream.
This is the core method that makes BatchWriter suitable for BulkSurge and other high-volume streaming scenarios.
- Parameters:
data (Iterable[RecordLike]) – A batch of Record objects (or compatible row objects).
- Raises:
RuntimeError – If initial data was provided but write() hasn’t been called yet.
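The two RuntimeError conditions documented for write() and write_batch() amount to a small ordering check. A minimal sketch, assuming the writer tracks whether initial data was supplied and whether it has been written; `GuardedWriter` and its attribute names are hypothetical, and rows are simply collected in a list instead of being formatted.

```python
class GuardedWriter:
    """Hypothetical illustration of the write()/write_batch() ordering rules."""

    def __init__(self, data=None):
        self._initial = data
        self._initial_written = False
        self.rows = []

    def write(self):
        if self._initial is None:
            raise RuntimeError("No initial data; use write_batch() for streaming")
        before = len(self.rows)
        self.rows.extend(self._initial)
        self._initial_written = True
        return len(self.rows) - before  # rows written by this call

    def write_batch(self, data):
        if self._initial is not None and not self._initial_written:
            raise RuntimeError("Call write() before write_batch() when initial data was given")
        self.rows.extend(data)


hybrid = GuardedWriter(data=[1, 2])
hybrid.write()          # flush the initial data first
hybrid.write_batch([3])  # then stream further batches
```

The guard exists so that initial data can never be silently skipped or reordered behind a later batch.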
- class dbtk.writers.csv.CSVWriter(data=None, file=None, columns=None, headers=None, write_headers=True, null_string=None, compression='infer', **csv_kwargs)[source]
Bases: BatchWriter
CSV writer class that extends BatchWriter.
- __init__(data=None, file=None, columns=None, headers=None, write_headers=True, null_string=None, compression='infer', **csv_kwargs)[source]
Initialize CSV writer.
- Parameters:
data – Cursor object or list of records
file (str | Path | TextIO | None) – Output file. If None, writes to stdout
columns (List[str] | None) – Column names for list-of-lists data (optional for other types)
headers (List[str] | None) – Header row text. If None, checks data.description for original column names, then falls back to detected column names. Useful when field names have been normalized but you want original database column names in the CSV header.
write_headers (bool) – Whether to include column headers
null_string (str) – String representation for null values
compression (str) – Compression type (‘infer’, ‘gzip’, ‘bz2’, ‘lzma’, or None)
**csv_kwargs – Additional arguments passed to csv.writer
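Python's csv module writes None as an empty string, so a null_string option implies substituting a marker before each row reaches csv.writer. The sketch below shows that substitution, assuming null_string replaces only None values; `write_rows` is a hypothetical helper, and `\N` is just an example marker (it is the default NULL string of PostgreSQL's text COPY format). Extra keyword arguments are forwarded to csv.writer, as the **csv_kwargs parameter describes.

```python
import csv
import io


def write_rows(fh, columns, rows, null_string=None, **csv_kwargs):
    """Hypothetical helper: header plus rows, with None mapped to null_string."""
    writer = csv.writer(fh, **csv_kwargs)  # e.g. delimiter, quoting, lineterminator
    writer.writerow(columns)
    for row in rows:
        if null_string is not None:
            row = [null_string if v is None else v for v in row]
        writer.writerow(row)


buf = io.StringIO()
write_rows(buf, ["id", "note"], [[1, None], [2, "ok"]], null_string=r"\N", lineterminator="\n")
```

Without a null_string, a None and an empty string become indistinguishable in the output file, which matters when the CSV is later bulk-loaded back into a database.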
- dbtk.writers.csv.to_csv(data, file=None, headers=None, write_headers=True, null_string=None, compression='infer', **csv_kwargs)[source]
Export cursor or result set to CSV file.
- Parameters:
data – Cursor object or list of records
file (str | Path | None) – Output file. If None, writes to stdout
headers (List[str] | None) – Header row text. If None, uses cursor.description or detected column names
write_headers (bool) – Whether to include column headers
null_string (str) – String representation for null values
compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.
**csv_kwargs – Additional arguments passed to csv.writer
Example
```python
from dbtk.writers.csv import to_csv

# Write to file
to_csv(cursor, 'users.csv')

# Write to stdout
to_csv(cursor)

# Tab delimiter for a TSV file
to_csv(cursor, 'data.tsv', delimiter='\t')

# Override header names
to_csv(cursor, 'users.csv', headers=['User ID', 'Full Name', 'Email'])
```
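The compression='infer' rule (pick gzip, bz2, or lzma from a .gz, .bz2, or .xz suffix) maps directly onto standard-library openers. A minimal sketch, assuming text-mode output and that an explicit compression name overrides the suffix; `open_output`, `_BY_SUFFIX`, and `_OPENERS` are hypothetical names, not DBTK functions.

```python
import bz2
import gzip
import lzma
import tempfile
from pathlib import Path

# Hypothetical mapping mirroring the documented .gz / .bz2 / .xz inference
_BY_SUFFIX = {".gz": "gzip", ".bz2": "bz2", ".xz": "lzma"}
_OPENERS = {"gzip": gzip.open, "bz2": bz2.open, "lzma": lzma.open}


def open_output(path, compression="infer", encoding="utf-8"):
    """Open a text file for writing, compressed when requested or inferred."""
    if compression == "infer":
        compression = _BY_SUFFIX.get(Path(path).suffix.lower())  # None if no match
    if compression is None:
        return open(path, "w", encoding=encoding, newline="")
    # gzip.open, bz2.open and lzma.open all support text mode ("wt") with an encoding
    return _OPENERS[compression](path, "wt", encoding=encoding, newline="")


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "users.csv.gz"
    with open_output(target) as fh:  # suffix .gz -> gzip
        fh.write("id,name\n1,Ada\n")
    with gzip.open(target, "rt", encoding="utf-8") as fh:
        print(fh.read())
```

Opening with newline="" matches the csv module's recommendation, since csv.writer emits its own line terminators.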
Database writing utilities and ETL table operations.
- class dbtk.writers.database.DatabaseWriter(data, target_cursor, target_table, batch_size=1000, commit_frequency=10000)[source]
Bases: BaseWriter
Database writer that extends BaseWriter.
- __init__(data, target_cursor, target_table, batch_size=1000, commit_frequency=10000)[source]
Initialize database writer.
- Parameters:
data – Source cursor or list of records
target_cursor – Target database cursor
target_table (str) – Name of target table
batch_size (int) – Number of records to insert per batch
commit_frequency (int) – How often to commit (in number of records)
- dbtk.writers.database.cursor_to_cursor(source_data, target_cursor, target_table, batch_size=1000, commit_frequency=10000)[source]
Copy data from source cursor/results to target database table.
- Parameters:
source_data – Source cursor or list of records
target_cursor – Target database cursor
target_table (str) – Name of target table
batch_size (int) – Number of records to insert per batch
commit_frequency (int) – How often to commit (in number of records)
- Returns:
Number of records inserted
- Return type:
int
Example
```python
from dbtk.writers.database import cursor_to_cursor

# Copy between databases (source_db and target_db are open connections)
source_cursor = source_db.cursor()
source_cursor.execute("SELECT * FROM users")

target_cursor = target_db.cursor()
count = cursor_to_cursor(source_cursor, target_cursor, 'users_copy')
print(f"Copied {count} records")
```
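The batch_size and commit_frequency parameters describe a common DB-API copy loop: fetch and insert rows in chunks with executemany(), and commit after roughly every commit_frequency rows. The sketch below reproduces that loop with sqlite3 so it runs anywhere; `copy_rows` is a hypothetical helper, it assumes the target table already exists with matching column names, and it uses sqlite3's `?` placeholders, which other drivers may not accept.

```python
import sqlite3


def copy_rows(source_cursor, target_conn, target_table, batch_size=1000, commit_frequency=10000):
    """Hypothetical sketch of a batched cursor-to-table copy (not DBTK's code)."""
    columns = [d[0] for d in source_cursor.description]
    placeholders = ", ".join("?" for _ in columns)  # sqlite3 qmark paramstyle
    # Table and column names are interpolated: only use trusted identifiers
    sql = f"INSERT INTO {target_table} ({', '.join(columns)}) VALUES ({placeholders})"
    target = target_conn.cursor()
    total = since_commit = 0
    while True:
        batch = source_cursor.fetchmany(batch_size)
        if not batch:
            break
        target.executemany(sql, batch)
        total += len(batch)
        since_commit += len(batch)
        if since_commit >= commit_frequency:
            target_conn.commit()
            since_commit = 0
    target_conn.commit()  # flush the final partial window
    return total


src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE users (id INTEGER, name TEXT)")
src.executemany("INSERT INTO users VALUES (?, ?)", [(i, f"user{i}") for i in range(2500)])
dst = sqlite3.connect(":memory:")
dst.execute("CREATE TABLE users_copy (id INTEGER, name TEXT)")

cur = src.execute("SELECT * FROM users")
count = copy_rows(cur, dst, "users_copy", batch_size=1000, commit_frequency=2000)
```

A larger commit_frequency reduces transaction overhead, while a smaller one limits how much work is lost if the copy fails midway.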
Excel writer for database results using openpyxl.
- class dbtk.writers.excel.ColumnRule(style=None, header_style=None, width=None, hidden=False, comment=None, filter=False, group_label=None, conditional_style=None)[source]
Bases: object
Per-column formatting rule for use with ExcelFormat.
Interchangeable with a plain dict; ExcelWriter accepts either. The order of rules in ExcelFormat.columns is significant: later rules override earlier ones on a per-property basis.
- Parameters:
style (str or dict, optional) – Static style applied to every data cell in the column. Either a registered style name (str) or an inline property dict (bg_color, font, number_format, alignment).
header_style (str or dict, optional) – Static style applied to the header cell only. It owns the cell entirely, so include font={'bold': True} if you still want bold.
width (float, optional) – Explicit column width in Excel units; bypasses auto-sizing.
hidden (bool, default False) – Hide the column.
comment (str, optional) – Comment/note added to the header cell.
filter (bool, default False) – Show the auto-filter dropdown on this column only; hides dropdowns on all other columns.
group_label (str, optional) – Label for a group super-header spanning this column range. Only valid on range patterns ('col_a:col_z').
conditional_style (callable or list[callable], optional) – lambda record: style_name_or_None, evaluated per row; overrides style and row styles when non-None. Multiple callables are composed in order.
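The pattern rules shared by ColumnRule and ExcelFormat.columns (case-insensitive fnmatch globs, exact literals, and 'start:end' ranges, with later rules overriding earlier ones property by property) can be sketched as a small resolver. This is an illustrative sketch under stated assumptions: `resolve_rules` and `_matches` are hypothetical, rules are plain dicts, and a range is assumed to include both endpoint columns in output order.

```python
from fnmatch import fnmatchcase


def _matches(pattern, columns):
    """Return the columns a single pattern selects (hypothetical semantics)."""
    lowered = [c.lower() for c in columns]
    if ":" in pattern:
        start, end = (p.strip().lower() for p in pattern.split(":", 1))
        i, j = lowered.index(start), lowered.index(end)
        return columns[i:j + 1]  # assumed inclusive range in column order
    return [c for c, lc in zip(columns, lowered) if fnmatchcase(lc, pattern.lower())]


def resolve_rules(columns, rules):
    """Merge rule dicts per column; later patterns override earlier properties."""
    resolved = {c: {} for c in columns}
    for pattern, rule in rules.items():  # dicts preserve definition order
        for col in _matches(pattern, columns):
            resolved[col].update(rule)
    return resolved


cols = ["id", "first_name", "last_name", "Amount_USD", "amount_eur"]
rules = {
    "*": {"width": 12},
    "amount_*": {"style": "currency_style"},
    "first_name:last_name": {"group_label": "Name"},
    "AMOUNT_EUR": {"hidden": True, "width": 8},
}
result = resolve_rules(cols, rules)
```

Merging per property (rather than replacing whole rules) is what lets a broad `"*"` width coexist with a narrower pattern's style.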
- class dbtk.writers.excel.ExcelFormat(styles=None, columns=None, rows=None, min_column_width=6, max_column_width=60, auto_filter=False, freeze=None, header_auto_rotate=None, tab_color=None)[source]
Bases: object
Formatting configuration for ExcelWriter.
Interchangeable with a plain dict; ExcelWriter accepts either. Using ExcelFormat gives full IDE autocomplete and type checking.
The columns dict is order-sensitive: patterns are applied in definition order and later rules override earlier ones per property.
- Parameters:
styles (dict, optional) – Named style definitions. Keys are style names, values are property dicts (bg_color, font, number_format, alignment). Styles defined here take priority over the built-in styles, so you can redefine comma_style, currency_style, etc.
columns (dict, optional) – Pattern → ColumnRule (or plain dict). Patterns are case-insensitive fnmatch globs, literals, or 'start:end' ranges.
rows (dict, optional) – Row-type formatting. Recognised keys:
'*': applied to all rows (lowest priority). Supports height and style (str or dict).
'group_header': the group-label row (only present when column ranges carry group_label). Supports height and style (str or dict).
'header': the column-header row. Supports height.
'data': data rows. Nested keys:
'style': str or dict applied to all data rows.
'odd' / 'even': {'style': style_name} for striping.
'conditional_style': callable or list of callables, lambda record: style_name_or_None. Multiple callables are composed in order.
'height': uniform row height for data rows.
Cascade (lowest → highest priority): '*' → 'odd'/'even' → 'conditional_style' callable(s).
min_column_width (float, default 6) – Minimum auto-sized column width.
max_column_width (float, default 60) – Maximum auto-sized column width.
auto_filter (bool, default False) – Enable Excel auto-filter dropdowns on the header row.
freeze (str or False, optional) – Freeze-panes cell reference (e.g. 'D2'). None uses the automatic default ('A2', or 'A3' when group headers are present). Pass False to disable freezing entirely.
header_auto_rotate (float or dict, optional) – Auto-rotate column headers that are significantly wider than their data. Pass a float ratio or {'ratio': 1.5, 'min_length': 8, 'height_factor': 6.5}.
tab_color (str, optional) – Worksheet tab color as a hex string ('#FF0000' or 'FF0000').
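The documented row cascade ('*', then 'odd'/'even', then the 'conditional_style' callables) can be illustrated by resolving one style name per data row. A minimal sketch under three assumptions: styles are names rather than inline dicts, data rows are numbered from 1 for odd/even purposes, and when several conditional callables return a value the last non-None one wins. `row_style` is a hypothetical helper, and it deliberately models only the documented cascade steps.

```python
def row_style(record, index, rows_cfg):
    """Hypothetical: pick a style name for data row `index` (1-based)."""
    style = rows_cfg.get("*", {}).get("style")  # lowest priority
    data_cfg = rows_cfg.get("data", {})
    parity = "odd" if index % 2 else "even"
    style = data_cfg.get(parity, {}).get("style", style)  # striping overrides '*'
    rules = data_cfg.get("conditional_style") or []
    if callable(rules):
        rules = [rules]
    for rule in rules:  # evaluated in order; a later non-None result wins (assumed)
        result = rule(record)
        if result is not None:
            style = result
    return style


rows_cfg = {
    "*": {"style": "base"},
    "data": {
        "even": {"style": "stripe"},
        "conditional_style": [
            lambda r: "warning" if r["balance"] < 0 else None,
            lambda r: "critical" if r["balance"] < -1000 else None,
        ],
    },
}
```

Returning None from a conditional callable leaves the lower-priority style in place, which is why the callables can be written as narrow, single-purpose checks.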
- class dbtk.writers.excel.ExcelWriter(data=None, file=None, sheet_name=None, headers=None, write_headers=True, formatting=None)[source]
Bases: BatchWriter
Stateful Excel writer using openpyxl.
Keeps the workbook open across multiple write_batch() calls and saves only on context exit. Designed for both single-sheet legacy use and multi-sheet reports.
Supports all three BatchWriter modes:
1. Complete write from __init__ + write()
2. Batch write (no data on init) + write_batch()
3. Hybrid: data on init + write() + write_batch()
Usage examples:
```python
# Mode 1: Traditional single-shot write
ExcelWriter(cursor, 'report.xlsx').write()

# Mode 2: Pure streaming with write_batch()
with ExcelWriter(file='report.xlsx') as writer:
    writer.write_batch(cursor)  # goes to sheet 'Data'

# Mode 3: Hybrid - initial data + streaming
with ExcelWriter(first_batch, 'report.xlsx') as writer:
    writer.write()                    # Write initial batch
    writer.write_batch(second_batch)  # Stream additional batches

# Multi-sheet report
with ExcelWriter(file='report.xlsx', sheet_name='Summary') as writer:
    writer.write_batch(summary_data, sheet_name='Summary')
    writer.write_batch(users_data, sheet_name='Users')
    writer.write_batch(orders_data, sheet_name='Orders')

# Streaming / batch mode
with ExcelWriter(file='large.xlsx') as writer:
    for batch in large_generator:
        writer.write_batch(batch, sheet_name='Data')  # appends to 'Data'
```
- __init__(data=None, file=None, sheet_name=None, headers=None, write_headers=True, formatting=None)[source]
Initialize the Excel writer.
- Parameters:
data (Iterable[RecordLike], optional) – Initial data to write. If None, use write_batch() for streaming mode.
file (str or Path, optional) – Output Excel file (.xlsx). Required for Excel output.
sheet_name (str, optional) – Default/active sheet name to use for write_batch() calls without an explicit sheet_name.
headers (List[str], optional) – Header row text. If None, checks data.description for original column names, then falls back to detected column names.
write_headers (bool, default True) – Whether to write column headers (only when the sheet is empty).
formatting (ExcelFormat or dict, optional) – Worksheet formatting rules. Prefer ExcelFormat for IDE autocomplete; a plain dict is also accepted. See ExcelFormat for the full reference.
- accepts_file_handle = False
- preserve_types = True
- write_batch(data, sheet_name=None, headers=None, formatting=None)[source]
Write a batch of data to a sheet.
If this is the first write to this sheet in the current session, the sheet is cleared first. Subsequent writes to the same sheet append data.
- Parameters:
data (Iterable[RecordLike]) – The data batch
sheet_name (str, optional) – Target sheet. If None, uses active_sheet or defaults to ‘Data’
headers (list of str, optional) – Display names for the header row. Overrides the writer-level headers set at initialisation for this batch only. Must match the column count.
formatting (ExcelFormat or dict, optional) – Per-call formatting override. None (default) uses the writer-level formatting. Pass {} to write this sheet with no formatting.
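The per-session sheet rule in write_batch() (the first write to a sheet in this session clears it, later writes append) is easy to misread when the workbook already exists on disk. A minimal in-memory sketch of that bookkeeping, assuming sheets are plain lists of rows, that sheets not written in this session are left alone, and ignoring headers and formatting; `SessionSheets` is a hypothetical class, and 'Data' is the documented default sheet name.

```python
class SessionSheets:
    """Hypothetical model of ExcelWriter's clear-then-append sheet behaviour."""

    def __init__(self, existing=None, sheet_name=None):
        # Sheets as they might be loaded from an existing workbook
        self.sheets = {k: list(v) for k, v in (existing or {}).items()}
        self.active = sheet_name
        self._touched = set()  # sheets written during this session

    def write_batch(self, rows, sheet_name=None):
        name = sheet_name or self.active or "Data"
        if name not in self._touched:
            self.sheets[name] = []  # first write this session: start from an empty sheet
            self._touched.add(name)
        self.sheets[name].extend(rows)


wb = SessionSheets(existing={"Data": [["stale"]], "Notes": [["keep me"]]})
wb.write_batch([[1], [2]])     # clears the old 'Data' contents
wb.write_batch([[3]], "Data")  # appends to 'Data'
```

Tracking "touched" sheets per session is what makes re-running a report idempotent: stale rows from a previous run never survive, yet batches within one run accumulate.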
- class dbtk.writers.excel.LinkSource(name, source_sheet=None, key_column=None, url_template=None, text_template=None, missing_text=None, external_only=False)[source]
Bases: object
Defines a linkable data source for creating internal and external hyperlinks in Excel.
LinkSource enables rich hyperlinking between worksheets and to external URLs. Each LinkSource is registered once with LinkedExcelWriter and can be used across multiple sheets to create consistent, navigable Excel reports.
As the source sheet is written, LinkSource caches each record’s cell reference and generates formatted link text and URLs using Python’s str.format_map(). Later sheets can reference these cached records to create hyperlinks.
Link Types
Internal links - Excel cell references like #Students!A5 for navigation
External links - HTTP/HTTPS URLs like https://crm.example.com/contact/123
Hybrid mode - Stores both internal and external, defaults to external if available
- param name:
Unique identifier for this link source. Used when registering links in write_batch() (e.g., links={"student_name": "student"}).
- type name:
str
- param source_sheet:
The worksheet name that serves as the authoritative source for this entity. Internal links will point to rows in this sheet. This sheet must be written before any sheets that reference it. Not required when external_only=True.
- type source_sheet:
str, optional
- param key_column:
The column name containing unique identifiers for records (e.g., “student_id”). This column must exist in both the source sheet data and any sheets that create links to it. Not required when external_only=True.
- type key_column:
str, optional
- param url_template:
Python format string for generating external URLs. Uses str.format_map() with the full record dict as context. Example: "https://app.com/users/{user_id}"
- type url_template:
str, optional
- param text_template:
Python format string for generating link display text. Uses str.format_map() with the full record dict. Example: "{last_name}, {first_name} ({dept})". If not provided, uses the column value.
- type text_template:
str, optional
- param missing_text:
Fallback text to display when a link target cannot be resolved. If None, displays the raw value from the detail row.
- type missing_text:
str, optional
- param external_only:
If True, this LinkSource generates external links directly from current row data without caching. Can be reused across multiple sheets. source_sheet and key_column are not required. If False (default), caches records for cross-sheet linking.
- type external_only:
bool, default False
- _records
Internal cache mapping key values to link metadata (ref, display_text, url). Populated automatically as the source sheet is written.
- Type:
dict
Examples
Internal links only (sheet navigation):
```python
from dbtk.writers.excel import LinkSource

student_link = LinkSource(
    name="student",
    source_sheet="Students",
    key_column="student_id",
)
```
External links with custom text:
```python
employee_link = LinkSource(
    name="employee",
    source_sheet="Employees",
    key_column="employee_id",
    url_template="https://hr.company.com/profile/{employee_id}",
    text_template="{last_name}, {first_name} ({department})",
)
```
Hybrid with missing value handling:
```python
customer_link = LinkSource(
    name="customer",
    source_sheet="Customers",
    key_column="customer_id",
    url_template="https://crm.company.com/customers/{crm_id}",
    text_template="{company_name} - {contact_name}",
    missing_text="[Unknown Customer]",
)
```
External-only links (reusable across sheets):
```python
# External-only links are reusable on any sheet that has the required columns
imdb_link = LinkSource(
    name="imdb",
    url_template="https://imdb.com/title/{tconst}",
    text_template="{primary_title} ({start_year})",
    external_only=True,  # No source_sheet needed - works on any sheet
)

# writer is a LinkedExcelWriter; register the source before using it
writer.register_link_source(imdb_link)

# Use on multiple sheets with the same columns
writer.write_batch(movies, "Movies", links={"primary_title": "imdb"})
writer.write_batch(top_rated, "Top Rated", links={"primary_title": "imdb"})
```
Notes
The source sheet MUST be written before sheets that reference it
All template fields must exist in the record data; otherwise a KeyError is logged
Key values are converted to strings for cache lookups
Templates use Python’s str.format_map(); use double braces ({{ and }}) to output literal braces
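The caching behaviour described above (key values stringified, display text and URL rendered with str.format_map() when the source row is written, lookups returning a dict with "target" and "display_text") can be modelled in a few lines. This is a hedged sketch, not the LinkSource source code: `MiniLinkSource` is hypothetical, and two fallbacks are assumptions loosely modelled on the hybrid-mode description (display text falls back to the key value, and "external" falls back to the internal reference when no URL template exists).

```python
class MiniLinkSource:
    """Hypothetical model of LinkSource's record cache (not DBTK's class)."""

    def __init__(self, name, url_template=None, text_template=None):
        self.name = name
        self.url_template = url_template
        self.text_template = text_template
        self._records = {}

    def cache_record(self, key_value, row_dict, ref):
        text = self.text_template.format_map(row_dict) if self.text_template else str(key_value)
        url = self.url_template.format_map(row_dict) if self.url_template else None
        self._records[str(key_value)] = {"ref": ref, "display_text": text, "url": url}

    def get_link(self, key_value, mode="external"):
        entry = self._records.get(str(key_value))  # keys are compared as strings
        if entry is None:
            return None
        target = entry["url"] if mode == "external" and entry["url"] else entry["ref"]
        return {"target": target, "display_text": entry["display_text"]}


students = MiniLinkSource(
    "student",
    url_template="https://sis.example.edu/students/{student_id}",
    text_template="{last_name}, {first_name} {{#{student_id}}}",  # {{ }} emit literal braces
)
row = {"student_id": 42, "first_name": "Ada", "last_name": "Lovelace"}
students.cache_record(42, row, "#Students!A2")
```

Stringifying keys on both the write and lookup side is what lets an integer key from one sheet match a text key read from another.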
See also
LinkedExcelWriter: Writer that uses LinkSource for hyperlinking
LinkedExcelWriter.register_link_source: Method to register a LinkSource
- __init__(name, source_sheet=None, key_column=None, url_template=None, text_template=None, missing_text=None, external_only=False)[source]
- cache_record(key_value, row_dict, ref)[source]
Cache a record for cross-sheet linking (unless external_only=True).
- generate_link_from_row(row_dict, ref, mode='external', column_value=None)[source]
Generate link info directly from row data (for self-linking or external-only).
Used when writing the source sheet itself to create links from current row instead of looking up from cache.
- Parameters:
row_dict (dict) – The current row’s data as a dictionary
ref (str) – The cell reference for internal links (e.g., “#Movies!A5”)
mode (str) – “external” or “internal”
column_value (Any, optional) – For external_only sources, the value from the linked column
- Returns:
Dict with “target” and “display_text”, or None if link cannot be generated
- Return type:
dict or None
- get_link(key_value, mode='external')[source]
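Resolve the link for a key.
- Parameters:
key_value – Key value to look up in the cache
mode (str) – "external" (default) or "internal"
- Returns:
Dict with "target" and "display_text", or None if the key is missing
- Return type:
dict or None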
- property max_display_width: int
Maximum display text width observed across sampled links.
Samples first 100 records, capped at 50 characters to handle outliers. Used for automatic column width sizing.
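The sizing heuristic behind max_display_width (look only at the first 100 cached links and never report more than 50 characters) reduces to a bounded max(). A sketch, assuming the cache is an insertion-ordered dict of entries with a "display_text" key and that an empty cache yields 0; `max_display_width` here is a hypothetical free function, and the two constants are the documented values.

```python
from itertools import islice

SAMPLE_SIZE = 100  # documented sample of cached records
WIDTH_CAP = 50     # documented cap that keeps outliers from widening the column


def max_display_width(records):
    sample = islice(records.values(), SAMPLE_SIZE)
    widest = max((len(str(e["display_text"])) for e in sample), default=0)
    return min(widest, WIDTH_CAP)


cache = {str(i): {"display_text": "x" * i} for i in range(1, 200)}
width = max_display_width(cache)  # sampled maximum is 100, capped to 50
```

Sampling keeps column sizing cheap on large source sheets, at the cost of ignoring unusually long labels that appear late.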
- class dbtk.writers.excel.LinkedExcelWriter(data=None, file=None, sheet_name=None, headers=None, write_headers=True, formatting=None)[source]
Bases: ExcelWriter
Advanced Excel writer with internal and external hyperlink management.
LinkedExcelWriter extends ExcelWriter to enable rich, bidirectional hyperlinking within Excel workbooks and to external systems. It automatically caches source records as they’re written and creates formatted hyperlinks in detail sheets that reference those sources.
This is particularly powerful for creating navigable multi-sheet reports with master-detail relationships, drill-through capabilities, and integration with external CRM, ticketing, or web applications.
Key Features
Internal navigation - Links between worksheets (e.g., #Students!B5)
External integration - Deep links to web applications
Hybrid linking - Store both internal and external, choose which to display
Template-based formatting - Use Python format strings for link text and URLs
Automatic caching - Source records cached as written, no manual tracking
Mode control - Force internal or external links per column via source:internal
Workflow
Create LinkSource definitions for each linkable entity
Register them with LinkedExcelWriter
Write source sheets first (e.g., Students, Products)
Write detail sheets with link specifications (e.g., Enrollments, Orders)
Links are resolved from cache and applied automatically
- param file:
Output Excel file (.xlsx)
- type file:
str or Path
- param data:
Initial data to write. If None, use write_batch() for streaming mode.
- type data:
Iterable[RecordLike], optional
- param sheet_name:
Default sheet name for write_batch() calls
- type sheet_name:
str, optional
- param write_headers:
Whether to write column headers
- type write_headers:
bool, default True
Examples
Basic internal linking between sheets:
```python
from dbtk.writers.excel import LinkedExcelWriter, LinkSource

with LinkedExcelWriter(file='school_report.xlsx') as writer:
    # Define linkable entity
    student_link = LinkSource(
        name="student",
        source_sheet="Students",
        key_column="student_id",
    )
    writer.register_link_source(student_link)

    # Write source sheet
    writer.write_batch(students_data, sheet_name="Students")

    # Write detail sheet with internal links
    writer.write_batch(
        enrollments_data,
        sheet_name="Enrollments",
        links={"student_name": "student:internal"},
    )
```
External links to CRM system:
```python
with LinkedExcelWriter(file='sales_report.xlsx') as writer:
    customer_link = LinkSource(
        name="customer",
        source_sheet="Customers",
        key_column="customer_id",
        url_template="https://crm.company.com/customers/{crm_id}",
        text_template="{company_name} ({customer_id})",
    )
    writer.register_link_source(customer_link)

    writer.write_batch(customers_data, sheet_name="Customers")
    writer.write_batch(
        orders_data,
        sheet_name="Orders",
        links={"customer": "customer"},  # Uses external URL
    )
```
Hybrid mode with internal and external links:
```python
with LinkedExcelWriter(file='support_tickets.xlsx') as writer:
    ticket_link = LinkSource(
        name="ticket",
        source_sheet="Tickets",
        key_column="ticket_id",
        url_template="https://support.company.com/ticket/{ticket_id}",
        text_template="#{ticket_id} - {subject}",
    )
    writer.register_link_source(ticket_link)

    writer.write_batch(tickets_data, sheet_name="Tickets")
    writer.write_batch(
        comments_data,
        sheet_name="Comments",
        links={
            "ticket_link": "ticket",          # External to support system
            "ticket_ref": "ticket:internal",  # Internal sheet navigation
        },
    )
```
Multiple link sources in one sheet:
```python
with LinkedExcelWriter(file='class_roster.xlsx') as writer:
    student_link = LinkSource(
        name="student",
        source_sheet="Students",
        key_column="student_id",
        text_template="{last_name}, {first_name}",
    )
    course_link = LinkSource(
        name="course",
        source_sheet="Courses",
        key_column="course_id",
        text_template="{course_code} - {title}",
    )
    writer.register_link_source(student_link)
    writer.register_link_source(course_link)

    writer.write_batch(students_data, sheet_name="Students")
    writer.write_batch(courses_data, sheet_name="Courses")
    writer.write_batch(
        enrollments_data,
        sheet_name="Enrollments",
        links={
            "student_name": "student:internal",
            "course_name": "course:internal",
        },
    )
```
Notes
Source sheets MUST be written before detail sheets that reference them
The key_column must exist in both source and detail datasets
Missing links display missing_text if set, otherwise show raw value
Link mode syntax: "source_name" (external) or "source_name:internal"
Templates use str.format_map(); all fields must exist in record data
Hyperlink styling (blue, underlined) is applied automatically
See also
LinkSource: Link definition class
ExcelWriter: Base writer without linking capabilities
- __init__(data=None, file=None, sheet_name=None, headers=None, write_headers=True, formatting=None)[source]
- link_sources: Dict[str, LinkSource]
Registered LinkSource instances, keyed by name.
- write_batch(data, sheet_name=None, headers=None, links=None, formatting=None)[source]
Write a batch with optional hyperlinking.
If this is the first write to this sheet in the current session, the sheet is cleared first. Subsequent writes to the same sheet append data.
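- Parameters:
data (Iterable[RecordLike]) – The data batch
sheet_name (str, optional) – Target sheet
headers (list of str, optional) – Display names for this batch, overriding the writer-level default
links (dict, optional) – Mapping of column_name → "source_name" or "source_name:internal"
formatting (ExcelFormat or dict, optional) – Per-call formatting override; None uses the writer-level formatting, {} writes with no formatting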
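The links argument maps each column to "source_name" or "source_name:internal". Splitting that spec is simple, but it is where typos in source names surface, so a sketch that validates against the registered names may help. `parse_link_spec` is hypothetical; a bare name meaning external mode follows the documented syntax, while accepting an explicit ":external" suffix and raising KeyError for an unregistered source are assumptions.

```python
def parse_link_spec(spec, registered):
    """Hypothetical: split 'name' / 'name:internal' into (name, mode)."""
    name, sep, mode = spec.partition(":")
    mode = mode if sep else "external"  # bare name means external (documented default)
    if mode not in ("external", "internal"):
        raise ValueError(f"unknown link mode {mode!r} in {spec!r}")
    if name not in registered:
        raise KeyError(f"link source {name!r} is not registered")
    return name, mode


registered = {"student", "course"}
links = {"student_name": "student:internal", "course_name": "course"}
parsed = {col: parse_link_spec(spec, registered) for col, spec in links.items()}
```

Validating specs up front fails fast on a misspelled source name instead of silently producing unlinked cells.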
- dbtk.writers.excel.to_excel(data, file, sheet='Data', headers=None, write_headers=True)[source]
Legacy convenience function — writes a single sheet.
- Parameters:
data (Iterable[RecordLike]) – Data to write (cursor, list of Records, etc.)
file (str or Path) – Output Excel file (.xlsx)
sheet (str, default 'Data') – Sheet name to write to
headers (List[str], optional) – Header row text. If None, uses cursor.description or detected column names
write_headers (bool, default True) – Whether to write column headers
Examples
```python
from dbtk.writers.excel import to_excel

# Write cursor with original database column names
to_excel(cursor, 'report.xlsx')

# Override header names
to_excel(cursor, 'report.xlsx', headers=['User ID', 'Full Name', 'Email'])
```
For multi-sheet or advanced reports, use ExcelWriter as a context manager with write_batch().
Fixed-width and EDI text writers with batch streaming support.
Both writers are driven by FixedColumn schema objects — the same objects used by FixedReader and EDIReader on the read side. When input records are already FixedWidthRecord instances (e.g. round-tripped through a reader), they go straight to to_line(). Any other record type is cast into the appropriate FixedWidthRecord subclass first.
- class dbtk.writers.fixed_width.EDIWriter(data=None, file=None, columns=None, encoding='utf-8', truncate_overflow=False)[source]
Bases: BatchWriter
Writer for fixed-width files containing multiple record types (EDI-like formats).
Symmetric counterpart to EDIReader. Takes the same Dict[str, List[FixedColumn]] schema and dispatches writes by record type code (always the first field of each record).
When input records are already FixedWidthRecord instances (e.g. from EDIReader), the type code is read from record[0] and checked against the schema, then to_line() is called directly. Any other record type is cast into the appropriate FixedWidthRecord subclass.
- Parameters:
data (Iterable[RecordLike], optional) – Initial data to write. None for streaming mode.
file (str, Path, TextIO, or BinaryIO, optional) – Output file or handle. None writes to stdout.
columns (Dict[str, List[FixedColumn]]) – Mapping of type codes to column definitions, in the same format as EDIReader. All keys must be strings of identical length.
encoding (str, default 'utf-8') – File encoding.
truncate_overflow (bool, default False) – Truncate values that exceed their column width. False (default) raises ValueError, because EDI files are typically length-strict.
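Dispatching by record type code, as described above, boils down to: check that every schema key has the same length, read the code from the first field, look up that code's column list, and render each field to its width. The sketch below uses plain (name, width) tuples in place of FixedColumn, left-justifies every field, and raises on overflow (mirroring truncate_overflow=False). These simplifications and the names `EDI_SCHEMA` and `render_edi` are hypothetical; real ACH layouts, for instance, right-justify and zero-pad amounts.

```python
EDI_SCHEMA = {
    "1": [("type", 1), ("company", 10)],
    "6": [("type", 1), ("account", 8), ("amount", 6)],
}


def render_edi(records, schema=EDI_SCHEMA):
    """Hypothetical: format records whose first field is the type code."""
    if len({len(code) for code in schema}) != 1:
        raise ValueError("all type codes must have the same length")
    lines = []
    for record in records:
        code = str(record[0])
        if code not in schema:
            raise ValueError(f"unknown record type {code!r}")
        columns = schema[code]
        if len(record) != len(columns):
            raise ValueError(f"record type {code!r} expects {len(columns)} fields")
        parts = []
        for value, (name, width) in zip(record, columns):
            text = str(value)
            if len(text) > width:  # length-strict: refuse to truncate
                raise ValueError(f"{name}={text!r} exceeds width {width}")
            parts.append(text.ljust(width))
        lines.append("".join(parts))
    return lines


lines = render_edi([("1", "ACME"), ("6", "12345678", "100")])
```

Requiring equal-length type codes is what allows a reader to identify each line's layout by slicing a fixed prefix.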
Examples
Read-modify-write loop:
```python
from dbtk.readers.fixed_width import EDIReader
from dbtk.writers.fixed_width import EDIWriter
from dbtk.formats.edi import ACH_COLUMNS

# Pass file and columns by keyword: the first positional parameter is data
with open('in.ach') as fp, EDIWriter(file='out.ach', columns=ACH_COLUMNS) as w:
    w.write_batch(EDIReader(fp, ACH_COLUMNS))
```
Single-shot:
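```python
from dbtk.readers.fixed_width import EDIReader
from dbtk.writers.fixed_width import to_edi
from dbtk.formats.edi import ACH_COLUMNS

with open('in.ach') as fp:
    records = list(EDIReader(fp, ACH_COLUMNS))
to_edi(records, ACH_COLUMNS, 'out.ach')
```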
- accepts_file_handle = True
- preserve_types = True
- class dbtk.writers.fixed_width.FixedWidthWriter(data=None, file=None, columns=None, encoding='utf-8', truncate_overflow=True)[source]
Bases: BatchWriter
Fixed-width text file writer with batch streaming capabilities.
Field widths, alignment, and padding are driven by a List[FixedColumn] schema, the same schema used by FixedReader. When input records are already FixedWidthRecord instances they are written directly via to_line(). Any other record type (dict, list, namedtuple, generic Record) is cast into the appropriate FixedWidthRecord subclass first.
- Parameters:
data (Iterable[RecordLike], optional) – Initial data to write. None for streaming mode.
file (str, Path, TextIO, or BinaryIO, optional) – Output file or handle. None writes to stdout.
columns (List[FixedColumn]) – Column definitions: width, alignment, padding, and type per field.
encoding (str, default 'utf-8') – File encoding.
truncate_overflow (bool, default True) – Truncate values that exceed their column width. False raises ValueError instead.
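Per-field rendering (align within the width, pad, then either truncate or reject values that are too long) is the core of any fixed-width writer. A minimal sketch using a hypothetical `Col` dataclass in place of FixedColumn: its field names (name, width, align, pad) are assumptions and do not reflect FixedColumn's actual attributes, and `format_fixed_line` is a hypothetical helper. truncate_overflow defaults to True here, matching this writer's documented default.

```python
from dataclasses import dataclass


@dataclass
class Col:
    """Hypothetical stand-in for FixedColumn."""
    name: str
    width: int
    align: str = "left"  # "left" or "right"
    pad: str = " "


def format_fixed_line(record, columns, truncate_overflow=True):
    parts = []
    for col in columns:
        value = record.get(col.name)
        text = "" if value is None else str(value)
        if len(text) > col.width:
            if not truncate_overflow:
                raise ValueError(f"{col.name}={text!r} exceeds width {col.width}")
            text = text[:col.width]  # keep the leftmost characters
        if col.align == "right":
            text = text.rjust(col.width, col.pad)
        else:
            text = text.ljust(col.width, col.pad)
        parts.append(text)
    return "".join(parts)


COLS = [Col("code", 3), Col("amount", 8, align="right", pad="0")]
line = format_fixed_line({"code": "A", "amount": 42}, COLS)
```

Truncating silently is convenient for reports, whereas interchange formats usually prefer the ValueError so bad data never reaches the counterparty.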
Examples
Round-trip from FixedReader:
```python
# MY_COLS is a List[FixedColumn] schema defined elsewhere
with open('input.txt') as fp:
    records = list(FixedReader(fp, MY_COLS))
to_fixed_width(records, MY_COLS, 'output.txt')
```
Write from dicts:
```python
rows = [{'code': 'A', 'amount': 42}, ...]
FixedWidthWriter(rows, 'output.txt', MY_COLS).write()
```
Streaming / batch mode:
```python
with FixedWidthWriter(file='output.txt', columns=MY_COLS) as w:
    for batch in source:
        w.write_batch(batch)
```
- accepts_file_handle = True
- preserve_types = True
- dbtk.writers.fixed_width.to_edi(data, columns, file=None, encoding='utf-8', truncate_overflow=False)[source]
Export EDI records to a fixed-width text file.
- Parameters:
data – Iterable of FixedWidthRecord (or compatible) instances.
columns (Dict[str, List[FixedColumn]]) – Dict mapping type codes to FixedColumn definitions.
file (str | Path | None) – Output file path. If None, writes to stdout.
encoding (str) – File encoding.
truncate_overflow (bool) – Truncate values that exceed column width.
- dbtk.writers.fixed_width.to_fixed_width(data, columns, file=None, encoding='utf-8', truncate_overflow=True)[source]
Export records to a fixed-width text file.
- Parameters:
data – Iterable of records (FixedWidthRecord, dict, list, etc.)
columns (List[FixedColumn]) – FixedColumn definitions for width, alignment, and padding.
file (str | Path | None) – Output file path. If None, writes to stdout.
encoding (str) – File encoding.
truncate_overflow (bool) – Truncate values that exceed column width.
JSON writer for database results.
- class dbtk.writers.json.JSONWriter(data=None, file=None, columns=None, encoding='utf-8', indent=2, compression='infer', **json_kwargs)[source]
Bases: BaseWriter
JSON writer class that extends BaseWriter.
- __init__(data=None, file=None, columns=None, encoding='utf-8', indent=2, compression='infer', **json_kwargs)[source]
Initialize JSON writer.
- Parameters:
data – Cursor object or list of records
file (str | Path | None) – Output file. If None, writes to stdout
columns (List[str] | None) – Column names for list-of-lists data (optional for other types)
encoding (str) – File encoding
indent (int | None) – JSON indentation - defaults to 2 (pretty-print), 0 or None for compact
compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.
**json_kwargs – Additional arguments passed to json.dump
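The compression='infer' rule above amounts to an extension-to-codec lookup. A minimal standard-library sketch, assuming exactly the .gz/.bz2/.xz mapping listed; `open_for_write` is a hypothetical name, not a dbtk function.

```python
import bz2
import gzip
import lzma
from pathlib import Path

# Extension-to-opener table mirroring the documented 'infer' rule
_BY_SUFFIX = {'.gz': gzip.open, '.bz2': bz2.open, '.xz': lzma.open}
# Explicit override names
_BY_NAME = {'gzip': gzip.open, 'bz2': bz2.open, 'lzma': lzma.open}


def open_for_write(path, compression='infer', encoding='utf-8'):
    """Open a text-mode writer, compressing when requested or inferred."""
    path = Path(path)
    if compression == 'infer':
        opener = _BY_SUFFIX.get(path.suffix.lower())
    elif compression is None:
        opener = None
    else:
        opener = _BY_NAME[compression]  # KeyError for unknown names
    if opener is None:
        return open(path, 'w', encoding=encoding)
    return opener(path, 'wt', encoding=encoding)
```

Because every opener returns a text stream, the JSON serialization code does not need to know whether compression is active.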
- class dbtk.writers.json.NDJSONWriter(data=None, file=None, columns=None, encoding='utf-8', compression='infer', **json_kwargs)[source]
Bases: BatchWriter
NDJSON (newline-delimited JSON) writer.
- __init__(data=None, file=None, columns=None, encoding='utf-8', compression='infer', **json_kwargs)[source]
Initialize NDJSON writer.
- Parameters:
data – Cursor object or list of records
file (str | Path | None) – Output file. If None, writes to stdout
columns (List[str] | None) – Column names for list-of-lists data (optional for other types)
encoding (str) – File encoding
compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.
**json_kwargs – Additional arguments passed to json.dumps
- dbtk.writers.json.to_json(data, file=None, encoding='utf-8', indent=2, compression='infer', **json_kwargs)[source]
Export cursor or result set to JSON file as an array of dictionaries.
- Parameters:
data – Cursor object or list of records
file (str | Path | None) – Output file. If None, writes to stdout
encoding (str) – File encoding
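indent (int | None) – JSON indentation - defaults to 2 (pretty-print), 0 or None for compact
compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.
**json_kwargs – Additional arguments passed to json.dump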
Example
# Write to file as JSON array
to_json(cursor, 'users.json')
# Write to stdout
to_json(cursor)
# Compact format
to_json(cursor, 'data.json', indent=None)
- dbtk.writers.json.to_ndjson(data, file=None, encoding='utf-8', compression='infer', **json_kwargs)[source]
Export cursor or result set to NDJSON (newline-delimited JSON) file.
- Parameters:
data – Cursor object or list of records
file (str | Path | None) – Output file. If None, writes to stdout
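encoding (str) – File encoding
compression (str) – Compression type. ‘infer’ detects from file extension (.gz, .bz2, .xz). Pass ‘gzip’, ‘bz2’, or ‘lzma’ to override, or None to disable.
**json_kwargs – Additional arguments passed to json.dumps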
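The two output shapes differ only in framing: to_json writes one JSON array, while to_ndjson writes one object per line so readers can stream it. An illustrative sketch with the standard json module (default=str for dates is an assumption, not necessarily what dbtk does):

```python
import io
import json
from datetime import date

rows = [{'id': 1, 'joined': date(2024, 3, 15)}, {'id': 2, 'joined': None}]

# JSON array: a single document; indent=None gives the compact form
array_buf = io.StringIO()
json.dump(rows, array_buf, indent=None, default=str)

# NDJSON: one compact document per line, readable line by line
nd_buf = io.StringIO()
for row in rows:
    nd_buf.write(json.dumps(row, default=str) + '\n')

print(array_buf.getvalue())
print(nd_buf.getvalue(), end='')
```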
Example
# Write to file as NDJSON
to_ndjson(cursor, 'users.ndjson')
# Write to stdout
to_ndjson(cursor)
XML writer for database results using lxml.
- class dbtk.writers.xml.XMLStreamer(data=None, file=None, columns=None, encoding='utf-8', root_element='data', record_element='record')[source]
Bases: BatchWriter
Streaming XML writer that writes records incrementally.
Memory-efficient for large datasets. Writes XML elements as they arrive without building the entire tree in memory.
- Parameters:
data (Iterable[RecordLike], optional) – Initial data. For streaming mode, use data=None.
file (str, Path, or BinaryIO, optional) – Output file or binary file handle. Must be binary mode for streaming.
columns (List[str], optional) – Column names for list-of-lists data
encoding (str, default 'utf-8') – XML encoding declaration
root_element (str, default 'data') – Name of the root XML element
record_element (str, default 'record') – Name of each record element
Examples
Streaming mode:
from dbtk.utils import batch_iterable
with open('output.xml', 'wb') as f:
    with XMLStreamer(data=None, file=f, root_element='data') as writer:
        for batch in batch_iterable(records, 1000):
            writer.write_batch(batch)
Single-shot mode:
XMLStreamer(data=records, file='output.xml').write()
Notes
Requires lxml library
File must be opened in binary mode (‘wb’) for streaming
No pretty-printing (streaming writes compact XML)
More memory-efficient than XMLWriter for large datasets
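The incremental pattern behind XMLStreamer can be sketched with the standard library's xml.sax.saxutils.XMLGenerator. This swaps in a stdlib writer for lxml and writes to a text stream rather than a binary handle; `stream_xml` is hypothetical and assumes dict keys are valid XML element names.

```python
import io
from xml.sax.saxutils import XMLGenerator


def stream_xml(records, out, root_element='data', record_element='record', encoding='utf-8'):
    """Write dict records as XML one element at a time; no tree is kept in memory."""
    gen = XMLGenerator(out, encoding=encoding)
    gen.startDocument()
    gen.startElement(root_element, {})
    for rec in records:  # any iterator works, e.g. a cursor
        gen.startElement(record_element, {})
        for key, value in rec.items():
            gen.startElement(key, {})
            if value is not None:
                gen.characters(str(value))  # escapes &, <, >
            gen.endElement(key)
        gen.endElement(record_element)
    gen.endElement(root_element)
    gen.endDocument()


buf = io.StringIO()
stream_xml(iter([{'id': 1, 'name': 'A & B'}]), buf, root_element='users', record_element='user')
print(buf.getvalue())
```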
- class dbtk.writers.xml.XMLWriter(data=None, file=None, columns=None, encoding='utf-8', root_element='data', record_element='record', pretty=True)[source]
Bases: BaseWriter
XML writer that builds the complete XML tree in memory.
Best for small to medium datasets. For large datasets that don’t fit in memory, use XMLStreamer instead.
- Parameters:
data (Iterable[RecordLike]) – Data to write
file (str, Path, TextIO, or BinaryIO, optional) – Output file or file handle. If None, writes to stdout.
columns (List[str], optional) – Column names for list-of-lists data
encoding (str, default 'utf-8') – XML encoding declaration
root_element (str, default 'data') – Name of the root XML element
record_element (str, default 'record') – Name of each record element
pretty (bool, default True) – Whether to format with indentation
Examples
>>> to_xml(cursor, 'users.xml')
>>> to_xml(records, 'output.xml', root_element='users', record_element='user')
- __init__(data=None, file=None, columns=None, encoding='utf-8', root_element='data', record_element='record', pretty=True)[source]
Initialize XML writer.
- preserve_types = False
- dbtk.writers.xml.check_dependencies()[source]
Check for optional dependencies and issue warnings if missing.
- dbtk.writers.xml.to_xml(data, file=None, encoding='utf-8', root_element='data', record_element='record', stream=False, pretty=None)[source]
Export cursor or result set to XML file.
- Parameters:
data (Iterable[RecordLike]) – Cursor object or list of records
file (str or Path, optional) – Output file. If None, writes to stdout (limited to 20 rows)
encoding (str, default 'utf-8') – XML encoding declaration
root_element (str, default 'data') – Name of the root XML element
record_element (str, default 'record') – Name of each record element
stream (bool, default False) – Whether to use streaming mode (reduces memory usage for large datasets)
pretty (bool, optional) – Whether to format with indentation. Defaults to True for tree mode, False for streaming mode.
Examples
Write to file:
to_xml(cursor, 'users.xml')
Write to stdout (limited to 20 rows):
to_xml(cursor)
Custom element names with streaming:
to_xml(cursor, 'active_users.xml', root_element='users', record_element='user', stream=True)
Notes
Tree mode (stream=False): Builds complete XML tree in memory, supports pretty printing
Streaming mode (stream=True): Memory-efficient, writes incrementally, no pretty printing
For large datasets (>100K rows), use stream=True
Utilities
Utility functions for dbtk.
- class dbtk.utils.ErrorDetail(message, field=None, code=None)[source]
Bases: object
Structured error record for ETL and database operations.
Captures a single error with optional field attribution and a driver-specific error code. Used by dbtk.etl.table.Table (last_error) and dbtk.etl.managers.IdentityManager (per-entity _errors list), and round-trips cleanly through JSON via save_state/load_state.
- message
Human-readable description of the error.
- Type:
str
- field
Name of the source or target field the error is associated with. None when the error is not specific to a single field.
- Type:
str, optional
- code
Database- or application-level error code (e.g. pgcode from psycopg2, an ORA- number, or a custom application string). None when no structured code is available.
- Type:
str, optional
- __init__(message, field=None, code=None)[source]
Create an ErrorDetail.
- Parameters:
message (str) – Human-readable description of the error.
field (str, optional) – Field the error relates to, or None.
code (str, optional) – Structured error code, or None.
- code
- field
- message
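A hypothetical stand-in showing why the three attributes round-trip cleanly through JSON: each is a string or None. `ErrorDetailSketch` is not dbtk's class, and the actual save_state/load_state format may differ.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ErrorDetailSketch:
    message: str
    field: Optional[str] = None
    code: Optional[str] = None


# '23505' is PostgreSQL's SQLSTATE for unique_violation
err = ErrorDetailSketch('duplicate key value', field='email', code='23505')

# Serialize to JSON and rebuild; dataclass equality compares all three attributes
payload = json.dumps(asdict(err))
restored = ErrorDetailSketch(**json.loads(payload))
print(restored == err)  # True
```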
- class dbtk.utils.FixedColumn(name, start_pos, end_pos=None, column_type='text', comment=None, align=None, pad_char=None, width=None)[source]
Bases: object
Column definition for fixed-width files
- __init__(name, start_pos, end_pos=None, column_type='text', comment=None, align=None, pad_char=None, width=None)[source]
- Parameters:
name (str) – database column name
start_pos (int) – start position of field, first position is 1 not 0
end_pos (int) – end position of field (mutually exclusive with width)
column_type (str) – text, int, float, date
comment (str) – description of column usage/options
align (str) – override alignment (left, right, center)
pad_char (str) – override pad character
width (int) – field width in characters (mutually exclusive with end_pos)
FixedColumn('birthdate', 25, 35, 'date')
FixedColumn('birthdate', 25, width=11, column_type='date')
- property width: int
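Positions are 1-based and inclusive, so end_pos and width are interchangeable: 35 - 25 + 1 = 11, which is why the two FixedColumn examples above describe the same slot. A sketch of that arithmetic and the equivalent Python slice (both helpers are hypothetical, and rejecting a definition with neither or both of end_pos and width is an assumption):

```python
def resolve_slot(start_pos, end_pos=None, width=None):
    """Return (end_pos, width) for a 1-based, inclusive field definition."""
    if (end_pos is None) == (width is None):
        # Assumption: exactly one of the two must be supplied
        raise ValueError('specify exactly one of end_pos or width')
    if width is None:
        width = end_pos - start_pos + 1
    else:
        end_pos = start_pos + width - 1
    return end_pos, width


def slice_field(line, start_pos, end_pos):
    # 1-based inclusive [start_pos, end_pos] maps to Python's 0-based [start_pos - 1, end_pos)
    return line[start_pos - 1:end_pos]


print(resolve_slot(25, end_pos=35))     # (35, 11)
print(resolve_slot(25, width=11))       # (35, 11)
print(slice_field('ABCDEFGHIJ', 3, 5))  # CDE
```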
- class dbtk.utils.ParamStyle[source]
Bases: object
SQL parameter placeholder styles for different database drivers.
Different database drivers use different parameter placeholder formats. This class provides constants and utilities for working with these formats:
QMARK: Question mark placeholders (?, ?) - SQLite, ODBC
NUMERIC: Numeric placeholders (:1, :2) - Oracle
NAMED: Named placeholders (:name, :email) - Oracle, SQLite
FORMAT: Printf-style (%s, %s) - MySQL (MySQLdb)
PYFORMAT: Python format (%(name)s) - psycopg2, pymysql
Example
>>> ParamStyle.get_placeholder('qmark')
'?'
>>> ParamStyle.get_placeholder('named')
':1'
- DEFAULT = 'named'
- FORMAT = 'format'
- NAMED = 'named'
- NUMERIC = 'numeric'
- PYFORMAT = 'pyformat'
- QMARK = 'qmark'
- classmethod get_positional_style(paramstyle)[source]
Return a positional paramstyle, mapping named style to the corresponding positional style if needed.
- classmethod named_styles()[source]
Parameter styles where parameters must be in dict instead of tuple
- dbtk.utils.batch_iterable(iterable, batch_size)[source]
Batch an iterable into chunks of specified size.
- Parameters:
iterable (Iterable[Any]) – The iterable to batch
batch_size (int) – Size of each batch
- Yields:
Lists of items up to batch_size length
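One way to meet the documented contract (lists of up to batch_size items, the last one possibly shorter) is itertools.islice over a single iterator. An illustrative sketch, not necessarily dbtk's implementation; the batch_size check is an added assumption.

```python
from itertools import islice
from typing import Any, Iterable, Iterator, List


def batch_iterable_sketch(iterable: Iterable[Any], batch_size: int) -> Iterator[List[Any]]:
    if batch_size < 1:
        raise ValueError('batch_size must be >= 1')  # assumption, not documented
    it = iter(iterable)  # one shared iterator, so generators and cursors work too
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


print(list(batch_iterable_sketch(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```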
- dbtk.utils.normalize_field_name(name)[source]
Normalize field name for attribute access.
Converts to lowercase, replaces non-alphanumeric characters with underscores, collapses consecutive underscores, and strips leading/trailing underscores. A single leading underscore is preserved only if the original name explicitly started with one; underscores synthesized from other leading characters (e.g. $, #) are stripped. Names that would start with a digit are prefixed with n.
- Parameters:
name (str) – Original field name
- Returns:
Normalized field name suitable for Python attribute access
- Return type:
str
Examples
>>> normalize_field_name('Start Year')
'start_year'
>>> normalize_field_name('Start Year!')
'start_year'
>>> normalize_field_name('!Status')
'status'
>>> normalize_field_name('_Secret_Code!')
'_secret_code'
>>> normalize_field_name('$Secret_Code!')
'secret_code'
>>> normalize_field_name('__id__')
'_id'
>>> normalize_field_name('_row_num')
'_row_num'
>>> normalize_field_name('#Term Code')
'term_code'
>>> normalize_field_name('2025 Sales')
'n2025_sales'
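The rules above, including the n prefix implied by the last example, can be reproduced in a few lines. This sketch assumes ASCII letters and digits are the only characters kept; it is checked against the examples, not against dbtk's source.

```python
import re


def normalize_field_name_sketch(name: str) -> str:
    """Re-implementation of the documented rules, for illustration only."""
    explicit_underscore = name.startswith('_')
    # Any run of characters outside [0-9a-z] (underscores included) becomes one '_'
    s = re.sub(r'[^0-9a-z]+', '_', name.lower())
    s = s.strip('_')
    if explicit_underscore:
        s = '_' + s  # keep one leading underscore only if the caller wrote it
    if s[:1].isdigit():
        s = 'n' + s  # digit-leading names are not valid Python identifiers
    return s


for raw in ['Start Year!', '$Secret_Code!', '__id__', '2025 Sales']:
    print(raw, '->', normalize_field_name_sketch(raw))
```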
- dbtk.utils.process_sql_parameters(sql, paramstyle)[source]
Process SQL parameters according to the specified paramstyle.
Supports SQL input in either ‘named’ (:param) or ‘pyformat’ (%(param)s) format. Auto-detects the input format and converts to the target paramstyle.
- Parameters:
sql (str) – SQL query with named (:name) or pyformat (%(name)s) parameters
paramstyle (str) – The desired parameter style for the resulting SQL string
- Returns:
A tuple containing the processed SQL query string and a tuple of all named parameters extracted in the order in which they appear in the original query.
- Raises:
ValueError – If the SQL contains mixed parameter formats or unsupported paramstyle.
- Return type:
Tuple[str, Tuple[str, …]]
Examples
>>> # Named input, convert to pyformat
>>> process_sql_parameters("SELECT * FROM users WHERE id = :user_id", "pyformat")
('SELECT * FROM users WHERE id = %(user_id)s', ('user_id',))
>>> # Pyformat input, convert to qmark
>>> process_sql_parameters("SELECT * FROM users WHERE id = %(user_id)s", "qmark")
('SELECT * FROM users WHERE id = ?', ('user_id',))
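A simplified sketch of the detect-then-convert approach using regular expressions. Unlike a production parser it does not skip string literals or comments, and `convert_params_sketch` is a hypothetical name; it does avoid treating PostgreSQL `::type` casts as parameters.

```python
import re
from typing import Tuple

# :name, but never a colon that directly follows another colon ('::type' casts)
_NAMED = re.compile(r'(?<!:):([A-Za-z_]\w*)')
_PYFORMAT = re.compile(r'%\((\w+)\)s')

# Placeholder builders keyed by target paramstyle; n is the 1-based occurrence number
_PLACEHOLDER = {
    'named': lambda name, n: f':{name}',
    'pyformat': lambda name, n: f'%({name})s',
    'qmark': lambda name, n: '?',
    'format': lambda name, n: '%s',
    'numeric': lambda name, n: f':{n}',
}


def convert_params_sketch(sql: str, paramstyle: str) -> Tuple[str, Tuple[str, ...]]:
    if paramstyle not in _PLACEHOLDER:
        raise ValueError(f'unsupported paramstyle: {paramstyle}')
    has_named = bool(_NAMED.search(sql))
    has_pyformat = bool(_PYFORMAT.search(sql))
    if has_named and has_pyformat:
        raise ValueError('SQL mixes :name and %(name)s parameters')
    pattern = _PYFORMAT if has_pyformat else _NAMED
    make = _PLACEHOLDER[paramstyle]
    names = []

    def replace(match):
        names.append(match.group(1))  # record names in order of appearance
        return make(match.group(1), len(names))

    return pattern.sub(replace, sql), tuple(names)
```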
- dbtk.utils.quote_identifier(identifier)[source]
Quote identifier, handling qualified names by splitting on dots.
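A sketch of dot-splitting plus ANSI double-quote quoting, with embedded quote characters doubled. The quote character is an assumption (MySQL uses backticks by default, and SQL Server also accepts brackets), the real function may differ, and naive splitting on dots cannot handle identifiers that contain a literal dot.

```python
def quote_identifier_sketch(identifier: str, quote: str = '"') -> str:
    """Quote each dot-separated part, doubling any embedded quote characters."""
    parts = identifier.split('.')
    return '.'.join(quote + part.replace(quote, quote * 2) + quote for part in parts)


print(quote_identifier_sketch('dbo.orders'))     # "dbo"."orders"
print(quote_identifier_sketch('Order Details'))  # "Order Details"
```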
- dbtk.utils.to_string(obj)[source]
Convert a value to string representation.
- Parameters:
obj (Any) – Value to convert
- Returns:
String representation
- Return type:
str
- dbtk.utils.validate_identifier(identifier, max_length=64, allow_temp=False)[source]
Validate that an identifier is safe for use (even if it needs quoting). Returns the identifier if valid, raises ValueError if invalid.
- Parameters:
identifier (str) – The identifier to validate
max_length (int) – Maximum length for identifier
allow_temp (bool) – If True, allow underscore or hash prefix for temp tables
ETL
ETL (Extract, Transform, Load) operations and utilities.
This module provides tools for managing database tables, resolving identities, and performing bulk operations:
Table: Schema-aware table operations with automatic SQL generation, field mapping, transformations, and per-operation error tracking via last_error.
DataSurge: Row-oriented bulk INSERT, UPDATE, DELETE, and MERGE using executemany. Works with all drivers; supports db_expr columns.
BulkSurge: High-throughput bulk loading via native database mechanisms (PostgreSQL COPY, Oracle direct-path, SQL Server bcp, etc.). Memory-efficient streaming; does not support db_expr columns.
IdentityManager: Resumable source-to-target identity resolution with per-entity status, error, and message tracking. State can be saved/loaded as JSON.
ValidationCollector: Callable collector for fn-pipelines that enriches and accumulates unique codes from source data.
TableLookup: Cached SQL-backed lookup transform for use in fn-pipelines and as a resolver for IdentityManager.
column_defs_from_db(): Generate Table column definitions by introspecting a live database table.
Example
from dbtk.etl import Table, DataSurge, BulkSurge, IdentityManager
# Define table structure
table = Table('users', columns={
'id': {'field': 'id', 'key': True},
'name': {'field': 'full_name', 'nullable': False},
'email': {'field': 'email', 'fn': 'email'}
}, cursor=cursor)
# Row-oriented operations (INSERT + UPDATE/MERGE in same run, or db_expr columns)
surge = DataSurge(table)
surge.upsert(new_records) # INSERT new, UPDATE existing
surge.delete(stale_records)
# High-throughput load (no db_expr columns, INSERT only)
bulk = BulkSurge(table)
bulk.load(records)                # Streams via COPY / direct-path / bcp
# Identity resolution
stmt = cursor.prepare_file('sql/resolve_user.sql')
im = IdentityManager('source_id', 'user_id', resolver=stmt)
for row in reader:
entity = im.resolve(row)
- class dbtk.etl.BulkSurge(table, batch_size=10000, pass_through=False)[source]
Bases: BaseSurge
Lightning-fast bulk loading using native database tools and streaming.
BulkSurge provides high-performance data loading by leveraging database-specific bulk loading mechanisms. It supports both direct streaming (zero temp files) and external tool-based loading (bcp, SQL*Loader) depending on the database and method chosen.
Supported Databases
PostgreSQL/Redshift: COPY FROM STDIN (streaming, no temp files)
Oracle: direct_path_load (streaming) or SQL*Loader (external tool)
MySQL/MariaDB: LOAD DATA LOCAL INFILE via temp file (external only)
SQL Server: bcp utility (external tool, requires named connection)
Loading Methods
- direct (default): Uses native Python drivers for streaming bulk loads
Postgres: COPY protocol with background writer thread
Oracle: direct_path_load API (requires python-oracledb 3.4+)
- external: Dumps CSV then loads via command-line tool or SQL command
Oracle: SQL*Loader (sqlldr) with auto-generated control file
MySQL/MariaDB: LOAD DATA LOCAL INFILE from temp CSV file
SQL Server: bcp utility (only external method available)
Performance Notes
BulkSurge is memory-efficient: uses batching and streaming to handle large datasets
Direct methods are typically faster and require no temp files
External tools require credentials from config file (see connection_name)
Tables with db_expr columns are incompatible (use DataSurge instead)
- param table:
Table instance with column definitions and cursor
- type table:
Table
- param batch_size:
Number of records per batch (default: 10,000)
- type batch_size:
int, optional
- param pass_through:
Skip transformation and validation, using source data directly (default: False).
When to use:
Database-to-database copies with identical schemas
Pre-transformed data from upstream pipelines (already validated)
Cursor-to-cursor streaming (maximum throughput)
Raw positional tuples pre-ordered for binding parameters
What’s skipped: Field mapping, type coercion, default values, null value handling, required field validation, and Table.set_values() overhead.
Warning: Do NOT use if records might have missing required fields, mismatched field names, need type transformations, or data quality is uncertain. All database constraints (primary keys, foreign keys) still apply.
- type pass_through:
bool, optional
- total_read
Total rows read from source. 1-based (first row = 1). Includes both loaded and skipped rows.
- Type:
int
- total_loaded
Total rows successfully loaded.
- Type:
int
- skipped
Total rows skipped due to missing required fields.
- Type:
int
- skip_details
Skip tracking grouped by reason. Key is a frozenset of missing required field names. Value is a dict with:
count: total rows skipped for this reason
sample: list of up to 20 1-based row numbers (for debugging)
Example:
{frozenset({'primary_name'}): {'count': 5, 'sample': [937887, 957847, ...]}}
- Type:
dict
- dump_path
Path of the last file written by dump(). Set after each dump() call.
- Type:
Path or None
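The skip_details structure can be built in a single pass. A sketch, assuming None and empty strings both count as missing; `split_by_required` is hypothetical, while the frozenset keys, 1-based row numbers, and 20-row sample cap follow the attribute descriptions above.

```python
from typing import Dict, FrozenSet, Iterable, List, Tuple

SAMPLE_LIMIT = 20  # documented: sample keeps up to 20 row numbers


def split_by_required(records: Iterable[dict], required: List[str]) -> Tuple[list, Dict[FrozenSet[str], dict]]:
    """Separate loadable rows from skipped ones, grouping skips by missing fields."""
    loadable, skip_details = [], {}
    for row_num, rec in enumerate(records, start=1):  # 1-based, like total_read
        # Assumption: None and '' both count as missing
        missing = frozenset(f for f in required if rec.get(f) in (None, ''))
        if not missing:
            loadable.append(rec)
            continue
        entry = skip_details.setdefault(missing, {'count': 0, 'sample': []})
        entry['count'] += 1
        if len(entry['sample']) < SAMPLE_LIMIT:
            entry['sample'].append(row_num)
    return loadable, skip_details


rows = [{'primary_name': 'Ada'}, {'primary_name': ''}, {}, {'primary_name': 'Lin'}]
ok, details = split_by_required(rows, ['primary_name'])
print(len(ok), details)  # 2 {frozenset({'primary_name'}): {'count': 2, 'sample': [2, 3]}}
```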
Examples
PostgreSQL streaming (zero temp files):
with BulkSurge(table) as surge:
    surge.load(reader)  # Uses COPY FROM STDIN
Oracle with SQL*Loader:
surge = BulkSurge(table)
surge.load(reader, method='external')  # Uses sqlldr
MySQL with custom dump location:
surge = BulkSurge(table)
surge.load(reader, method='external', dump_path='/data/staging')
SQL Server with bcp (requires config):
db = dbtk.connect('prod_mssql')  # Named connection required
surge = BulkSurge(table)
surge.load(reader)  # Uses bcp
Fast database-to-database copy (matching schemas):
# Source and destination schemas match exactly
surge = BulkSurge(dest_table, pass_through=True)
surge.load(source_cursor.fetchall())
Pre-transformed data (already validated):
# Data already transformed and validated by upstream process
surge = BulkSurge(table, pass_through=True)
surge.load(validated_records)
- dump(records, filename=None, write_headers=True, delimiter=',', encoding='utf-8', **csv_args)[source]
Export records to a delimited file.
Resolves the output path, writes all records via CSVWriter, and sets self.dump_path to the resolved path for callers that need it. For Oracle connections, it also generates a SQL*Loader control file alongside the data file.
- Parameters:
records (Iterable[Record]) – Records to export
filename (Union[str, Path], optional) – Target file path (directory or full path). See _resolve_file_path for resolution priority.
write_headers (bool, optional) – Include column headers (default: True)
delimiter (str, optional) – Field delimiter character (default: ','). The extension is inferred: '\t' → .tsv, anything else → .csv
encoding (str, optional) – File encoding (default: ‘utf-8’)
**csv_args (optional) – Additional keyword arguments passed to csv.writer (e.g. quoting, quotechar, escapechar)
- Returns:
int – Number of records written
Side Effects: sets self.dump_path to the resolved output Path.
- Return type:
int
Notes
Oracle Auto-generation: When connected to Oracle, automatically generates a SQL*Loader control file (.ctl) alongside the data file and logs the sqlldr command to run.
Examples
Export with auto-generated Oracle control file:
db = dbtk.connect('oracle_prod')
surge = BulkSurge(table)
surge.dump(reader, '/staging/export.csv')
# Creates: /staging/export.csv + /staging/export_a1b2c3d4.ctl
# Logs sqlldr command, e.g.:
# sqlldr userid=USER/PASS@DB control=... data=...
Custom delimiter with no quoting (e.g. for bcp):
import csv
surge.dump(reader, '/staging/data.csv', delimiter='\x1f', quoting=csv.QUOTE_NONE, escapechar=None)
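The path rules described for load()'s dump_path (an explicit file is used as-is, a directory gets a generated name, None falls back to a default directory) and dump()'s delimiter-to-extension rule can be sketched together. `resolve_dump_path`, the timestamp naming pattern, and the use of tempfile.gettempdir() in place of settings['data_dump_dir'] are all assumptions; the documented _resolve_file_path may behave differently.

```python
import tempfile
from datetime import datetime
from pathlib import Path


def resolve_dump_path(filename, table_name, delimiter=',', default_dir=None):
    ext = '.tsv' if delimiter == '\t' else '.csv'  # documented extension rule
    if filename is not None and not Path(filename).is_dir():
        return Path(filename)  # explicit file path: used exactly as given
    if filename is not None:
        directory = Path(filename)
    else:
        # Stand-in for settings['data_dump_dir'] or the temp directory
        directory = Path(default_dir or tempfile.gettempdir())
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')  # assumed naming pattern
    return directory / f'{table_name}_{stamp}{ext}'
```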
- load(records, method='direct', dump_path=None)[source]
Bulk load records using database-specific mechanisms.
Automatically selects the appropriate loading strategy based on database type and method parameter. Direct methods use streaming with zero temp files when possible. External methods use command-line tools and require a named connection.
- Parameters:
records (Iterable[Record]) – Iterator of Record objects to load
method (str, optional) – Loading method to use (default: 'direct'):
  'direct': Stream data using native drivers (Postgres COPY, Oracle direct_path_load)
  'external': Dump CSV then load (Oracle sqlldr, SQL Server bcp, MySQL LOAD DATA LOCAL INFILE)
dump_path (str or Path, optional) – Path for temp CSV files (only used by external methods):
  If a file path: used exactly as specified
  If a directory: a timestamped file is generated in that directory
  If None: settings['data_dump_dir'] or the temp directory is used
- Returns:
Number of records successfully loaded
- Return type:
int
- Raises:
RuntimeError – If external method requires credentials but connection_name is not set
NotImplementedError – If database type is not supported or required driver features unavailable
Notes
PostgreSQL/Redshift:
  Only uses the direct method (COPY FROM STDIN)
  Streams with a background writer thread; no temp files
  If you need to use psql's \copy, use BulkSurge.dump() to generate the transformed CSV file
Oracle:
  Direct: uses direct_path_load (requires python-oracledb 3.4+)
  External: uses SQL*Loader (sqlldr) with an auto-generated control file
  The external method requires a named connection from config
MySQL/MariaDB:
  Only the external method is supported (direct raises NotImplementedError)
  Dumps CSV, then executes LOAD DATA LOCAL INFILE
  Requires local_infile=1 on the server
SQL Server:
  Only the external method (uses the bcp utility)
  Requires a named connection from config for credentials
  Supports integrated auth (Windows) if no user/password is in config
Examples
Direct streaming (default):
surge = BulkSurge(table)
surge.load(reader)  # Streams data, zero temp files
Oracle SQL*Loader:
surge = BulkSurge(table)
surge.load(reader, method='external', dump_path='/staging')
SQL Server with bcp (requires config connection):
db = dbtk.connect('prod_mssql')  # Must use named connection
table = Table('dbo.orders', columns=..., cursor=db.cursor())
surge = BulkSurge(table)
surge.load(reader)  # Uses bcp with credentials from config
See also
dump: Export records to CSV file
- class dbtk.etl.DataSurge(table, batch_size=None, use_transaction=False, pass_through=False)[source]
Bases: BaseSurge
Handles bulk ETL operations by delegating to a stateful Table instance.
Note: The Table instance’s state (self.values) is modified during processing. Ensure the Table is not used concurrently by other operations or threads.
- Parameters:
table (Table) – Table instance with column definitions and cursor
batch_size (int, optional) – Number of records per batch (default: cursor.batch_size or 1000)
use_transaction (bool, optional) – Wrap all operations in a transaction (default: False)
pass_through (bool, optional) –
Skip transformation and validation, using source data directly (default: False). Only compatible with inserts, and not compatible with columns that use database expressions (db_expr).
When to use:
Database-to-database copies with identical schemas
Pre-transformed data from upstream pipelines (already validated)
Raw positional tuples pre-ordered for binding parameters
What’s skipped: Field mapping, type coercion, default values, null value handling, required field validation, and Table.set_values() overhead.
Warning: Do NOT use if records might have missing required fields, mismatched field names, need type transformations, or data quality is uncertain. All database constraints (primary keys, foreign keys) still apply.
- total_read
Total rows read from source. 1-based (first row = 1). Includes both loaded and skipped rows.
- Type:
int
- total_loaded
Total rows successfully loaded.
- Type:
int
- skipped
Total rows skipped due to missing required fields.
- Type:
int
- skip_details
Skip tracking grouped by reason. Key is a frozenset of missing required field names. Value is a dict with:
count: total rows skipped for this reason
sample: list of up to 20 1-based row numbers (for debugging)
Example:
{frozenset({'primary_name'}): {'count': 5, 'sample': [937887, 957847, ...]}}
- Type:
dict
Examples
Standard ETL with transformation:
table = Table(..., cursor=cursor)
surge = DataSurge(table, batch_size=1000, use_transaction=True)
errors = surge.insert(records, raise_error=False)
Fast database-to-database copy (matching schemas):
# Source and destination schemas match exactly
surge = DataSurge(dest_table, batch_size=5000, pass_through=True)
surge.insert(source_cursor)
Pre-transformed data (already validated):
# Data already transformed and validated by upstream process
surge = DataSurge(table, pass_through=True)
surge.insert(validated_records)
- __init__(table, batch_size=None, use_transaction=False, pass_through=False)[source]
Initialize DataSurge for bulk operations.
- Parameters:
table – Table instance with schema metadata
batch_size (int | None) – Number of records per batch
use_transaction (bool) – Use transaction for all operations (default: False)
pass_through (bool) – Skip transformation/validation for trusted data (default: False)
- load(records, operation=None, raise_error=True)[source]
Core bulk execution using executemany() — shared path for insert/update/delete/merge.
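The batched executemany() path can be pictured with an in-memory SQLite database. A simplified sketch, assuming dict records, qmark placeholders, one commit at the end, and errors collected as strings when raise_error is False; `bulk_insert_sketch` is hypothetical and omits DataSurge's transformations, validation, and counters.

```python
import sqlite3
from itertools import islice


def bulk_insert_sketch(conn, table, columns, records, batch_size=1000, raise_error=True):
    """Insert dict records in executemany() batches (illustrative, not DataSurge)."""
    # Identifiers are interpolated unquoted: trusted names only in this sketch
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({', '.join('?' for _ in columns)})"
    it = iter(records)
    loaded, errors = 0, []
    cur = conn.cursor()
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        params = [tuple(rec.get(col) for col in columns) for rec in batch]
        try:
            cur.executemany(sql, params)
            loaded += len(params)
        except sqlite3.DatabaseError as exc:
            if raise_error:
                conn.rollback()
                raise
            # Rows before the failing one may already be applied; not counted here
            errors.append(str(exc))
    conn.commit()
    return loaded, errors


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL)')
rows = ({'id': i, 'name': f'n{i}'} for i in range(1, 2501))
print(bulk_insert_sketch(conn, 't', ['id', 'name'], rows))  # (2500, [])
```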
- dbtk.etl.Lookup(table, key_cols, return_cols, *, cache=1, missing=None)[source]
One-liner database lookup for Table column configs.
- class dbtk.etl.QueryLookup(query=None, filename=None, return_col=None, missing=None)[source]
Bases: object
Deferred PreparedStatement lookup for use as a Table column transform.
Accepts a SQL query string or file path. Cursor binding is deferred until the Table is initialized with a cursor. Use with field='*' to pass the full source row as bind variables; PreparedStatement ignores extra keys.
- Parameters:
query (str, optional) – Inline SQL string.
filename (str or Path, optional) – Path to a SQL file.
return_col (str, optional) – Column name to extract from the result row. By default, the first column will be returned. Set to ‘*’ if you want to return multiple values to the next stage of the pipeline.
missing (any, optional) – Value to return when the query returns no rows. Default None.
Example
id_lookup_query = '''
    SELECT p.id, p.last_name, p.first_name
    FROM people p
    LEFT JOIN employees e ON e.id = p.id
    WHERE (p.email = :email OR e.tax_id = :tax_id)
'''
emp_cols = {
    'id': {'field': '*', 'primary_key': True,
           'fn': dbtk.etl.QueryLookup(query=id_lookup_query)},
    'first_name': {'field': 'first_name', 'required': True},
    # ...
}
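The deferred-binding idea can be sketched with sqlite3, which, like the PreparedStatement behavior described above, ignores dict keys the query does not reference. `DeferredLookup` is a hypothetical stand-in, not QueryLookup itself; file loading and Table integration are omitted.

```python
import sqlite3


class DeferredLookup:
    """Hypothetical stand-in: SQL stored now, cursor bound later, row in, value out."""

    def __init__(self, query, return_col=None, missing=None):
        self.query, self.return_col, self.missing = query, return_col, missing
        self.cursor = None

    def bind(self, cursor):
        self.cursor = cursor  # deferred until a cursor exists

    def __call__(self, row):
        if self.cursor is None:
            raise RuntimeError('lookup used before a cursor was bound')
        # sqlite3 ignores dict keys that the query does not reference
        self.cursor.execute(self.query, dict(row))
        result = self.cursor.fetchone()
        if result is None:
            return self.missing
        names = [d[0] for d in self.cursor.description]
        if self.return_col is None:
            return result[0]  # first column by default
        if self.return_col == '*':
            return dict(zip(names, result))  # all columns for the next stage
        return result[names.index(self.return_col)]


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE people (id INTEGER, email TEXT, last_name TEXT)')
conn.execute("INSERT INTO people VALUES (7, 'kya@example.com', 'Kya')")
lookup = DeferredLookup('SELECT id, last_name FROM people WHERE email = :email', missing=-1)
lookup.bind(conn.cursor())
print(lookup({'email': 'kya@example.com', 'unused': 'x'}))  # 7
print(lookup({'email': 'nobody@example.com'}))  # -1
```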
- class dbtk.etl.Table(name, columns, cursor, null_values=('', 'NULL', '<null>', '\\N'), is_temp=False)[source]
Bases: object
Stateful table class for ETL operations with schema-aware SQL generation.
The Table class provides a high-level interface for database operations by maintaining table metadata and current record state. It automatically generates parameterized SQL statements and handles field mapping, data transformations, and requirement validation.
Key features:
Field mapping: Map source record fields to database columns
Transformations: Apply functions to clean/transform data before database operations
Database functions: Use database-side functions (e.g., CURRENT_TIMESTAMP)
Default values: Provide constant values for columns
Requirement validation: Track required/nullable columns and validate before operations
Automatic SQL generation: Generate INSERT, UPDATE, SELECT, DELETE, MERGE statements
Operation tracking: Count successful operations and incomplete records via self.counts
Column Configuration
Each column in the columns dict is configured with a dict containing the keys below. Shorthand: an empty dict {} defaults the field name to the column name. For example, 'email': {} is equivalent to 'email': {'field': 'email'}.
field (str or list of str or '*'): Source field name(s) from input records. If a list, extracts multiple fields as a list value. If '*', passes the entire record to the transformation function instead of a single field value. If omitted, the column is populated via 'default' or 'db_expr'.
default (any or callable, optional): Default value for this column. Applied when the source field is missing, empty, or None. If callable (e.g. a zero-argument lambda), it is called at set_values() time, so the value is resolved for each record rather than at column-definition time. This is useful for values that come from runtime context (CLI args, job IDs, etc.) that aren't available when columns are defined:
conf_vars = {}  # populated later, after the table is initialized
columns = {
    'user_id': {'default': lambda: conf_vars['user_id']},
}
fn (Union[callable, List[callable], str], optional): Transformation function(s) to apply to the source value. A callable is applied directly; a list is applied in order as a pipeline; a str is a magic shorthand (e.g. 'int', 'maxlen:255', 'lookup:table:col:val'). See dbtk.etl.transforms.core.fn_resolver() for the full shorthand reference.
db_expr (str, optional): Database-side function call (e.g., 'CURRENT_TIMESTAMP', 'UPPER(#)'). Use '#' as the placeholder for the bind parameter. If specified without '#', no bind parameter is created (useful for CURRENT_TIMESTAMP, etc.).
primary_key (bool, optional, default False): Marks the column as a primary key. Automatically implies required=True. Alias: key.
key (bool, optional, default False): Alias for primary_key. Either may be used interchangeably.
auto_key (bool, optional, default False): Convenience flag that sets both primary_key=True and auto_gen=True. Ideal for typical auto-increment primary keys.
nullable (bool, optional, default True): Controls whether the column must have a value for INSERT/UPDATE/MERGE. nullable=False is equivalent to required=True; both mark the column as required.
required (bool, optional, default False): Explicitly marks the column as required. Equivalent to nullable=False.
auto_gen (bool, optional, default False): If True, the column is omitted from INSERT statements. The database is expected to provide the value (e.g. AUTO_INCREMENT, DEFAULT CURRENT_TIMESTAMP, GENERATED ALWAYS, etc.). The column remains fully included in all other operations.
no_update (bool, optional, default False): If True, excludes the column from UPDATE and MERGE operations.
bind_name (str, auto-generated): Sanitized parameter name for SQL bind variables, created automatically from the column name. Cannot be specified in the column definition.
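How these keys might combine for a single column is sketched here. `resolve_value` is hypothetical: it models field, '*', list fields, null_values, callable fn pipelines, and per-record callable defaults, but not string shorthands or db_expr, and the order (transform first, then fall back to default) is an assumption.

```python
NULL_VALUES = ('', 'NULL', '<null>', '\\N')  # documented Table default


def resolve_value(column, cfg, record, null_values=NULL_VALUES):
    """Compute one column's value from a source record (simplified sketch)."""
    if cfg == {}:
        cfg = {'field': column}  # empty-dict shorthand
    field = cfg.get('field')
    if field == '*':
        value = record  # the whole record goes to fn
    elif isinstance(field, list):
        value = [record.get(f) for f in field]
    elif field is not None:
        value = record.get(field)
    else:
        value = None  # populated only via default (db_expr not modeled)
    if isinstance(value, str) and value in null_values:
        value = None
    if value is not None:
        fns = cfg.get('fn', [])
        # Callables only; string shorthands such as 'int' are not modeled
        for fn in fns if isinstance(fns, list) else [fns]:
            value = fn(value)  # pipeline, applied in order
    if value is None and 'default' in cfg:
        default = cfg['default']
        value = default() if callable(default) else default  # called per record
    return value


record = {'flame_skill': ' 7 ', 'rank': 'Captain', 'years_service': 12, 'nick': 'NULL'}
print(resolve_value('firebending_level', {'field': 'flame_skill', 'fn': [str.strip, int]}, record))  # 7
print(resolve_value('status', {'default': 'active'}, record))  # active
print(resolve_value('rank', {}, record))  # Captain
print(resolve_value('vip_status', {'field': '*', 'fn': lambda r: 'VIP' if r.get('years_service', 0) > 10 else 'Regular'}, record))  # VIP
```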
Example
import dbtk
from dbtk.etl import Table
with dbtk.connect('fire_nation_db') as db:
    cursor = db.cursor()
    soldiers = Table('fire_nation_army', {
        # Primary key from source 'recruit_id' field
        'soldier_id': {'field': 'recruit_id', 'primary_key': True},
        # Required field with transformation
        'enlistment_date': {'field': 'join_date', 'fn': 'date', 'nullable': False},
        # Optional field with chained transformations
        'firebending_level': {
            'field': 'flame_skill',
            'fn': [str.strip, 'int']  # Clean then convert
        },
        # Constant value for all records
        'status': {'default': 'active'},
        # Database-side function with parameter
        'combat_name': {'field': 'full_name', 'db_expr': 'generate_callsign(#)'},
        # Database-side function, no parameter
        'created_at': {'db_expr': 'CURRENT_TIMESTAMP'},
        # Multiple source fields as list
        'contact_methods': {'field': ['email', 'phone', 'pigeon']},
        # Empty dict shorthand - field name matches column name
        'rank': {},  # Equivalent to {'field': 'rank'}
        'division': {},
        # Whole record access for multi-field decisions
        'vip_status': {
            'field': '*',
            'fn': lambda record: 'VIP' if record.get('years_service', 0) > 10 else 'Regular'
        }
    }, cursor=cursor)
    # Set values from source record
    soldiers.set_values({
        'recruit_id': 'FN001',
        'join_date': '2024-03-15',
        'flame_skill': ' 7 ',
        'full_name': 'Zuko'
    })
    # Execute operations
    soldiers.execute('insert')  # Automatically validates requirements
    print(soldiers.counts)  # {'insert': 1, 'update': 0, ...}
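A sketch of how an INSERT statement could be assembled from column configs like those in the example: db_expr replaces '#' with the bind parameter, expressions without '#' are inlined with no bind, and auto_gen/auto_key columns are left out. `build_insert` is hypothetical; it assumes named paramstyle and uses column names directly as bind names, whereas Table sanitizes them into bind_name.

```python
def build_insert(table, columns):
    """Assemble a named-style INSERT from column configs (simplified sketch)."""
    names, exprs = [], []
    for col, cfg in columns.items():
        if cfg.get('auto_gen') or cfg.get('auto_key'):
            continue  # the database supplies the value
        names.append(col)
        expr = cfg.get('db_expr')
        if expr is None:
            exprs.append(f':{col}')
        else:
            # '#' marks the bind parameter; no '#' means no bind at all
            exprs.append(expr.replace('#', f':{col}'))
    return f"INSERT INTO {table} ({', '.join(names)}) VALUES ({', '.join(exprs)})"


sql = build_insert('fire_nation_army', {
    'soldier_id': {'field': 'recruit_id', 'primary_key': True},
    'combat_name': {'field': 'full_name', 'db_expr': 'generate_callsign(#)'},
    'created_at': {'db_expr': 'CURRENT_TIMESTAMP'},
    'row_id': {'auto_gen': True},
})
print(sql)
```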
- values
Current record values keyed by bind name (populated by set_values()).
- Type:
dict
- counts
Operation counters with keys: insert, update, delete, select, merge, records, incomplete.
- Type:
dict
- last_error
The error detail from the most recent execute() call. Set to None on success, or to a dbtk.utils.ErrorDetail on DatabaseError (when raise_error=False). Cleared on every successful execution and on cursor() reassignment.
- Type:
ErrorDetail or None
- OPERATIONS = ('insert', 'select', 'update', 'delete', 'merge')
- __init__(name, columns, cursor, null_values=('', 'NULL', '<null>', '\\N'), is_temp=False)[source]
Initialize Table with schema configuration and database cursor.
Creates a Table instance that manages the mapping between source data fields and database columns, along with all metadata needed for SQL generation and data validation.
- Parameters:
name (str) – Database table name. Must be a valid SQL identifier.
columns (Dict[str, Dict[str, Any]]) – Dictionary mapping database column names to their configuration. Each column is configured with a dict containing options like ‘field’, ‘fn’, ‘default’, ‘db_expr’, ‘primary_key’, ‘nullable’, etc. See class docstring for complete column configuration options.
cursor (Cursor) – Database cursor instance. Provides connection to database and determines SQL parameter style (qmark, named, format, pyformat, numeric).
null_values (Tuple[str, ...]) – Tuple of string values that should be treated as NULL. When set_values() encounters these strings, they are converted to None. Default: ('', 'NULL', '<null>', '\N')
is_temp (bool) – If True, allows underscore or hash prefix for temporary table names. Default: False
- Raises:
ValueError – If table name or column names are invalid SQL identifiers.
Example
```python
cursor = db.cursor()
table = Table('users', {
    'user_id': {'field': 'id', 'primary_key': True},
    'email': {'field': 'email_address', 'nullable': False},
    'created': {'db_expr': 'CURRENT_TIMESTAMP'}
}, cursor=cursor)
```
- property columns: dict
- db_expr_cols()[source]
Return a list of all columns that use database expressions (db_expr). Columns with db_expr make the table incompatible with some features, such as all BulkSurge operations and DataSurge operations in pass_through mode.
- execute(operation, raise_error=False)[source]
Execute the specified database operation using current record values.
- Parameters:
operation (str) – One of 'insert', 'select', 'update', 'delete', 'merge'.
raise_error (bool, default False) – If True, re-raise DatabaseError instead of swallowing it. If False, the error is captured in last_error and 1 is returned.
- Returns:
int – 0 on success; 1 when requirements are unmet (incomplete record) or when a DatabaseError occurs and raise_error=False.
Side Effects
- counts[operation] is incremented on success.
- counts['incomplete'] is incremented when requirements are unmet.
- last_error is set to None on success, or to a dbtk.utils.ErrorDetail on database error.
- Raises:
ValueError – If operation is not valid, or required key columns are missing.
DatabaseError – If the underlying execute fails and raise_error=True.
- Return type:
int
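The return-code contract above suggests a simple batch pattern. This sketch relies only on the documented surface (set_values(), execute(), counts, last_error); the run_batch helper itself is hypothetical, and comparing counts['incomplete'] before and after each call is one way (an assumption, not a dbtk feature) to tell an incomplete record apart from a captured DatabaseError.

```python
from typing import Any, Dict, Iterable, List, Tuple

def run_batch(table: Any, records: Iterable[Dict[str, Any]],
              operation: str = 'insert') -> List[Tuple[Dict[str, Any], Any]]:
    """Run one operation per record and return (record, reason) for each failure.

    Hypothetical helper; `table` is expected to behave like dbtk.etl.Table.
    """
    failures = []
    for record in records:
        table.set_values(record)
        incomplete_before = table.counts['incomplete']
        if table.execute(operation):  # non-zero: incomplete record or captured DatabaseError
            if table.counts['incomplete'] > incomplete_before:
                failures.append((record, 'incomplete'))
            else:
                failures.append((record, table.last_error))  # ErrorDetail from the DB error
    return failures
```

With raise_error left at its default of False, the loop never aborts on a database error; calling execute(operation, raise_error=True) instead is the documented way to stop on the first failure.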
- force_positional()[source]
If cursor paramstyle is named style, switch to the corresponding positional style then rebuild all SQL.
- get_column_definitions(all_cols=False)[source]
Introspect database table columns to get type information.
Executes a SELECT * query against the database table and returns type information for columns defined in this Table object. Validates that all Table columns exist in the database.
- Parameters:
all_cols (bool) – If True, return all columns from the database table. If False, return only columns on the Table object.
- Returns:
(column_name, type_obj, internal_size, precision, scale, sql_type_def) where sql_type_def is the SQL type string like ‘VARCHAR(100)’ or ‘NUMBER(10,2)’
- Return type:
List of tuples
- Raises:
ValueError – If a column defined in this Table doesn’t exist in the database
Example
```python
>>> table = Table('users', {'id': {}, 'email': {}}, cursor=cursor)
>>> col_defs = table.get_column_definitions()
>>> for name, type_obj, size, prec, scale, sql_type in col_defs:
...     print(f"{name}: {sql_type}")
```
- get_sql(operation)[source]
Returns SQL for an operation. Automatically generates SQL if it hasn’t been created yet (lazy).
- is_ready(operation)[source]
Fast O(1) check if the current record is ready for the given operation.
- property key_cols: Tuple[str]
- property name: str
- property param_config: Dict[str, Tuple[str, ...]]
- property paramstyle: str
- property req_cols: Tuple[str]
- property row_count: int
- class dbtk.etl.TableLookup(cursor, table, key_cols, return_cols=None, cache=1)[source]
Bases:
object
Database table lookup with configurable caching for ETL transformations.
Performs lookups against database tables or views using PreparedStatement for efficient repeated queries. Supports three caching strategies:
CACHE_NONE (0): No caching, always query database
CACHE_LAZY (1): Cache results as encountered (default)
CACHE_PRELOAD (2): Preload entire table into memory upfront
Can operate in three modes:
- Validator: no return_cols specified; returns a bool indicating existence
- Single lookup: one return column; returns a scalar value
- Multi lookup: multiple return columns; returns a row object (type depends on cursor)
Examples
Validator mode:
```python
temple_validator = TableLookup(cursor, 'temples', key_cols='temple_id')
is_valid = temple_validator({'temple_id': 'eastern_air_temple'})  # True/False
```
Single value lookup:
```python
state_lookup = TableLookup(cursor, 'states', key_cols='name',
                           return_cols='abbreviation',
                           cache=TableLookup.CACHE_PRELOAD)  # Small table, preload it
abbrev = state_lookup({'name': 'California'})  # 'CA'
```
Multi-value lookup:
```python
address_lookup = TableLookup(cursor, 'addresses', key_cols='address_id',
                             return_cols=['street', 'city', 'state', 'zip'])
address = address_lookup({'address_id': 123})  # Record/dict/namedtuple
```
Multi-key lookup:
```python
person_lookup = TableLookup(cursor, 'people', key_cols=['first_name', 'last_name'],
                            return_cols='person_id',
                            cache=TableLookup.CACHE_NONE)  # Large table, don't cache
person_id = person_lookup({'first_name': 'Aang', 'last_name': 'Avatar'})
```
Use in Table config:
```python
table = Table('citizens', {
    'state_abbrev': {'field': 'state_name', 'fn': state_lookup}
}, cursor=cursor)
```
- CACHE_LAZY = 1
- CACHE_NONE = 0
- CACHE_PRELOAD = 2
- __init__(cursor, table, key_cols, return_cols=None, cache=1)[source]
Initialize TableLookup with table schema and caching strategy.
- Parameters:
cursor (Cursor) – Database cursor for executing queries
table (str) – Table or view name to query
key_cols (str or list of str) – Column name(s) to use in WHERE clause. Cannot be empty.
return_cols (str, list of str, or None) – Column(s) to return. If None, operates as validator (returns bool)
cache (int, default 1 (CACHE_LAZY)) – Caching strategy:
- 0 (CACHE_NONE): No caching, always query database
- 1 (CACHE_LAZY): Cache results as encountered
- 2 (CACHE_PRELOAD): Preload entire table into memory
- Raises:
ValueError – If key_cols is empty or cache value is invalid
- property cursor
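The three caching strategies trade memory for database round trips. The following is a simplified, hypothetical model built on the standard-library sqlite3 module: one key column, one return column, and a scalar key instead of the record dict that TableLookup takes. It is not dbtk's implementation, and caching misses as None in lazy mode is an assumption.

```python
import sqlite3
from typing import Any, Dict

CACHE_NONE, CACHE_LAZY, CACHE_PRELOAD = 0, 1, 2

class LookupSketch:
    """Hypothetical model of TableLookup's cache modes (one key, one return column)."""

    def __init__(self, conn: sqlite3.Connection, table: str, key_col: str,
                 return_col: str, cache: int = CACHE_LAZY):
        if cache not in (CACHE_NONE, CACHE_LAZY, CACHE_PRELOAD):
            raise ValueError(f'invalid cache value: {cache}')
        self.conn = conn
        self.cache_mode = cache
        # Identifiers are interpolated directly: trusted names only in this sketch
        self.sql = f'SELECT {return_col} FROM {table} WHERE {key_col} = ?'
        self.cache: Dict[Any, Any] = {}
        self.queries = 0  # number of round trips, to compare the modes
        if cache == CACHE_PRELOAD:
            # One query up front loads the whole table into memory
            self.queries += 1
            rows = conn.execute(f'SELECT {key_col}, {return_col} FROM {table}')
            self.cache = dict(rows.fetchall())

    def __call__(self, key: Any) -> Any:
        if self.cache_mode == CACHE_PRELOAD:
            return self.cache.get(key)  # full table in memory: a miss means "not found"
        if self.cache_mode == CACHE_LAZY and key in self.cache:
            return self.cache[key]
        self.queries += 1
        row = self.conn.execute(self.sql, (key,)).fetchone()
        value = row[0] if row else None
        if self.cache_mode == CACHE_LAZY:
            self.cache[key] = value  # assumption: misses are cached too
        return value

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE states (name TEXT, abbreviation TEXT)')
conn.executemany('INSERT INTO states VALUES (?, ?)', [('California', 'CA'), ('Oregon', 'OR')])

lazy = LookupSketch(conn, 'states', 'name', 'abbreviation')
print(lazy('California'), lazy('California'), lazy.queries)  # CA CA 1
```

Preloading turns every later call into a dictionary hit, which is why the examples above reserve CACHE_PRELOAD for small tables and CACHE_NONE for large ones.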
- dbtk.etl.Validate(table, key_cols, *, cache=1, on_fail='warn')[source]
One-liner validation: returns the original value if the key exists in the table.
- dbtk.etl.column_defs_from_db(cursor, table_name, add_comments=False)[source]
Generate column definitions from database table schema.
Inspects the database table structure and returns a Python dictionary string representation of column configurations ready to use with the Table class.
- Parameters:
cursor – Database cursor (any cursor type)
table_name (str) – Name of table to analyze (supports schema.table format)
add_comments (bool) – Include table/column comments from database metadata
- Returns:
String containing Python dict of column definitions
- Return type:
str
Example
```python
>>> print(column_defs_from_db(cursor, 'users'))
{
    'id': {'field': 'id', 'primary_key': True},
    'name': {'field': 'name', 'nullable': False},
    'email': {'field': 'email'},
    'created_at': {'db_expr': 'CURRENT_TIMESTAMP'}
}
```
```python
>>> # Copy output into your code:
>>> table = Table('users', columns={
...     'id': {'field': 'id', 'primary_key': True},
...     'name': {'field': 'name', 'nullable': False},
...     'email': {'field': 'email'},
...     'created_at': {'db_expr': 'CURRENT_TIMESTAMP'}
... }, cursor=cursor)
```
ETL Managers
Orchestration tools for multi-stage, resumable ETL processes.
IdentityManager provides lightweight, incremental identity resolution for imports where a reliable source primary key exists in source data.
- class dbtk.etl.managers.EntityStatus[source]
Bases:
object
Status constants for the entity resolution lifecycle.
- PENDING
Entity has been registered but resolution has not yet been attempted.
- Type:
str
- RESOLVED
Entity was successfully matched; target_key is populated.
- Type:
str
- STAGED
Entity exists in a staging table but has not yet been matched to the target system (e.g. an ERP record not yet confirmed).
- Type:
str
- ERROR
An error occurred while creating or updating the entity, possibly impacting downstream processing.
- Type:
str
- SKIPPED
Resolution was intentionally bypassed for this entity.
- Type:
str
- NOT_FOUND
Resolution was attempted but no matching record was found in the target.
- Type:
str
- ERROR = 'error'
- NOT_FOUND = 'not_found'
- PENDING = 'pending'
- RESOLVED = 'resolved'
- SKIPPED = 'skipped'
- STAGED = 'staged'
- VALUES = ('pending', 'resolved', 'staged', 'error', 'skipped', 'not_found')
- class dbtk.etl.managers.IdentityManager(source_key, target_key, resolver=None, alternate_keys=None)[source]
Bases:
object
Lightweight, resumable identity-resolution cache for ETL imports.
Maps source-system primary keys to target-system identifiers using a SQL resolver query. Resolved entities are stored as dbtk.record.Record instances keyed by source_key and enriched with status, messages, errors, and any configured alternate_keys.
State can be persisted to JSON between runs with save_state() and restored with load_state(), allowing long-running and multi-stage imports to be resumed without re-querying already-resolved entities.
- Parameters:
source_key (str) – Field name for the source-system primary key (e.g. 'student_id').
target_key (str) – Field name for the target-system primary key that the resolver returns (e.g. 'erp_person_id').
resolver (PreparedStatement or TableLookup, optional) – Query used to look up a target_key from a source_key. Can be set or replaced later via the resolver property.
alternate_keys (list of str, optional) – Additional key fields to track per entity (e.g. ['staging_id', 'erp_vendor_id']). These are persisted alongside target_key in saved state.
- entities
Mapping of source_key value → resolved dbtk.record.Record. Each Record contains all resolver columns plus _status, _errors, _messages, and any alternate_keys.
- Type:
dict
Example
```python
from dbtk.etl.managers import EntityStatus, IdentityManager
from dbtk.utils import ErrorDetail

stmt = cursor.prepare_file('sql/resolve_student.sql')
im = IdentityManager('student_id', 'erp_person_id', resolver=stmt,
                     alternate_keys=['banner_id'])

for row in reader:
    entity = im.resolve(row)
    if entity['_status'] == EntityStatus.RESOLVED:
        table.set_values(row)
        if table.execute('insert'):  # returns 1 on DB error
            im.add_error(row['student_id'], table.last_error)
    else:
        im.add_error(row['student_id'], ErrorDetail('Not found', field='student_id'))

im.save_state('state/students.json')

# to initialize from saved state
em = IdentityManager.load_state('state/students.json', resolver=stmt)
em.batch_resolve([EntityStatus.STAGED])
```
- __init__(source_key, target_key, resolver=None, alternate_keys=None)[source]
Initialize IdentityManager.
- Parameters:
source_key (str) – Field name of the source-system primary key.
target_key (str) – Field name of the target-system primary key. Must be returned by the resolver. Set equal to source_key to skip ID resolution.
resolver (PreparedStatement or TableLookup, optional) – Resolution query. Accepts either type; TableLookup is unwrapped to its underlying PreparedStatement.
alternate_keys (list of str, optional) – Additional key fields to persist and track per entity.
- add_error(source_id, error)[source]
Append a dbtk.utils.ErrorDetail to an entity’s _errors list.
- Parameters:
source_id (str) – Source-system key identifying the entity (must already be cached).
error (ErrorDetail) – Structured error to attach to the entity.
- add_message(source_id, message)[source]
Append an informational message to an entity’s _messages list.
- Parameters:
source_id (str) – Source-system key identifying the entity (must already be cached).
message (str) – Message text to append.
- batch_resolve(additional_statuses=None)[source]
Re-run the resolver for all entities whose status is PENDING or NOT_FOUND.
Useful after bulk-loading staging data when some entities could not be resolved on first pass. Initializes the record factory from a dry-run resolver call if it has not yet been set up.
- Parameters:
additional_statuses (list of str, optional) – Statuses to re-resolve in addition to EntityStatus.PENDING and EntityStatus.NOT_FOUND.
- calc_stats()[source]
Count entities by status.
- Returns:
Mapping of each EntityStatus value to the number of entities currently at that status.
- Return type:
dict
Example
```python
stats = im.calc_stats()
print(stats)  # {'pending': 0, 'resolved': 142, 'staged': 5, 'error': 3, ...}
```
- get_id(source_id, id_type)[source]
Retrieve a target or alternate key value for a cached entity.
- Parameters:
source_id (str) – Source-system key identifying the entity (must already be cached).
id_type (str) – Either target_key or one of alternate_keys.
- Returns:
The stored identifier value, or None if not yet set.
- Return type:
str or None
- Raises:
ValueError – If id_type is not the target_key or a registered alternate_key.
- classmethod load_state(path, resolver=None)[source]
Restore an IdentityManager from a previously saved JSON file.
Re-creates the entity Record factory from the field_order stored in the file, and deserializes _errors lists back to dbtk.utils.ErrorDetail instances.
- Parameters:
path (str or Path) – Path to the JSON file written by save_state().
resolver (PreparedStatement or TableLookup, optional) – Resolver to attach to the restored instance. If the saved file has no field_order, the resolver is used as a fallback to initialize the record factory.
- Returns:
Fully restored instance with all entities re-hydrated.
- Return type:
IdentityManager
- resolve(value)[source]
Resolve a source key to a target entity, caching the result.
- Parameters:
value (scalar, dict, or Record) –
- scalar: treated as the raw source_key value. The resolver is called with {source_key: value} and the returned entity is cached, but the caller’s record is not mutated.
- dict or Record: source_key is extracted from the mapping. On a successful resolution, the target_key is written back into the caller’s record.
- Returns:
The cached/resolved entity Record, or None if source_key cannot be found in value.
- Return type:
Record or None
- Raises:
ValueError – If the resolved target_key conflicts with a value already present in the caller’s record.
- property resolver: PreparedStatement | None
The active PreparedStatement used for resolution queries.
- save_state(path)[source]
Persist the current entity cache to a JSON file.
The file captures source_key, target_key, alternate_keys, field_order (for factory reconstruction), summary stats, and the full entity dict. dbtk.utils.ErrorDetail objects are serialized to {"message": ..., "field": ..., "code": ...} dicts.
- Parameters:
path (str or Path) – Destination file path. Parent directory must exist.
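The save/load round trip for _errors can be sketched with the standard json and dataclasses modules. ErrorDetailSketch is a hypothetical stand-in for dbtk.utils.ErrorDetail that carries only the three serialized keys, the helper names are invented, and entities are plain dicts here rather than dbtk.record.Record instances.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional

@dataclass
class ErrorDetailSketch:
    """Hypothetical stand-in for dbtk.utils.ErrorDetail."""
    message: str
    field: Optional[str] = None
    code: Optional[str] = None

def dump_entities(entities: Dict[str, Dict[str, Any]]) -> str:
    # Convert each _errors entry to a {"message", "field", "code"} dict before JSON encoding
    plain = {
        key: {**entity, '_errors': [asdict(err) for err in entity.get('_errors', [])]}
        for key, entity in entities.items()
    }
    return json.dumps(plain)

def load_entities(text: str) -> Dict[str, Dict[str, Any]]:
    # Re-hydrate the error dicts into objects, as load_state() is described to do
    entities = json.loads(text)
    for entity in entities.values():
        entity['_errors'] = [ErrorDetailSketch(**err) for err in entity.get('_errors', [])]
    return entities

entities = {
    'S100': {'_status': 'error', '_messages': [],
             '_errors': [ErrorDetailSketch('Not found', field='student_id')]},
}
text = dump_entities(entities)
print(text)
```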
- set_id(source_id, id_type, value)[source]
Store a target or alternate key value for a cached entity.
- Parameters:
source_id (str) – Source-system key identifying the entity (must already be cached).
id_type (str) – Either target_key or one of alternate_keys.
value (str) – The identifier value to store.
- Raises:
ValueError – If id_type is not the target_key or a registered alternate_key.
- class dbtk.etl.managers.ValidationCollector(lookup=None, return_col=None)[source]
Bases:
object
Callable collector/enricher for fn pipelines.
- During row-wise processing:
Collects unique codes
Optionally enriches them with descriptions using TableLookup
Can return a specific field from the lookup result instead of the raw code
- Supports:
Preload mode: instant enrichment, perfect valid/new split
Lazy mode: enrich on first encounter
No lookup: pure collection
Set return_col to the field name you want returned; None (default) returns the raw code.
- added: Dict[Any, Any]
- collect_new(code, **fields)[source]
Attach extra fields to a newly-encountered code for later bulk insertion.
No-ops immediately when the preceding __call__ did not add a new code (_recently_added is False), so it is safe to call unconditionally on every record. The first annotation wins; subsequent calls for the same code are ignored.
- Parameters:
code (Any) – The code value that was passed to the validator (used as a cross-check and as the key into added).
**fields – Extra columns to store, e.g. stvcipc_desc=record.cip_discipline.
Example
```python
cip_validator = ValidationCollector(lookup=cip_lookup)
for record in reader:
    stvmajr.set_values(record)  # triggers cip_validator(record.cip_code)
    cip_validator.collect_new(record.cip_code, stvcipc_desc=record.cip_discipline)
```
- existing: Dict[Any, Any]
- get_all()[source]
Get all codes (existing + added) as a set.
Useful for filtering with tools like polars that need a set/list of valid values rather than a callable.
- Returns:
Union of existing codes and added codes
- Return type:
set
Example
```python
import polars as pl

# Collect valid titles
title_collector = ValidationCollector()
for record in titles:
    title_collector(record['tconst'])

# Use with polars filtering
all_titles = title_collector.get_all()
df = pl.scan_csv('principals.tsv.gz', separator='\t').filter(  # TSV: tab separator
    pl.col('tconst').is_in(all_titles)
)

# Or with dbtk reader filtering
reader.add_filter(lambda r: r.tconst in title_collector)  # Uses __contains__
```
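The collect-then-annotate flow can be modeled in a few lines. This is a hypothetical, lookup-free sketch of the behavior described above (pure collection, raw code returned, collect_new() as a no-op unless the preceding call added a new code, first annotation wins). CollectorSketch and its `known` argument are invented for illustration; `known` stands in for codes a preloaded lookup would mark as existing.

```python
from typing import Any, Dict, Iterable, Optional, Set

class CollectorSketch:
    """Hypothetical, lookup-free model of ValidationCollector's collect/annotate flow."""

    def __init__(self, known: Optional[Iterable[Any]] = None):
        # `known` stands in for codes a preloaded lookup would report as existing
        self.existing: Dict[Any, Any] = {code: None for code in (known or ())}
        self.added: Dict[Any, Any] = {}
        self._recently_added = False
        self._last_code: Any = None

    def __call__(self, code: Any) -> Any:
        self._recently_added = code not in self.existing and code not in self.added
        if self._recently_added:
            self.added[code] = {}  # placeholder until collect_new() annotates it
        self._last_code = code
        return code  # return_col=None: hand back the raw code

    def collect_new(self, code: Any, **fields: Any) -> None:
        # No-op unless the immediately preceding call added this very code
        if not self._recently_added or code != self._last_code:
            return
        self.added[code] = dict(fields)
        self._recently_added = False  # first annotation wins

    def __contains__(self, code: Any) -> bool:
        return code in self.existing or code in self.added

    def get_all(self) -> Set[Any]:
        return set(self.existing) | set(self.added)

collector = CollectorSketch(known=['A'])
for code, desc in [('A', 'alpha'), ('B', 'bravo'), ('B', 'ignored')]:
    collector(code)
    collector.collect_new(code, desc=desc)
print(collector.added)             # {'B': {'desc': 'bravo'}}
print(sorted(collector.get_all()))  # ['A', 'B']
```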
ETL Transforms
Data transformation and parsing functions.
This package provides utilities for:
- Date/time parsing with timezone support (transforms.datetime)
- Phone number validation and formatting (transforms.phone)
- Email validation and cleaning (transforms.email)
- Database-backed code validation and lookups (transforms.database)
- Basic data type conversions and text manipulation (transforms.core, exported here)
Core functions are available directly from this module for convenience. Specialized functionality requires explicit submodule imports.
- Usage:
```python
# Core functions - direct import
from dbtk.etl.transforms import get_int, coalesce, capitalize

# Specialized functions - submodule import
from dbtk.etl.transforms.datetime import parse_date, parse_datetime
from dbtk.etl.transforms.phone import Phone, PhoneFormat
from dbtk.etl.transforms.email import email_validate
from dbtk.etl.transforms.database import CodeValidator
```
- Optional Dependencies:
- phonenumbers - For robust international phone number support
pip install phonenumbers
- dateutil - For additional date/time parsing flexibility
pip install python-dateutil
- usaddress - For parsing and cleaning address strings
pip install usaddress
- dbtk.etl.transforms.Lookup(table, key_cols, return_cols, *, cache=1, missing=None)[source]
One-liner database lookup for Table column configs.
- class dbtk.etl.transforms.TableLookup(cursor, table, key_cols, return_cols=None, cache=1)[source]
Bases:
object
Database table lookup with configurable caching for ETL transformations.
Performs lookups against database tables or views using PreparedStatement for efficient repeated queries. Supports three caching strategies:
CACHE_NONE (0): No caching, always query database
CACHE_LAZY (1): Cache results as encountered (default)
CACHE_PRELOAD (2): Preload entire table into memory upfront
Can operate in three modes:
- Validator: no return_cols specified; returns a bool indicating existence
- Single lookup: one return column; returns a scalar value
- Multi lookup: multiple return columns; returns a row object (type depends on cursor)
Examples
Validator mode:
```python
temple_validator = TableLookup(cursor, 'temples', key_cols='temple_id')
is_valid = temple_validator({'temple_id': 'eastern_air_temple'})  # True/False
```
Single value lookup:
```python
state_lookup = TableLookup(cursor, 'states', key_cols='name',
                           return_cols='abbreviation',
                           cache=TableLookup.CACHE_PRELOAD)  # Small table, preload it
abbrev = state_lookup({'name': 'California'})  # 'CA'
```
Multi-value lookup:
```python
address_lookup = TableLookup(cursor, 'addresses', key_cols='address_id',
                             return_cols=['street', 'city', 'state', 'zip'])
address = address_lookup({'address_id': 123})  # Record/dict/namedtuple
```
Multi-key lookup:
```python
person_lookup = TableLookup(cursor, 'people', key_cols=['first_name', 'last_name'],
                            return_cols='person_id',
                            cache=TableLookup.CACHE_NONE)  # Large table, don't cache
person_id = person_lookup({'first_name': 'Aang', 'last_name': 'Avatar'})
```
Use in Table config:
```python
table = Table('citizens', {
    'state_abbrev': {'field': 'state_name', 'fn': state_lookup}
}, cursor=cursor)
```
- CACHE_LAZY = 1
- CACHE_NONE = 0
- CACHE_PRELOAD = 2
- __init__(cursor, table, key_cols, return_cols=None, cache=1)[source]
Initialize TableLookup with table schema and caching strategy.
- Parameters:
cursor (Cursor) – Database cursor for executing queries
table (str) – Table or view name to query
key_cols (str or list of str) – Column name(s) to use in WHERE clause. Cannot be empty.
return_cols (str, list of str, or None) – Column(s) to return. If None, operates as validator (returns bool)
cache (int, default 1 (CACHE_LAZY)) – Caching strategy:
- 0 (CACHE_NONE): No caching, always query database
- 1 (CACHE_LAZY): Cache results as encountered
- 2 (CACHE_PRELOAD): Preload entire table into memory
- Raises:
ValueError – If key_cols is empty or cache value is invalid
- property cursor
- exhaustive: bool
- dbtk.etl.transforms.Validate(table, key_cols, *, cache=1, on_fail='warn')[source]
One-liner validation: returns the original value if the key exists in the table.
- dbtk.etl.transforms.capitalize(val)[source]
Title case a string, but only if it’s already in all uppercase or all lowercase.
- Parameters:
val (Any) – String value to capitalize
- Returns:
Title-cased string if applicable, otherwise original value
- Return type:
Any
Example
```python
capitalize("HELLO WORLD")  # "Hello World"
capitalize("hello world")  # "Hello World"
capitalize("Hello World")  # "Hello World" (unchanged)
```
- dbtk.etl.transforms.coalesce(vals)[source]
Returns the first non-empty and non-None value from a list or tuple.
- Parameters:
vals (List[Any] | tuple) – List or tuple of values to check
- Returns:
First non-empty, non-None value, or the original input if not a list/tuple
- Return type:
Any
Example
```python
coalesce([None, '', 'first', 'second'])  # "first"
coalesce([0, 1, 2])                      # 0
coalesce([None, None])                   # None
```
- dbtk.etl.transforms.email_clean(val)[source]
Clean and normalize email address.
- Parameters:
val (str) – Email address string
- Returns:
Cleaned email address (lowercase, stripped) or empty string if invalid
- Return type:
str
Example
```python
email_clean(" User@Example.COM ")  # "user@example.com"
email_clean("invalid.email")       # ""
email_clean("user@domain.co")      # "user@domain.co"
```
- dbtk.etl.transforms.email_validate(val)[source]
Validate email address format.
Uses a practical regex that catches most valid emails and rejects obviously invalid ones. Not RFC 5322 compliant but good enough for most data validation needs.
- Parameters:
val (str) – Email address string to validate
- Returns:
True if email appears valid, False otherwise
- Return type:
bool
Example
```python
email_validate("user@example.com")     # True
email_validate("user.name@domain.co")  # True
email_validate("invalid.email")        # False
email_validate("@domain.com")          # False
email_validate("")                     # False
```
- dbtk.etl.transforms.fn_resolver(shorthand)[source]
Convert a concise string shorthand into a transformation function.
Syntax
Shorthands take one of three forms:
- name – simple name, no arguments (e.g. 'int')
- name:arg1:arg2 – name with colon-separated args (e.g. 'maxlen:200')
- type.method:arg1 – cast to type, then call method (e.g. 'str.rjust:+9:0')
Parameters passed after the first : are strings by default. Prefix an integer with + for non-negative or - for negative to force it to int (required where the called function expects an integer, e.g. str.rjust). True, False, and None are parsed to their Python equivalents.
Names
- Type conversion (None-safe: empty string or None returns None):
int, float, bool
digits – strip non-digit characters
number – parse formatted numbers (e.g. '1,234.56')
- Date / time:
date, datetime, time, timestamp
- String:
maxlen:50 – truncate to 50 characters
split_and_get:0 – first comma-delimited field
split_and_get:1:<tab> – second tab-delimited field (the last argument is a literal tab character)
str.method – all string methods (e.g. capitalize, upper, lower, split) are available using the cast and call format, see below.
- List / Tuple:
nth:0 – first element of a sequence (nth:-1 → last)
coalesce – first non-empty, non-None value
- Boolean indicators:
indicator – True → 'Y', False/None → None
indicator:Y:N – True → 'Y', False → 'N'
indicator:N:Y – True → 'N', False → 'Y' (inverted)
indicator:None:Y – True → None, False → 'Y' (active-flag inversion)
indicator:1:0 – True → '1', False → '0'
- Cast and call (type.method):
str.upper – str(val).upper()
str.strip:=" – strip = and " chars (e.g. Excel ="val")
str.lstrip:0 – strip leading zeros
str.split:, – split on comma → list
str.rjust:+9:0 – right-justify to width 9, padded with '0'
str.ljust:+10:<space> – left-justify to width 10, padded with space (the last argument is a single space)
datetime.strftime:%Y-%m-%d – format a parsed datetime
int.to_bytes:+4:big – int(val).to_bytes(4, 'big')
float.hex – float(val).hex()
Supported cast types: int, float, str, datetime.
- Database:
lookup:table:key_col:val_col – look up val_col from table by key_col
validate:table:key_col – assert key_col exists in table
- returns:
Single-argument function; or a _DeferredTransform for lookup:/validate: shorthands, which must be bound to a cursor before use.
- rtype:
callable or _DeferredTransform
- raises ValueError:
If the shorthand is not recognized.
Examples
```python
>>> fn_resolver('int')('123')
123
>>> fn_resolver('maxlen:10')('supercalifragilistic')
'supercalif'
>>> fn_resolver('indicator:N:Y')(True)
'N'
>>> fn_resolver('str.rjust:+9:0')('123')
'000000123'
>>> fn_resolver('str.split:,')('a,b,c')
['a', 'b', 'c']
```
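The argument rules for the cast-and-call form can be illustrated with a small parser. This is a hypothetical sketch, not fn_resolver itself: cast_and_call and coerce_arg are invented names, only the int, float, and str casts are handled, and the shorthand is split naively on ':', so arguments that themselves contain a colon are not supported.

```python
from typing import Any, Callable

CASTS = {'int': int, 'float': float, 'str': str}
LITERALS = {'True': True, 'False': False, 'None': None}

def coerce_arg(arg: str) -> Any:
    """Apply the documented argument rules to one colon-separated argument."""
    if arg in LITERALS:
        return LITERALS[arg]
    # '+9' or '-1' -> int; a bare '0' stays the string '0'
    if len(arg) > 1 and arg[0] in '+-' and arg[1:].isdigit():
        return int(arg)
    return arg

def cast_and_call(shorthand: str) -> Callable[[Any], Any]:
    """Hypothetical parser for the 'type.method:arg1:arg2' form."""
    head, *raw_args = shorthand.split(':')
    type_name, method_name = head.split('.', 1)
    cast = CASTS[type_name]
    args = [coerce_arg(a) for a in raw_args]

    def transform(val: Any) -> Any:
        # Cast first, then call the named method with the parsed arguments
        return getattr(cast(val), method_name)(*args)

    return transform

print(cast_and_call('str.rjust:+9:0')('123'))  # 000000123
print(cast_and_call('str.split:,')('a,b,c'))   # ['a', 'b', 'c']
```

The '+' prefix matters here: without it, '9' would stay a string and str.rjust would raise a TypeError.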
- dbtk.etl.transforms.format_digits(val, pattern)[source]
Format a number string according to a pattern.
Extracts digits from the value and applies the pattern if the number of digits matches the number of ‘#’ characters in the pattern.
- Parameters:
val (Any) – Number value to format (string or numeric)
pattern (str) – Format pattern using ‘#’ for digit positions
- Returns:
Formatted string if digit count matches, otherwise original string
- Return type:
str
Example
```python
format_digits('8001234567', '(###) ###-####')    # "(800) 123-4567"
format_digits('012345678', '###-##-####')        # "012-34-5678"
format_digits('(800) 123-4567', '###.###.####')  # "800.123.4567"
format_digits('12345', '###-##-####')            # "12345" (wrong length)
```
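The digit-count rule can be seen in isolation in a hypothetical re-implementation of the documented behavior. format_digits_sketch is an invented name, treating None as an empty string is an assumption, and the real function may differ in edge cases.

```python
import re
from typing import Any

def format_digits_sketch(val: Any, pattern: str) -> str:
    """Hypothetical re-implementation of format_digits' documented rule."""
    text = '' if val is None else str(val)
    digits = re.sub(r'\D', '', text)  # keep digits only
    if len(digits) != pattern.count('#'):
        return text  # digit count doesn't match the pattern: return the input as a string
    digit_iter = iter(digits)
    # Replace each '#' with the next digit; copy every other pattern character
    return ''.join(next(digit_iter) if ch == '#' else ch for ch in pattern)

print(format_digits_sketch('8001234567', '(###) ###-####'))  # (800) 123-4567
print(format_digits_sketch('12345', '###-##-####'))          # 12345
```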
- dbtk.etl.transforms.get_bool(val)[source]
Parse a value as a boolean.
- Parameters:
val (Any) – Value to parse
- Returns:
True for truthy values, False for falsy values, None for None
- Return type:
bool | None
Truthy: True, 'T', 'TRUE', 'YES', 'Y', '1', 1, 1.0, non-zero numbers
Falsy: False, 'F', 'FALSE', 'NO', 'N', '0', 0, 0.0, empty string
None: None
Example
```python
get_bool(True)   # True
get_bool('yes')  # True
get_bool('Y')    # True
get_bool(1)      # True
get_bool(1.0)    # True
get_bool(False)  # False
get_bool('no')   # False
get_bool(0)      # False
get_bool(None)   # None
```
- dbtk.etl.transforms.get_digits(val)[source]
Extract only numeric digits from a value, preserving leading zeros.
Removes all non-digit characters except leading +/- signs. Returns a string to preserve leading zeros.
- Parameters:
val (Any) – Value to extract digits from
- Returns:
String containing only digits (and possibly leading +/-), or empty string if no value
- Return type:
str
Example
```python
get_digits("(800) 123-4567")   # "8001234567"
get_digits("012-34-5678")      # "012345678"
get_digits("$-42.35")          # "4235"
get_digits("+1-555-123-4567")  # "15551234567"
```
- dbtk.etl.transforms.get_float(val)[source]
Convert value to float.
Handles currency symbols and formatting by using to_number().
- Parameters:
val (Any) – Value to convert
- Returns:
Float value or None if val is falsy or cannot be converted
- Return type:
float | None
Example
```python
get_float("123.45")   # 123.45
get_float("$123.45")  # 123.45
get_float("123")      # 123.0
get_float(None)       # None
```
- dbtk.etl.transforms.get_int(val)[source]
Convert value to integer.
Handles string numbers with decimals and currency symbols by using to_number().
- Parameters:
val (Any) – Value to convert
- Returns:
Integer value or None if val is falsy or cannot be converted
- Return type:
int | None
Example
```python
get_int("123")      # 123
get_int("123.45")   # 123
get_int("$123.45")  # 123
get_int(123.45)     # 123
get_int(None)       # None
```
- dbtk.etl.transforms.indicator(val, true_val='Y', false_val=None)[source]
Convert a value to a boolean indicator.
Uses get_bool() for consistent boolean interpretation.
- Parameters:
val (Any) – Value to test for truthiness
true_val (str) – Value to return if val is truthy (default: ‘Y’)
false_val (Any) – Value to return if val is falsy (default: None)
- Returns:
true_val if val is truthy, false_val otherwise, or None if val is None
- Return type:
Any
Example
```python
indicator(True)         # "Y"
indicator('yes')        # "Y"
indicator(1)            # "Y"
indicator(False)        # None
indicator(0, 'T', 'F')  # "F"
indicator(None)         # None
```
- dbtk.etl.transforms.normalize_whitespace(val)[source]
Normalize whitespace in a string.
Strips leading and trailing whitespace
Collapses multiple spaces/tabs/newlines into single spaces
- Parameters:
val (Any) – Value to normalize
- Returns:
String with normalized whitespace, or empty string if no value
- Return type:
str
Example
```python
normalize_whitespace(" hello world ")    # "hello world"
normalize_whitespace("hello\n\nworld")   # "hello world"
normalize_whitespace("hello\t\tworld")   # "hello world"
```
- dbtk.etl.transforms.parse_date(val, default_tz=None)[source]
Parse various date formats to date object.
- Parameters:
val (Any) – Date string, datetime object, or other value
default_tz (str | None) – Default timezone (not used for dates, kept for consistency)
- Returns:
date object or None if parsing fails
- Return type:
date | None
Example
```python
parse_date("2024-01-15")   # -> date(2024, 1, 15)
parse_date("01/15/2024")   # -> date(2024, 1, 15)
parse_date("15 Jan 2024")  # -> date(2024, 1, 15)
```
- dbtk.etl.transforms.parse_datetime(val)[source]
Parse various datetime formats to datetime object.
Preserves timezone if present in the input string, otherwise returns naive datetime. Use parse_datetimetz() to automatically apply default timezone from settings.
- Parameters:
val (Any) – Datetime string, date object, or other value
- Returns:
datetime object or None if parsing fails
- Return type:
datetime | None
Example
```python
parse_datetime("2024-01-15 14:30:00")      # -> naive datetime
parse_datetime("2024-01-15T14:30:00Z")     # -> datetime with UTC
parse_datetime("01/15/2024 2:30 PM EST")   # -> datetime with EST
```
- dbtk.etl.transforms.phone_clean(val, country=None)[source]
Clean and format phone number.
- Parameters:
val (Any) – Phone number value to clean
country (str | None) – ISO country code (default from settings)
- Returns:
Formatted phone number or empty string if invalid
- Return type:
str
Example
```python
phone_clean("555-123-4567")      # "(555) 123-4567"
phone_clean(" (555) 123-4567 ")  # "(555) 123-4567"
phone_clean("invalid")           # ""
```
- dbtk.etl.transforms.phone_validate(val, country=None)[source]
Validate phone number.
- Parameters:
val (Any) – Phone number value to validate
country (str | None) – ISO country code (default from settings)
- Returns:
True if valid phone number, False otherwise
- Return type:
bool
Example
```python
phone_validate("(555) 123-4567")  # True
phone_validate("555-1234")        # False
phone_validate("invalid")         # False
```
- dbtk.etl.transforms.split_and_get(val, index, delimiter=',')[source]
Get an item from a delimited string list.
- Parameters:
val (str) – Delimited string
index (int) – Zero-based index of item to retrieve
delimiter (str) – Delimiter character (default: ‘,’)
- Returns:
Item at specified index (stripped), or None if index out of range
- Return type:
str | None
Example
split_and_get("a,b,c", 1)       # "b"
split_and_get("a|b|c", 0, "|")  # "a"
split_and_get("a,b", 5)         # None
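The documented behavior (split, pick a zero-based index, strip, return None when out of range) fits in a few lines. This is a hypothetical sketch. Rejecting negative indexes is an assumption, since the description only mentions zero-based positions.

```python
from typing import Optional


def split_and_get_sketch(val: Optional[str], index: int, delimiter: str = ",") -> Optional[str]:
    """Return the stripped item at a zero-based index, or None when out of range."""
    if val is None:
        return None
    parts = val.split(delimiter)
    # Assumption: only non-negative indexes are accepted.
    if 0 <= index < len(parts):
        return parts[index].strip()
    return None
```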
- dbtk.etl.transforms.standardize_address(address, style='standard')[source]
Standardize address formatting.
- Parameters:
address (str) – Address string to standardize
style (str) – Format style (‘standard’, ‘single_line’, ‘multiline’)
- Returns:
Standardized address string
- Return type:
str
Example
standardize_address("123 main street, springfield il 62701")
# "123 Main St, Springfield, IL 62701"
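The following sketch shows how the 'standard' style could produce the output above. It splits a one-line US address into street, city, state, and ZIP with a regular expression. It then capitalizes each part and abbreviates a trailing street suffix. The suffix table is a small hypothetical subset of USPS abbreviations, and the function does not implement the 'single_line' or 'multiline' styles.

```python
import re

# Small, hypothetical subset of USPS street-suffix abbreviations.
_SUFFIXES = {"street": "St", "avenue": "Ave", "road": "Rd", "drive": "Dr",
             "boulevard": "Blvd", "lane": "Ln"}

# "street, city[,] ST 12345[-6789]"; the city may be followed by an optional comma.
_ADDRESS = re.compile(
    r"^\s*(?P<street>.+?),\s*(?P<city>.+?),?\s+(?P<state>[A-Za-z]{2})\s+"
    r"(?P<zip>\d{5}(?:-\d{4})?)\s*$"
)


def standardize_address_sketch(address: str) -> str:
    """Return "Street, City, ST ZIP" for simple one-line US addresses (sketch)."""
    m = _ADDRESS.match(address)
    if m is None:
        return address.strip()  # leave unrecognized input unchanged
    words = m["street"].split()
    # Abbreviate a known suffix in the last position; capitalize every other word.
    street = " ".join(
        _SUFFIXES.get(w.lower(), w.capitalize()) if i == len(words) - 1 else w.capitalize()
        for i, w in enumerate(words)
    )
    city = " ".join(w.capitalize() for w in m["city"].split())
    return f"{street}, {city}, {m['state'].upper()} {m['zip']}"
```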
- dbtk.etl.transforms.to_number(val)[source]
Convert a value to a number by extracting digits and converting to float.
Strips all non-numeric characters except a leading +/- sign and the decimal point.
- Parameters:
val (Any) – Value to convert to number
- Returns:
Float value or None if no valid number can be extracted
- Return type:
float | None
Example
to_number("$42.35")    # 42.35
to_number("$-42.35")   # -42.35
to_number("1,234.56")  # 1234.56
to_number("N/A")       # None
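The sketch below implements the stripping rule described above. It keeps digits and the decimal point and treats a minus sign as meaningful only when it appears before the first digit, as in "$-42.35". The function name is hypothetical. Passing numeric input straight through is an assumption, added so values like 1e-05 are not mangled by text stripping.

```python
import re
from typing import Any, Optional


def to_number_sketch(val: Any) -> Optional[float]:
    """Extract a float from text such as "$-42.35" or "1,234.56" (sketch only)."""
    if val is None:
        return None
    if isinstance(val, (int, float)):
        return float(val)  # assumption: numbers pass through unchanged
    text = str(val)
    first_digit = re.search(r"\d", text)
    if first_digit is None:
        return None  # e.g. "N/A"
    digits = re.sub(r"[^0-9.]", "", text)  # keep digits and decimal points
    # A minus sign counts only if it appears before the first digit.
    negative = "-" in text[: first_digit.start()]
    try:
        number = float(digits)
    except ValueError:
        return None  # e.g. more than one decimal point
    return -number if negative else number
```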
- dbtk.etl.transforms.validate_us_address(address)[source]
Validate US address format and components.
- Parameters:
address (str) – Address string to validate
- Returns:
True if valid US address, False otherwise
- Return type:
bool
Example
validate_us_address("123 Main St, Springfield, IL 62701")  # True
validate_us_address("123 Main St, Springfield, XX 12345")  # False
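A hypothetical sketch of component validation follows. It requires the shape "number street, city, ST ZIP" and checks the state against USPS codes for the 50 states plus DC. The rules are assumptions, not dbtk's: territories such as PR are rejected, and the ZIP code is not checked against the state.

```python
import re

# USPS codes for the 50 states plus DC (territories deliberately omitted).
_US_STATES = frozenset(
    "AL AK AZ AR CA CO CT DE FL GA HI ID IL IN IA KS KY LA ME MD MA MI MN MS MO "
    "MT NE NV NH NJ NM NY NC ND OH OK OR PA RI SC SD TN TX UT VT VA WA WV WI WY DC".split()
)

# Assumed shape: house number + street, city, two-letter state, 5- or 9-digit ZIP.
_US_ADDRESS = re.compile(
    r"^\s*(?P<street>\d+\s+.+?),\s*(?P<city>[^,]+?),\s*(?P<state>[A-Za-z]{2})\s+"
    r"(?P<zip>\d{5}(?:-\d{4})?)\s*$"
)


def validate_us_address_sketch(address: str) -> bool:
    """Return True when the address has the assumed shape and a known state code."""
    m = _US_ADDRESS.match(address or "")
    return m is not None and m["state"].upper() in _US_STATES
```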
Logging Utilities
Integration script logging with timestamped files and error tracking:
Logging utilities for integration scripts.
Provides convenient logging setup for ETL and integration scripts that create timestamped log files such as script_name_YYYYMMDD_HHMMSS.log.
- class dbtk.logging_utils.ErrorCountHandler(error_log_path=None, formatter=None)[source]
Bases:
Handler
Custom handler that counts ERROR and CRITICAL level messages and lazily creates the error log.
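The sketch below shows one way a counting handler with a lazily created error file can be built on the standard logging module. It sets the handler level to ERROR so only ERROR and CRITICAL records reach it. It opens a FileHandler only when the first such record arrives. The class name ErrorCountHandlerSketch is hypothetical; this is not dbtk's source.

```python
import logging
from typing import Optional


class ErrorCountHandlerSketch(logging.Handler):
    """Count ERROR+ records; open the error log only when the first one arrives.

    Hypothetical sketch of the behavior described above, not dbtk's class.
    """

    def __init__(self, error_log_path: Optional[str] = None,
                 formatter: Optional[logging.Formatter] = None):
        # With level ERROR, loggers pass only ERROR and CRITICAL records here.
        super().__init__(level=logging.ERROR)
        self.error_count = 0
        self.error_log_path = error_log_path
        self._file_handler: Optional[logging.FileHandler] = None
        if formatter is not None:
            self.setFormatter(formatter)

    def emit(self, record: logging.LogRecord) -> None:
        self.error_count += 1
        if self.error_log_path is None:
            return  # counting only; no error file requested
        if self._file_handler is None:
            # Lazy creation: the file does not exist until an error is logged.
            self._file_handler = logging.FileHandler(self.error_log_path)
            if self.formatter is not None:
                self._file_handler.setFormatter(self.formatter)
        self._file_handler.handle(record)

    def close(self) -> None:
        if self._file_handler is not None:
            self._file_handler.close()
        super().close()
```

A run that logs only warnings therefore leaves no error file behind, so the existence of the file can itself signal that something went wrong.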
- dbtk.logging_utils.cleanup_old_logs(log_dir=None, retention_days=None, pattern='*.log', dry_run=False)[source]
Remove log files older than retention period.
- Parameters:
log_dir (str | None) – Directory to clean (defaults to config setting or ‘./logs’)
retention_days (int | None) – Keep logs newer than this many days (defaults to config or 30)
pattern (str) – Glob pattern for log files (default: '*.log')
dry_run (bool) – If True, only report what would be deleted without actually deleting
- Returns:
List of deleted (or would-be-deleted if dry_run) file paths
- Return type:
List[str]
Example
import dbtk

# Clean logs older than the configured retention period (30 days by default)
deleted = dbtk.cleanup_old_logs()
print(f"Deleted {len(deleted)} old log files")

# Custom retention
deleted = dbtk.cleanup_old_logs(retention_days=7)

# Dry run to see what would be deleted
would_delete = dbtk.cleanup_old_logs(dry_run=True)
print(f"Would delete: {would_delete}")

# Clean specific pattern
deleted = dbtk.cleanup_old_logs(pattern="error_*.log")
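Below is a minimal sketch of the retention logic using pathlib. It matches files against the glob pattern, compares each file's modification time to a cutoff, and deletes only when dry_run is false. The function name, the hardcoded defaults (which stand in for config lookup), and the extra now parameter (added for testability) are all hypothetical.

```python
import time
from pathlib import Path
from typing import List, Optional


def cleanup_old_logs_sketch(log_dir: str = "./logs", retention_days: int = 30,
                            pattern: str = "*.log", dry_run: bool = False,
                            now: Optional[float] = None) -> List[str]:
    """Delete (or, with dry_run, only list) matching files older than retention_days."""
    cutoff = (time.time() if now is None else now) - retention_days * 86400
    removed: List[str] = []
    directory = Path(log_dir)
    if not directory.is_dir():
        return removed
    for path in sorted(directory.glob(pattern)):
        # Age is judged by last-modification time.
        if path.is_file() and path.stat().st_mtime < cutoff:
            removed.append(str(path))
            if not dry_run:
                path.unlink()
    return removed
```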
- dbtk.logging_utils.errors_logged()[source]
Check if any ERROR or CRITICAL messages were logged during this run.
Returns the path to the log file containing errors if any were logged, None otherwise. This allows integration scripts to easily detect if errors occurred and take action (e.g., send notification emails).
- Returns:
Path to error log (if split_errors=True) or main log (if split_errors=False) when errors were logged. Returns None if no errors were logged or setup_logging() was not called.
- Return type:
str or None
Example
import logging
import dbtk

dbtk.setup_logging('my_integration')

# ... do ETL work ...
try:
    process_data()
except Exception as e:
    logging.error(f"Processing failed: {e}")

# Check if errors occurred
error_log = dbtk.errors_logged()
if error_log:
    print(f"Errors detected! See: {error_log}")
    # send_notification_email(subject="Integration errors", attachment=error_log)
else:
    print("Integration completed successfully")
- dbtk.logging_utils.setup_logging(script_name=None, log_dir=None, level=None, split_errors=None, console=None)[source]
Configure logging for integration scripts.
Creates log files named {script_name}_{datetime}.log. Optionally creates a separate error log named {script_name}_{datetime}_error.log.
- Parameters:
script_name (str | None) – Base name for log files (defaults to script filename without extension)
log_dir (str | None) – Directory for log files (defaults to config setting or ‘./logs’)
level (str | None) – Logging level string - DEBUG, INFO, WARNING, ERROR (defaults to config or ‘INFO’)
split_errors (bool | None) – Create separate error log file (defaults to config or True)
console (bool | None) – Also log to console/stdout (defaults to config or True)
- Returns:
Tuple of (log_file_path, error_log_path or None)
- Return type:
Tuple[str, str | None]
Example
import dbtk

# Simple - uses defaults from config
dbtk.setup_logging('fire_nation_etl')

# Custom settings
dbtk.setup_logging('my_script', log_dir='/var/log/etl', level='DEBUG')

# Single log file per day (set filename_format in config to '%Y%m%d')
dbtk.setup_logging('daily_job')

# Single rolling log file (set filename_format in config to '')
dbtk.setup_logging('rolling_log')
Note
To customize filename patterns, set logging.filename_format in dbtk.yml:
- '%Y%m%d_%H%M%S' - One log per run with date and time (default)
- '%Y%m%d' - One log per day
- '' - Single rolling log file (overwrites)
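The sketch below shows how the naming scheme above could turn a filename_format into log paths. The timestamp is rendered with strftime and inserted between the script name and the suffix. The function build_log_paths and its now parameter are hypothetical. The behavior for an empty format (dropping the "_{datetime}" part, giving names like rolling_log.log) is an assumption.

```python
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple


def build_log_paths(script_name: str, log_dir: str = "./logs",
                    filename_format: str = "%Y%m%d_%H%M%S", split_errors: bool = True,
                    now: Optional[datetime] = None) -> Tuple[str, Optional[str]]:
    """Return (log_file_path, error_log_path or None) following the naming scheme above."""
    stamp = (now or datetime.now()).strftime(filename_format) if filename_format else ""
    # Assumption: an empty format drops the "_{datetime}" part entirely.
    base = f"{script_name}_{stamp}" if stamp else script_name
    log_path = Path(log_dir) / f"{base}.log"
    error_path = (Path(log_dir) / f"{base}_error.log") if split_errors else None
    return str(log_path), (str(error_path) if error_path is not None else None)
```

Because a per-day format produces the same name for every run on that day, runs on one day share a file, which is what makes the '%Y%m%d' option a daily log.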
Command Line Interface
DBTK provides command-line tools for managing encryption keys and configuration files.
# Check dependencies, drivers, and configuration
dbtk checkup
# Interactive configuration setup wizard
dbtk config-setup
# Generate encryption key
dbtk generate-key
# Store encryption key in system keyring
dbtk store-key [key] [--force]
# Encrypt passwords in config file
dbtk encrypt-config [config_file]
# Encrypt a single password
dbtk encrypt-password [password]
# Migrate config to new encryption key
dbtk migrate-config old_file new_file [--new-key KEY]
The CLI is implemented in the dbtk.cli module.
Formats
Pre-defined column layouts for common multi-record fixed-width EDI-like formats.
Use them with EDIReader and EDIWriter:
from dbtk.formats.edi import ACH_COLUMNS
from dbtk.readers.fixed_width import EDIReader
from dbtk.writers.fixed_width import EDIWriter

with open('in.ach') as fp, EDIWriter('out.ach', ACH_COLUMNS) as w:
    w.write_batch(EDIReader(fp, ACH_COLUMNS))
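Multi-record fixed-width formats such as ACH identify each line's layout by a record-type code in its first position. The sketch below shows the general idea: look up a column layout by that leading character, then slice fields by offset. The layouts, field names, and the function parse_multi_record are hypothetical and do not reproduce ACH_COLUMNS or EDIReader.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical layouts: record-type code -> (field_name, start, end) with
# zero-based, end-exclusive offsets. These are NOT dbtk's ACH_COLUMNS.
Layout = Dict[str, List[Tuple[str, int, int]]]
LAYOUTS: Layout = {
    "H": [("record_type", 0, 1), ("company", 1, 11), ("date", 11, 19)],
    "D": [("record_type", 0, 1), ("account", 1, 9), ("amount", 9, 17)],
}


def parse_multi_record(lines: Iterable[str], layouts: Layout) -> List[Dict[str, str]]:
    """Choose a layout by each line's first character and slice out its fields."""
    records = []
    for line in lines:
        line = line.rstrip("\r\n")
        if not line:
            continue  # skip blank lines
        columns = layouts.get(line[0])
        if columns is None:
            raise ValueError(f"unknown record type {line[0]!r}")
        records.append({name: line[start:end].strip() for name, start, end in columns})
    return records
```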