ETL: Tools & Logging
Utilities for orchestrating and monitoring production ETL imports: identity resolution, code validation/collection, and structured logging.
IdentityManager
IdentityManager is a lightweight, resumable cache that maps source-system primary keys
to target-system identifiers. It tracks every entity’s resolution status, messages, and
errors across an entire import run, and can persist that state to JSON so multi-stage
pipelines can resume without re-querying resolved records.
The source_key should be the primary key of the source system (e.g. CRM_ID) and should be
present on every record coming from the source system.
The target_key should be the primary key of the target system (e.g. ERP_ID). The resolver may
match additional keys, but the entity will not be marked as resolved until a target_key has been found.
The alternate_keys parameter defines additional identifiers, used either to aid
identification (e.g. SSN, email, username) or during processing (e.g. staging_id). When the set_id
method is used to set an identifier manually, the id_type must be the value of target_key or be
listed in alternate_keys.
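The set_id rule can be illustrated with a small stand-in. This is a hypothetical sketch, not dbtk's implementation. `ToyIdentityCache` is invented here, and raising `ValueError` for an undeclared id_type is an assumption. The text only says that the id_type must be the target_key or one of the alternate_keys.

```python
class ToyIdentityCache:
    """Hypothetical stand-in illustrating the set_id key-validation rule."""

    def __init__(self, source_key, target_key, alternate_keys=None):
        self.source_key = source_key
        self.target_key = target_key
        self.alternate_keys = list(alternate_keys or [])
        self.entities = {}  # source value -> dict of tracked identifiers

    def set_id(self, source_value, id_type, value):
        # Only the target key or a declared alternate key may be set manually
        allowed = {self.target_key, *self.alternate_keys}
        if id_type not in allowed:
            # Assumption: the real library's failure mode is not documented
            raise ValueError(f"{id_type!r} is not the target_key or an alternate key")
        entity = self.entities.setdefault(source_value, {self.source_key: source_value})
        entity[id_type] = value

    def get_id(self, source_value, id_type):
        return self.entities.get(source_value, {}).get(id_type)


cache = ToyIdentityCache('crm_id', 'erp_id', alternate_keys=['email'])
cache.set_id('C001', 'email', 'zuko@example.com')
print(cache.get_id('C001', 'email'))  # zuko@example.com
try:
    cache.set_id('C002', 'username', 'azula')  # 'username' was never declared
except ValueError as exc:
    print(exc)  # 'username' is not the target_key or an alternate key
```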
from dbtk.etl import IdentityManager, EntityStatus
EntityStatus
Every entity in the cache carries a _status value from these constants:
| Status | Meaning |
|---|---|
| EntityStatus.PENDING | Registered but resolution not yet attempted |
| EntityStatus.RESOLVED | Successfully matched; target_key is populated |
| EntityStatus.STAGED | Exists in a staging table; not yet confirmed in target |
| EntityStatus.NOT_FOUND | Resolution attempted but no match found |
| EntityStatus.ERROR | An error occurred while creating or updating the entity; downstream table processing for this entity should be skipped |
| EntityStatus.SKIPPED | Resolution intentionally bypassed |
Instantiation
stmt = cursor.prepare_file('sql/resolve_student.sql')
# With a resolver — looks up target_key via SQL on first encounter
im = IdentityManager(
source_key='student_id',
target_key='erp_person_id',
resolver=stmt,
alternate_keys=['banner_id'], # Optional extra keys to track per entity
)
# Without a resolver — tracking/logging mode only
# Use when the target_key is already present in incoming records (e.g. crosswalk file),
# or when you only need per-entity status/error tracking with no DB lookup.
im = IdentityManager('source_id', 'target_id') # no resolver
When source_key == target_key, pass the same string for both — IdentityManager
recognises this and marks entities as RESOLVED immediately without staging them.
resolve()
The primary method. Accepts a scalar, dict, or Record:
from dbtk.utils import ErrorDetail
# Scalar: treated as the raw source_key value.
# The resolver is called; the caller's record is NOT mutated.
entity = im.resolve(row['student_id'])
# dict or Record: source_key extracted from the mapping.
# On success, target_key is written back into the caller's record.
entity = im.resolve(row)
if entity is None:
    pass  # source_key not present in value
elif entity['_status'] == EntityStatus.RESOLVED:
    # entity[target_key] is populated; row[target_key] has been set too
    table.set_values(row)
    if table.execute('insert', raise_error=False):
        entity['_status'] = EntityStatus.ERROR
        im.add_error(row['student_id'], table.last_error)
elif entity['_status'] == EntityStatus.NOT_FOUND:
    im.add_error(row['student_id'],
                 ErrorDetail('Student not found', field='student_id'))
Cache behaviour: already-RESOLVED entities are returned from cache without querying the
database again. Non-RESOLVED entities (PENDING, NOT_FOUND, STAGED) are re-attempted on
every resolve() call.
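The cache rule above can be modelled in a few lines. This is a hypothetical sketch, not dbtk code. `ToyResolverCache` and its `calls` counter are invented for illustration, a plain dict lookup stands in for the SQL resolver, and only three of the statuses are modelled. It also shows the same-key shortcut described under Instantiation.

```python
PENDING, RESOLVED, NOT_FOUND = 'pending', 'resolved', 'not_found'


class ToyResolverCache:
    """Hypothetical model of 'resolved entities come from cache, others are retried'."""

    def __init__(self, source_key, target_key, lookup):
        self.source_key = source_key
        self.target_key = target_key
        self.lookup = lookup  # callable: source value -> target value or None
        self.cache = {}
        self.calls = 0        # how many times the "database" was queried

    def resolve(self, value):
        entity = self.cache.setdefault(value, {self.source_key: value, '_status': PENDING})
        if entity['_status'] == RESOLVED:
            return entity  # cache hit: no lookup
        if self.source_key == self.target_key:
            entity['_status'] = RESOLVED  # same key on both sides: nothing to look up
            return entity
        self.calls += 1
        target = self.lookup(value)
        if target is None:
            entity['_status'] = NOT_FOUND  # will be retried on the next resolve()
        else:
            entity[self.target_key] = target
            entity['_status'] = RESOLVED
        return entity


directory = {'S1': 'P100'}  # stands in for the resolver's SQL lookup
cache = ToyResolverCache('student_id', 'erp_person_id', directory.get)
cache.resolve('S1')
cache.resolve('S1')  # second call served from cache
cache.resolve('S2')
cache.resolve('S2')  # NOT_FOUND, so looked up again
print(cache.calls)   # 3

same = ToyResolverCache('id', 'id', directory.get)
print(same.resolve('X')['_status'], same.calls)  # resolved 0
```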
alternate_keys
Track additional identifiers per entity alongside target_key:
im = IdentityManager('crm_id', 'erp_id', resolver=stmt,
                     alternate_keys=['staging_id', 'legacy_id'])
entity = im.resolve(row)
# entity['legacy_id'] is populated if the resolver returns it
# Read or write any tracked key directly
im.set_id(source_id, 'staging_id', 'B00123')
staging_id = im.get_id(source_id, 'staging_id')
add_error / add_message
Attach structured errors and informational messages to a cached entity:
from dbtk.utils import ErrorDetail
im.add_error(source_id, ErrorDetail('Insert failed', field='name', code='DB_ERROR'))
im.add_message(source_id, 'Mapped via legacy crosswalk')
Both lists (_errors, _messages) are preserved in save_state() / load_state().
batch_resolve()
Re-run the resolver for all PENDING and NOT_FOUND entities. Useful after a staging
table has been populated and you want to retry resolution in bulk.
im.batch_resolve()
# Also retry STAGED entities (e.g. after a staging→target promotion step)
im.batch_resolve(additional_statuses=[EntityStatus.STAGED])
# Retry multiple additional statuses
im.batch_resolve(additional_statuses=[EntityStatus.STAGED, EntityStatus.ERROR])
PENDING and NOT_FOUND are always retried. additional_statuses extends that set —
it does not replace it.
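The "extends, does not replace" rule amounts to a set union. The sketch below is hypothetical. The plain-dict cache, the lowercase status strings and the `target_id` field are assumptions for illustration only. In this sketch, entities that still cannot be matched simply keep their current status.

```python
ALWAYS_RETRY = frozenset({'pending', 'not_found'})


def statuses_to_retry(additional_statuses=None):
    # additional_statuses is unioned with the defaults; it never replaces them
    return ALWAYS_RETRY | set(additional_statuses or ())


def batch_resolve(cache, lookup, additional_statuses=None):
    """Hypothetical bulk retry over a {source_value: entity_dict} cache."""
    retry = statuses_to_retry(additional_statuses)
    retried = []
    for source_value, entity in cache.items():
        if entity['_status'] not in retry:
            continue
        retried.append(source_value)
        target = lookup(source_value)
        if target is not None:
            entity['target_id'] = target
            entity['_status'] = 'resolved'
    return retried


cache = {
    'A': {'_status': 'pending'},
    'B': {'_status': 'not_found'},
    'C': {'_status': 'staged'},
    'D': {'_status': 'resolved'},
}
lookup = {'A': 1, 'C': 3}.get  # stands in for the resolver

first = batch_resolve(cache, lookup)
second = batch_resolve(cache, lookup, additional_statuses=['staged'])
print(first, second)  # ['A', 'B'] ['B', 'C']
```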
save_state / load_state
Persist the full entity cache to JSON and restore it later:
# At end of run
im.save_state('state/students.json')
# Next run — restore cache, attach a (possibly updated) resolver
im = IdentityManager.load_state('state/students.json', resolver=stmt)
# Retry anything that failed or wasn't found last time
im.batch_resolve()
# Or add STAGED entities to the retry set
im.batch_resolve(additional_statuses=[EntityStatus.STAGED])
The JSON file includes source_key, target_key, alternate_keys, field order (for
factory reconstruction), summary stats, and every entity with its errors and messages.
ErrorDetail objects are serialised/deserialised automatically.
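Automatic ErrorDetail round-tripping can be done with the standard `json` hooks. The sketch below is hypothetical. The dataclass stands in for `dbtk.utils.ErrorDetail`, its fields (`message`, `field`, `code`) are assumed from the constructor call shown under add_error, and the `__error_detail__` tag and file layout are invented. dbtk's actual JSON schema may differ.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ErrorDetail:  # hypothetical stand-in for dbtk.utils.ErrorDetail
    message: str
    field: Optional[str] = None
    code: Optional[str] = None


def _encode(obj):
    # json.dump calls this only for objects it cannot serialise natively
    if isinstance(obj, ErrorDetail):
        return {'__error_detail__': True, **asdict(obj)}
    raise TypeError(f"Cannot serialise {type(obj).__name__}")


def _decode(d):
    # Called for every decoded JSON object; tagged dicts become ErrorDetail again
    if d.pop('__error_detail__', False):
        return ErrorDetail(**d)
    return d


def save_state(path, state):
    with open(path, 'w', encoding='utf-8') as fh:
        json.dump(state, fh, default=_encode, indent=2)


def load_state(path):
    with open(path, encoding='utf-8') as fh:
        return json.load(fh, object_hook=_decode)


state = {
    'source_key': 'student_id',
    'target_key': 'erp_person_id',
    'alternate_keys': ['banner_id'],
    'entities': {
        'S1': {'_status': 'error',
               '_errors': [ErrorDetail('Insert failed', field='name', code='DB_ERROR')],
               '_messages': ['Mapped via legacy crosswalk']},
    },
}
path = os.path.join(tempfile.mkdtemp(), 'students.json')
save_state(path, state)
restored = load_state(path)
print(restored == state)  # True
```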
calc_stats()
stats = im.calc_stats()
# {'pending': 0, 'resolved': 142, 'staged': 5, 'error': 3, 'skipped': 0, 'not_found': 11}
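A summary in that shape is a zero-filled status count. This is a hypothetical sketch. It assumes each entity stores its status as one of the lowercase strings used as keys above, which the documentation does not guarantee.

```python
from collections import Counter

STATUSES = ('pending', 'resolved', 'staged', 'error', 'skipped', 'not_found')


def calc_stats(entities):
    """Hypothetical: count entities per status, reporting zero for unused statuses."""
    counts = Counter(entity['_status'] for entity in entities.values())
    return {status: counts.get(status, 0) for status in STATUSES}


entities = {
    'S1': {'_status': 'resolved'},
    'S2': {'_status': 'resolved'},
    'S3': {'_status': 'not_found'},
}
print(calc_stats(entities))
# {'pending': 0, 'resolved': 2, 'staged': 0, 'error': 0, 'skipped': 0, 'not_found': 1}
```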
Complete examples
Pattern 1 — Standard resolver-based import:
import dbtk
from dbtk.etl import IdentityManager, EntityStatus
from dbtk.utils import ErrorDetail
dbtk.setup_logging()
with dbtk.connect('erp_db') as db:
    cursor = db.cursor()
    stmt = cursor.prepare_file('sql/resolve_student.sql')
    im = IdentityManager('student_id', 'erp_person_id', resolver=stmt,
                         alternate_keys=['banner_id'])
    student_table = dbtk.etl.Table('students', columns_config, cursor)
    with dbtk.readers.get_reader('incoming/students.csv.gz') as reader:
        for row in reader:
            entity = im.resolve(row)  # Looks up erp_person_id; writes it into row
            if entity is None:
                continue  # Row has no student_id
            if entity['_status'] == EntityStatus.ERROR:
                continue  # Skip downstream tables
            if entity['_status'] != EntityStatus.RESOLVED:
                im.add_error(row['student_id'],
                             ErrorDetail('Not found', field='student_id'))
                continue
            student_table.set_values(row)
            if student_table.execute('insert', raise_error=False):
                entity['_status'] = EntityStatus.ERROR
                im.add_error(row['student_id'], student_table.last_error)
im.save_state('state/students.json')
print(im.calc_stats())
Pattern 2 — Crosswalk pre-load (no resolver needed):
# crosswalk.csv has both crm_id and erp_id; no DB lookup required
im = IdentityManager('crm_id', 'erp_id')  # no resolver
with dbtk.readers.get_reader('crosswalk.csv') as reader:
    for row in reader:
        im.resolve(row)  # Caches entity as RESOLVED (erp_id present) or STAGED
with dbtk.readers.get_reader('data.csv') as reader:
    for row in reader:
        entity = im.resolve(row)  # Cache hit; no DB call
        if entity and entity['_status'] == EntityStatus.RESOLVED:
            table.set_values(row)
            table.execute('insert')
ValidationCollector
A callable that collects and optionally enriches coded values during row-by-row processing. Useful for accumulating all values seen in a field (e.g. title codes, department codes) and validating or enriching them against a reference table.
from dbtk.etl import Table, ValidationCollector
from dbtk.etl.transforms import TableLookup
# Pure collection — no lookup
title_collector = ValidationCollector()
for record in reader:
    title_collector(record['tconst'])  # Collects the value; returns it unchanged
# With lookup — enrich codes with descriptions on first encounter
genre_lookup = TableLookup(cursor=cur, table='genres', key_cols='code',
return_cols='title', cache=TableLookup.CACHE_PRELOAD)
genre_collector = ValidationCollector(lookup=genre_lookup)
title_cols = {
'tconst': {'field': 'tconst', 'primary_key': True, 'fn': title_collector},
'title_type': {'field': 'titleType', 'nullable': False},
'primary_title': {'field': 'primaryTitle', 'nullable': False},
'start_year': {'field': 'startYear'},
'end_year': {'field': 'endYear'},
'is_adult': {'field': 'isAdult', 'fn': 'bool'},
'runtime_minutes': {'field': 'runtimeMinutes', 'fn': 'int'},
'genre': {'field': 'genre_code', 'fn': genre_collector}
}
titles = Table('titles_subset', columns=title_cols, cursor=cur)
for record in reader:
    titles.set_values(record)
    # collect_new annotates a newly-seen code with extra fields from the source row.
    # Call it right after set_values: the collector's _recently_added flag reflects
    # the most recent __call__ and will be cleared on the next row.
    genre_collector.collect_new(record.genre_code, title=record.genre_description)
For detailed TableLookup documentation including caching strategies and string shorthand syntax, see Table Lookups and Validation.
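ValidationCollector's documented behaviour can be modelled in a compact class. This is a hypothetical sketch, not dbtk's implementation. `ToyCollector`, its `seen` and `extra` attributes, and the idea of a lookup as a plain callable (here `dict.get`) are assumptions. It does reflect the behaviour described on this page: values pass through unchanged, codes the lookup does not know go into `added`, `collect_new` acts only right after a new code is seen, and `in`, `get_valid_mapping()` and `get_all()` all work.

```python
class ToyCollector:
    """Hypothetical sketch of a ValidationCollector-style callable."""

    def __init__(self, lookup=None):
        self.lookup = lookup   # assumed: callable code -> description or None
        self.seen = {}         # code -> description (None when unknown)
        self.added = set()     # codes the lookup did not recognise
        self.extra = {}        # code -> fields supplied via collect_new
        self._recently_added = False

    def __call__(self, value):
        self._recently_added = False
        if value not in self.seen:
            description = self.lookup(value) if self.lookup else None
            self.seen[value] = description
            if self.lookup and description is None:
                self.added.add(value)
                self._recently_added = True
        return value  # the value passes through unchanged

    def __contains__(self, value):
        return value in self.seen

    def collect_new(self, value, **fields):
        # Only annotate when the most recent call introduced an unknown code
        if self._recently_added and value in self.added:
            self.extra[value] = fields

    def get_valid_mapping(self):
        return {code: desc for code, desc in self.seen.items() if desc is not None}

    def get_all(self):
        return set(self.seen)


genres = {'DRA': 'Drama', 'COM': 'Comedy'}  # stands in for the genres table
collector = ToyCollector(lookup=genres.get)
for code, desc in [('DRA', 'Drama'), ('SCI', 'Science Fiction'), ('DRA', 'Drama')]:
    collector(code)
    collector.collect_new(code, title=desc)

print(collector.get_valid_mapping())  # {'DRA': 'Drama'}
print(sorted(collector.added))        # ['SCI']
print(collector.extra)                # {'SCI': {'title': 'Science Fiction'}}
```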
Filtering with the in operator:
# Collect valid title IDs in first pass
title_collector = ValidationCollector()
for record in titles_reader:
    title_collector(record['tconst'])
# Filter a second file to only matching titles
with dbtk.readers.get_reader('title.principals.tsv.gz') as reader:
    reader.add_filter(lambda r: r.tconst in title_collector)
    for record in reader:
        process(record)
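The two-pass pattern relies only on the collector supporting `in`. Here is a stdlib-only sketch of the idea. `SeenValues` and `filtered` are hypothetical stand-ins for `ValidationCollector` and the reader's `add_filter`, and the in-memory lists stand in for the two files.

```python
class SeenValues:
    """Hypothetical minimal collector: records values and supports `in`."""

    def __init__(self):
        self._values = set()

    def __call__(self, value):
        self._values.add(value)
        return value

    def __contains__(self, value):
        return value in self._values  # set membership: O(1) per row


def filtered(rows, predicate):
    # Lazily yield only rows that pass the predicate, like a reader filter
    return (row for row in rows if predicate(row))


titles = [{'tconst': 'tt01'}, {'tconst': 'tt02'}]
principals = [
    {'tconst': 'tt01', 'nconst': 'nm1'},
    {'tconst': 'tt03', 'nconst': 'nm2'},
    {'tconst': 'tt02', 'nconst': 'nm3'},
]

title_ids = SeenValues()
for record in titles:  # pass 1: collect
    title_ids(record['tconst'])

# pass 2: keep only rows whose tconst was collected
kept = [r['nconst'] for r in filtered(principals, lambda r: r['tconst'] in title_ids)]
print(kept)  # ['nm1', 'nm3']
```

Because membership is a hash lookup, the second file can be arbitrarily large without slowing down per row.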
Reporting:
# Valid codes (seen in reference table)
mapping = collector.get_valid_mapping() # {code: description}
# New codes (not in reference table)
new_codes = sorted(collector.added) # codes not in reference table
# All codes as a set (useful for polars filtering)
import polars as pl
all_codes = collector.get_all()
df = (pl.scan_csv('data.tsv', separator='\t')
      .filter(pl.col('code').is_in(list(all_codes)))
      .collect())
Logging for Integration Scripts
The problem: Integration scripts need proper logging with timestamped files, separate error logs, and easy cleanup. Setting this up manually is repetitive and error-prone.
The solution: DBTK provides setup_logging() and cleanup_old_logs() to handle the
common pattern of creating timestamped log files like script_name_20251031_154505.log.
import dbtk
import logging
# One-line setup with automatic script name detection
dbtk.setup_logging() # Log settings in config (dbtk.yml), see below
# Or specify name and options
dbtk.setup_logging('fire_nation_etl', log_dir='/var/log/etl', level='DEBUG')
# Now use standard Python logging
logger = logging.getLogger(__name__)
logger.info("Starting ETL process...")
logger.error("Failed to process record")
Configuration options (via dbtk.yml or function parameters):
settings:
  logging:
    directory: ./logs                      # Where to write logs
    level: INFO                            # DEBUG, INFO, WARNING, ERROR, CRITICAL
    format: '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
    timestamp_format: '%Y-%m-%d %H:%M:%S'  # For log message timestamps
    filename_format: '%Y%m%d_%H%M%S'       # For log filenames
    split_errors: true                     # Separate _error.log for errors
    console: true                          # Also output to console
    retention_days: 30                     # For cleanup_old_logs()
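A split error log and optional console output map naturally onto standard-library handlers. The sketch below shows roughly what those settings imply. It is not dbtk's `setup_logging()` code. `build_handlers` and the file names are hypothetical, and in this sketch ERROR records are written to both the main log and the error log.

```python
import logging
import os
import sys
import tempfile


def build_handlers(main_path, error_path=None, console=True,
                   fmt='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
                   datefmt='%Y-%m-%d %H:%M:%S'):
    """Hypothetical helper: one main log, an optional ERROR-only log, optional console."""
    formatter = logging.Formatter(fmt, datefmt=datefmt)
    handlers = [logging.FileHandler(main_path, encoding='utf-8')]
    if error_path:  # split_errors: ERROR and CRITICAL records also go here
        error_handler = logging.FileHandler(error_path, encoding='utf-8')
        error_handler.setLevel(logging.ERROR)
        handlers.append(error_handler)
    if console:
        handlers.append(logging.StreamHandler(sys.stdout))
    for handler in handlers:
        handler.setFormatter(formatter)
    return handlers


log_dir = tempfile.mkdtemp()
main_log = os.path.join(log_dir, 'fire_nation_etl.log')
error_log = os.path.join(log_dir, 'fire_nation_etl_error.log')

logger = logging.getLogger('fire_nation_etl')
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo isolated from other handlers
for handler in build_handlers(main_log, error_log, console=False):
    logger.addHandler(handler)

logger.info('Loaded 10 rows')
logger.error('Row 7 rejected')

for handler in list(logger.handlers):  # flush and release the files
    handler.close()
    logger.removeHandler(handler)

with open(error_log, encoding='utf-8') as fh:
    error_text = fh.read()
with open(main_log, encoding='utf-8') as fh:
    main_text = fh.read()
```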
Filename patterns:
# One log per run (default)
# filename_format: '%Y%m%d_%H%M%S'
# Creates: script_20251031_154505.log
# One log per day
# filename_format: '%Y%m%d'
# Creates: script_20251031.log
# Single rolling log file
# filename_format: ''
# Creates: script.log (overwrites each run)
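The three patterns follow from applying `strftime` to the run time and omitting the suffix when the format is empty. `log_filename` below is a hypothetical helper, not a dbtk function. Appending `_error` for the split error log is an assumption based on the `_error.log` note in the configuration.

```python
from datetime import datetime


def log_filename(script_name, filename_format='%Y%m%d_%H%M%S', now=None, suffix=''):
    """Hypothetical helper mirroring the filename patterns above."""
    if now is None:
        now = datetime.now()
    stamp = now.strftime(filename_format) if filename_format else ''
    stem = f"{script_name}_{stamp}" if stamp else script_name
    return f"{stem}{suffix}.log"


run_time = datetime(2025, 10, 31, 15, 45, 5)
print(log_filename('script', '%Y%m%d_%H%M%S', run_time))           # script_20251031_154505.log
print(log_filename('script', '%Y%m%d', run_time))                  # script_20251031.log
print(log_filename('script', '', run_time))                        # script.log
print(log_filename('script', '%Y%m%d', run_time, suffix='_error'))  # script_20251031_error.log
```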
Automatic log cleanup:
# Clean logs older than retention period (default: 30 days)
deleted = dbtk.cleanup_old_logs()
print(f"Deleted {len(deleted)} old log files")
# Custom retention
dbtk.cleanup_old_logs(retention_days=7)
# Dry run to see what would be deleted
would_delete = dbtk.cleanup_old_logs(dry_run=True)
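Retention-based cleanup with a dry-run option can be sketched with `pathlib`. This is hypothetical. The real `cleanup_old_logs()` may judge age by the timestamp in the filename rather than by file modification time, as this sketch does. The `log_dir` and `pattern` parameters are also assumptions.

```python
import os
import tempfile
import time
from pathlib import Path


def cleanup_old_logs(log_dir, retention_days=30, dry_run=False, pattern='*.log', now=None):
    """Hypothetical: delete matching files whose mtime is older than the retention window."""
    now = time.time() if now is None else now
    cutoff = now - retention_days * 86400
    expired = sorted(p for p in Path(log_dir).glob(pattern) if p.stat().st_mtime < cutoff)
    if not dry_run:
        for path in expired:
            path.unlink()
    return [str(p) for p in expired]  # what was (or would be) deleted


log_dir = tempfile.mkdtemp()
old = Path(log_dir, 'etl_20250101.log')
new = Path(log_dir, 'etl_20251031.log')
old.write_text('old run')
new.write_text('new run')
forty_days_ago = time.time() - 40 * 86400
os.utime(old, (forty_days_ago, forty_days_ago))

would = cleanup_old_logs(log_dir, dry_run=True)  # lists the old file, deletes nothing
deleted = cleanup_old_logs(log_dir)
print(len(would), len(deleted), old.exists(), new.exists())  # 1 1 False True
```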
Error detection for notifications:
When running unattended integration scripts, you often want to send notification emails
if errors occurred. The errors_logged() function makes this trivial:
import dbtk
import logging
# Setup logging with split_errors=True (default)
dbtk.setup_logging('fire_nation_etl')
logger = logging.getLogger(__name__)
# ... do your ETL work ...
try:
    process_data()
except Exception as e:
    logger.error(f"Processing failed: {e}")
# Check if any errors were logged
error_log = dbtk.errors_logged()
if error_log:
    print(f"Errors detected! See: {error_log}")
    # send_notification_email(subject="ETL errors", attachment=error_log)
else:
    print("Integration completed successfully")
How it works:
Returns None if no errors were logged
Returns the error log path if split_errors=True (separate _error.log file)
Returns the main log path if split_errors=False (errors in combined log)
Automatically tracks ERROR and CRITICAL level messages
Works regardless of logging configuration
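One way to get this behaviour with the standard library is a handler on the root logger whose level is ERROR. Every propagating logger then reaches it, whatever other handlers are configured. The sketch below is hypothetical. `ErrorTracker` and this `errors_logged` signature are invented, and dbtk's real mechanism is not documented here.

```python
import logging


class ErrorTracker(logging.Handler):
    """Hypothetical handler that counts ERROR and CRITICAL records."""

    def __init__(self):
        super().__init__(level=logging.ERROR)  # lower levels never reach emit()
        self.count = 0

    def emit(self, record):
        self.count += 1


def errors_logged(tracker, error_log_path, main_log_path, split_errors=True):
    if tracker.count == 0:
        return None
    return error_log_path if split_errors else main_log_path


tracker = ErrorTracker()
logging.getLogger().addHandler(tracker)  # root: sees records from all propagating loggers

logger = logging.getLogger('etl_demo')
logger.setLevel(logging.INFO)
logger.warning('General Iroh has insufficient mission experience')
print(errors_logged(tracker, 'etl_error.log', 'etl.log'))  # None
logger.error('Failed to process record')
print(errors_logged(tracker, 'etl_error.log', 'etl.log'))                      # etl_error.log
print(errors_logged(tracker, 'etl_error.log', 'etl.log', split_errors=False))  # etl.log
```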
What DBTK logs automatically:
DBTK logs the following operations without you writing any log statements:
Database connections and queries
File reading operations and errors
Table operations (INSERT/UPDATE/MERGE counts, validation failures)
Data transformation errors
Parameter conversions and SQL generation
You only need to add custom logging for your specific business logic.
Complete integration script example:
#!/usr/bin/env python3
"""Fire Nation intelligence ETL with custom validation logging."""
import dbtk
import logging
# Set up logging - creates dated log files automatically
dbtk.setup_logging()
# Optional: Create logger only if you need custom log messages
logger = logging.getLogger(__name__)
def validate_combat_readiness(soldier_data):
    """Custom business rule - log only your specific logic."""
    # CSV values arrive as strings, so convert before comparing
    if int(soldier_data['missions_completed']) < 5 and soldier_data['rank'] == 'General':
        logger.warning(f"General {soldier_data['name']} has insufficient mission experience")
        return False
    return True
def main():
    with dbtk.connect('fire_nation_db') as db:
        cursor = db.cursor()
        soldier_table = dbtk.etl.Table('soldiers', config, cursor)
        with dbtk.readers.get_reader('conscripts.csv') as reader:
            for record in reader:
                soldier_table.set_values(record)
                # Custom validation - log only when YOU need to
                if soldier_table.is_ready('insert') and not validate_combat_readiness(record):
                    continue  # Skip this record
                soldier_table.execute('insert', raise_error=False)
                # ↑ DBTK automatically logs all insert operations, errors, validation failures
        # Summary output (or log it if you prefer)
        print(f"Processed {soldier_table.counts['insert']} soldiers")
        print(f"Skipped {soldier_table.counts['incomplete']} incomplete records")
        db.commit()
if __name__ == '__main__':
    main()
    dbtk.cleanup_old_logs()
    # Check if errors occurred (DBTK tracked them automatically)
    error_log = dbtk.errors_logged()
    if error_log:
        print(f"Errors occurred - check {error_log}")
        # send_notification_email(subject="ETL Errors", attachment=error_log)
Key takeaway: DBTK does the logging heavy lifting. You only add custom log statements for your specific business logic, not for database operations, file reading, or ETL mechanics.
Benefits:
Automatic setup - Sample config created at ~/.config/dbtk.yml on first use
Timestamped files - Never overwrite important logs
Split error logs - Easy monitoring and alerting
Standard logging - Works with all Python logging features
Configurable - Control via config file or function arguments
See Also
ETL: Table & Transforms - Table configuration, transforms, TableLookup
ETL: DataSurge & BulkSurge - High-performance bulk loading
Database Connections - Connections, cursors, SQL file execution, PreparedStatement
Examples:
examples/data_load_imdb_subset.py - ValidationCollector and IdentityManager used together in a complete ETL pipeline
examples/data_load_names.py - profession normalization with TableLookup and array column handling