ETL: Tools & Logging

Utilities for orchestrating and monitoring production ETL imports: identity resolution, code validation/collection, and structured logging.


IdentityManager

IdentityManager is a lightweight, resumable cache that maps source-system primary keys to target-system identifiers. It tracks every entity’s resolution status, messages, and errors across an entire import run, and can persist that state to JSON so multi-stage pipelines can resume without re-querying resolved records.

The source_key should be the primary key of the source system (e.g. CRM_ID) and should be present on every record coming from the source system.

The target_key should be the primary key of the target system (e.g. ERP_ID). The resolver may match additional keys, but the entity is not marked as resolved until a target_key has been found.

The alternate_keys parameter defines additional identifiers, either used to aid identification (e.g. SSN, email, username) or used during processing (e.g. staging_id). When the set_id method is used to set an identifier manually, the id_type must be the value of target_key or one of the names listed in alternate_keys.

from dbtk.etl import IdentityManager, EntityStatus

EntityStatus

Every entity in the cache carries a _status value from these constants:

| Status | Value | Meaning |
|---|---|---|
| PENDING | "pending" | Registered but resolution not yet attempted |
| RESOLVED | "resolved" | Successfully matched; target_key is populated |
| STAGED | "staged" | Exists in a staging table; not yet confirmed in target |
| NOT_FOUND | "not_found" | Resolution attempted but no match found |
| ERROR | "error" | An error occurred while creating or updating the entity; downstream table processing for this entity should be skipped |
| SKIPPED | "skipped" | Resolution intentionally bypassed |

Instantiation

stmt = cursor.prepare_file('sql/resolve_student.sql')

# With a resolver — looks up target_key via SQL on first encounter
im = IdentityManager(
    source_key='student_id',
    target_key='erp_person_id',
    resolver=stmt,
    alternate_keys=['banner_id'],   # Optional extra keys to track per entity
)

# Without a resolver — tracking/logging mode only
# Use when the target_key is already present in incoming records (e.g. crosswalk file),
# or when you only need per-entity status/error tracking with no DB lookup.
im = IdentityManager('source_id', 'target_id')   # no resolver

When source_key == target_key, pass the same string for both — IdentityManager recognises this and marks entities as RESOLVED immediately without staging them.

resolve()

The primary method. Accepts a scalar, dict, or Record:

# Scalar — treated as the raw source_key value.
# The resolver is called; the caller's record is NOT mutated.
entity = im.resolve(row['student_id'])

# dict or Record — source_key extracted from the mapping.
# On success, target_key is written back into the caller's record.
entity = im.resolve(row)

if entity is None:
    pass  # source_key not present in value; nothing to resolve
elif entity['_status'] == EntityStatus.RESOLVED:
    # entity[target_key] is populated; row[target_key] has been set too
    table.set_values(row)
    # A truthy return from execute() signals a failed insert (same convention as Pattern 1)
    if table.execute('insert', raise_error=False):
        entity['_status'] = EntityStatus.ERROR
        im.add_error(row['student_id'], table.last_error)
elif entity['_status'] == EntityStatus.NOT_FOUND:
    im.add_error(row['student_id'],
                 ErrorDetail('Student not found', field='student_id'))

Cache behaviour: already-RESOLVED entities are returned from cache without querying the database again. Non-RESOLVED entities (PENDING, NOT_FOUND, STAGED) are re-attempted on every resolve() call.
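
The caching rule can be pictured with a small stand-in. This is only a sketch: ToyIdentityCache, its lookups counter, and the dict-backed resolver are hypothetical names made up for illustration, not DBTK's implementation, and a real resolver would be a prepared statement rather than a Python callable.

```python
# Hypothetical stand-in for the caching rule; not DBTK's implementation.
RESOLVED, NOT_FOUND = "resolved", "not_found"


class ToyIdentityCache:
    def __init__(self, resolver):
        self.resolver = resolver      # callable: source id -> target id or None
        self.entities = {}            # source id -> entity dict
        self.lookups = 0              # how many times the resolver was called

    def resolve(self, source_id):
        entity = self.entities.setdefault(
            source_id, {"source_id": source_id, "target_id": None, "_status": "pending"}
        )
        if entity["_status"] == RESOLVED:
            return entity             # cache hit: no lookup
        self.lookups += 1             # any other status is re-attempted
        target_id = self.resolver(source_id)
        entity["target_id"] = target_id
        entity["_status"] = RESOLVED if target_id is not None else NOT_FOUND
        return entity


known = {"S1": 1001}
cache = ToyIdentityCache(known.get)

cache.resolve("S1")                   # lookup, resolved
cache.resolve("S1")                   # served from cache
cache.resolve("S2")                   # lookup, not found
known["S2"] = 1002                    # e.g. the record is created later
second_try = cache.resolve("S2")      # lookup again, now resolved
```

Three lookups happen for four calls: the repeated S1 call is answered from the cache, while S2 is retried because it was not yet resolved.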

alternate_keys

Track additional identifiers per entity alongside target_key:

im = IdentityManager('crm_id', 'erp_id', resolver=stmt,
                     alternate_keys=['staging_id', 'legacy_id'])

entity = im.resolve(row)
# entity['legacy_id'] is populated if the resolver returns it

# Read or write any tracked key directly
im.set_id(source_id, 'staging_id', 'B00123')
staging_id = im.get_id(source_id, 'staging_id')

add_error / add_message

Attach structured errors and informational messages to a cached entity:

from dbtk.utils import ErrorDetail

im.add_error(source_id, ErrorDetail('Insert failed', field='name', code='DB_ERROR'))
im.add_message(source_id, 'Mapped via legacy crosswalk')

Both lists (_errors, _messages) are preserved in save_state() / load_state().

batch_resolve()

Re-run the resolver for all PENDING and NOT_FOUND entities. Useful after a staging table has been populated and you want to retry resolution in bulk.

im.batch_resolve()

# Also retry STAGED entities (e.g. after a staging→target promotion step)
im.batch_resolve(additional_statuses=[EntityStatus.STAGED])

# Retry multiple additional statuses
im.batch_resolve(additional_statuses=[EntityStatus.STAGED, EntityStatus.ERROR])

PENDING and NOT_FOUND are always retried. additional_statuses extends that set — it does not replace it.
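
The "extends, does not replace" rule amounts to a set union. The helper names below (retry_statuses, select_for_retry) are hypothetical and only illustrate the selection logic; they are not part of DBTK.

```python
# Hypothetical helpers illustrating the selection rule; not part of DBTK.
DEFAULT_RETRY = {"pending", "not_found"}


def retry_statuses(additional_statuses=None):
    """Return the set of statuses a bulk retry would cover."""
    return DEFAULT_RETRY | set(additional_statuses or [])


def select_for_retry(entities, additional_statuses=None):
    """Pick the source ids whose status falls in the retry set."""
    wanted = retry_statuses(additional_statuses)
    return [sid for sid, e in entities.items() if e["_status"] in wanted]


entities = {
    "A": {"_status": "resolved"},
    "B": {"_status": "pending"},
    "C": {"_status": "staged"},
    "D": {"_status": "not_found"},
}
default_ids = select_for_retry(entities)              # B and D
with_staged = select_for_retry(entities, ["staged"])  # B, C and D
```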

save_state / load_state

Persist the full entity cache to JSON and restore it later:

# At end of run
im.save_state('state/students.json')

# Next run — restore cache, attach a (possibly updated) resolver
im = IdentityManager.load_state('state/students.json', resolver=stmt)

# Retry anything still pending or not found after the last run
im.batch_resolve()

# Or add STAGED entities to the retry set
im.batch_resolve(additional_statuses=[EntityStatus.STAGED])

The JSON file includes source_key, target_key, alternate_keys, field order (for factory reconstruction), summary stats, and every entity with its errors and messages. ErrorDetail objects are serialised/deserialised automatically.
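
For readers curious how error objects can survive a JSON round trip, here is a minimal sketch using only the standard library. ToyErrorDetail, save, and load are hypothetical stand-ins, and the file layout shown is an assumption for illustration; the actual JSON schema written by save_state() is not reproduced here.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional


@dataclass
class ToyErrorDetail:
    # Hypothetical stand-in for dbtk.utils.ErrorDetail
    message: str
    field: Optional[str] = None
    code: Optional[str] = None


def save(path, entities):
    """Write entities to JSON, turning error objects into plain dicts."""
    payload = {
        sid: {**e, "_errors": [asdict(err) for err in e["_errors"]]}
        for sid, e in entities.items()
    }
    Path(path).write_text(json.dumps(payload, indent=2))


def load(path):
    """Read entities back, rebuilding error objects from their dicts."""
    payload = json.loads(Path(path).read_text())
    for e in payload.values():
        e["_errors"] = [ToyErrorDetail(**err) for err in e["_errors"]]
    return payload


entities = {
    "S1": {"_status": "error",
           "_errors": [ToyErrorDetail("Insert failed", field="name", code="DB_ERROR")]},
}
with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "students.json"
    save(state_file, entities)
    restored = load(state_file)
```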

calc_stats()

stats = im.calc_stats()
# {'pending': 0, 'resolved': 142, 'staged': 5, 'error': 3, 'skipped': 0, 'not_found': 11}
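
A dictionary of this shape is easy to reproduce for your own reporting. The sketch below assumes entities are plain dicts carrying a _status string; tally and STATUSES are hypothetical names, not DBTK functions.

```python
from collections import Counter

STATUSES = ["pending", "resolved", "staged", "error", "skipped", "not_found"]


def tally(entities):
    """Count entities per status, reporting zero for unused statuses."""
    counts = Counter(e["_status"] for e in entities)
    return {status: counts.get(status, 0) for status in STATUSES}


sample = [{"_status": "resolved"}, {"_status": "resolved"}, {"_status": "error"}]
stats = tally(sample)
```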

Complete examples

Pattern 1 — Standard resolver-based import:

import dbtk
from dbtk.etl import IdentityManager, EntityStatus
from dbtk.utils import ErrorDetail

dbtk.setup_logging()

with dbtk.connect('erp_db') as db:
    cursor = db.cursor()
    stmt = cursor.prepare_file('sql/resolve_student.sql')

    im = IdentityManager('student_id', 'erp_person_id', resolver=stmt,
                         alternate_keys=['banner_id'])

    student_table = dbtk.etl.Table('students', columns_config, cursor)

    with dbtk.readers.get_reader('incoming/students.csv.gz') as reader:
        for row in reader:
            entity = im.resolve(row)          # Looks up erp_person_id; writes it into row
            if entity is None:
                continue                       # Row has no student_id
            if entity['_status'] == EntityStatus.ERROR:
                continue                       # Skip downstream tables
            if entity['_status'] != EntityStatus.RESOLVED:
                im.add_error(row['student_id'],
                             ErrorDetail('Not found', field='student_id'))
                continue

            student_table.set_values(row)
            if student_table.execute('insert', raise_error=False):
                entity['_status'] = EntityStatus.ERROR
                im.add_error(row['student_id'], student_table.last_error)

    im.save_state('state/students.json')
    print(im.calc_stats())

Pattern 2 — Crosswalk pre-load (no resolver needed):

# crosswalk.csv has both crm_id and erp_id — no DB lookup required
im = IdentityManager('crm_id', 'erp_id')   # no resolver

with dbtk.readers.get_reader('crosswalk.csv') as reader:
    for row in reader:
        im.resolve(row)   # Caches entity as RESOLVED (erp_id present) or STAGED

with dbtk.readers.get_reader('data.csv') as reader:
    for row in reader:
        entity = im.resolve(row)   # Cache hit — no DB call
        if entity and entity['_status'] == EntityStatus.RESOLVED:
            table.set_values(row)
            table.execute('insert')

ValidationCollector

A callable that collects and optionally enriches coded values during row-by-row processing. Useful for accumulating all values seen in a field (e.g. title codes, department codes) and validating or enriching them against a reference table.

from dbtk.etl import Table, ValidationCollector
from dbtk.etl.transforms import TableLookup

# Pure collection — no lookup
title_collector = ValidationCollector()
for record in reader:
    title_collector(record['tconst'])   # Collects the value; returns it unchanged

# With lookup — enrich codes with descriptions on first encounter
genre_lookup = TableLookup(cursor=cur, table='genres', key_cols='code',
                           return_cols='title', cache=TableLookup.CACHE_PRELOAD)
genre_collector = ValidationCollector(lookup=genre_lookup)
title_cols = {
    'tconst': {'field': 'tconst', 'primary_key': True, 'fn': title_collector},
    'title_type': {'field': 'titleType', 'nullable': False},
    'primary_title': {'field': 'primaryTitle', 'nullable': False},
    'start_year': {'field': 'startYear'},
    'end_year': {'field': 'endYear'},
    'is_adult': {'field': 'isAdult', 'fn': 'bool'},
    'runtime_minutes': {'field': 'runtimeMinutes', 'fn': 'int'},
    'genre': {'field': 'genre_code', 'fn': genre_collector},
}
titles = Table('titles_subset', columns=title_cols, cursor=cur)
for record in reader:
    titles.set_values(record)
    # collect_new annotates a newly-seen code with extra fields from the source row.
    # Call it right after set_values — the collector's _recently_added flag reflects
    # the most recent __call__ and will be cleared on the next row.
    genre_collector.collect_new(record.genre_code, title=record.genre_description)

For detailed TableLookup documentation including caching strategies and string shorthand syntax, see Table Lookups and Validation.

Filtering with the in operator:

# Collect valid title IDs in first pass
title_collector = ValidationCollector()
for record in titles_reader:
    title_collector(record['tconst'])

# Filter a second file to only matching titles
with dbtk.readers.get_reader('title.principals.tsv.gz') as reader:
    reader.add_filter(lambda r: r.tconst in title_collector)
    for record in reader:
        process(record)
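
The two behaviours used above, calling the collector and testing membership with in, map onto Python's __call__ and __contains__ hooks. ToyCollector is a hypothetical stand-in written for illustration, and it makes no claim about how ValidationCollector is built internally.

```python
class ToyCollector:
    """Hypothetical stand-in: records every value it is called with."""

    def __init__(self):
        self.seen = set()

    def __call__(self, value):
        self.seen.add(value)
        return value                  # pass the value through unchanged

    def __contains__(self, value):
        return value in self.seen     # enables `value in collector`


collector = ToyCollector()
for tconst in ["tt0000001", "tt0000002"]:
    collector(tconst)

# Second pass: keep only rows whose title id was collected in the first pass
principals = [("tt0000001", "nm1"), ("tt0000009", "nm2")]
kept = [row for row in principals if row[0] in collector]
```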

Reporting:

# Valid codes (seen in reference table)
mapping = collector.get_valid_mapping()  # {code: description}

# New codes (not in reference table)
new_codes = sorted(collector.added)

# All codes as a set (useful for polars filtering)
import polars as pl

all_codes = collector.get_all()
df = pl.scan_csv('data.tsv', separator='\t').filter(pl.col('code').is_in(list(all_codes)))

Logging for Integration Scripts

The problem: Integration scripts need proper logging with timestamped files, separate error logs, and easy cleanup. Setting this up manually is repetitive and error-prone.

The solution: DBTK provides setup_logging() and cleanup_old_logs() to handle the common pattern of creating timestamped log files like script_name_20251031_154505.log.

import dbtk
import logging

# One-line setup with automatic script name detection
dbtk.setup_logging()  # Log settings in config (dbtk.yml), see below

# Or specify name and options
dbtk.setup_logging('fire_nation_etl', log_dir='/var/log/etl', level='DEBUG')

# Now use standard Python logging
logger = logging.getLogger(__name__)
logger.info("Starting ETL process...")
logger.error("Failed to process record")

Configuration options (via dbtk.yml or function parameters):

settings:
  logging:
    directory: ./logs                   # Where to write logs
    level: INFO                          # DEBUG, INFO, WARNING, ERROR, CRITICAL
    format: '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
    timestamp_format: '%Y-%m-%d %H:%M:%S'  # For log message timestamps
    filename_format: '%Y%m%d_%H%M%S'    # For log filenames
    split_errors: true                   # Separate _error.log for errors
    console: true                        # Also output to console
    retention_days: 30                   # For cleanup_old_logs()
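
To make the split_errors option concrete, the sketch below wires up the same idea with the standard logging module alone: one handler receives every record, and a second handler set to ERROR receives only errors. The function name attach_split_handlers and the exact file names are assumptions for illustration; setup_logging() does this wiring for you.

```python
import logging
import tempfile
from pathlib import Path

FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
DATEFMT = "%Y-%m-%d %H:%M:%S"


def attach_split_handlers(logger, log_dir, name):
    """Send everything to <name>.log and ERROR or worse to <name>_error.log too."""
    formatter = logging.Formatter(FORMAT, datefmt=DATEFMT)
    main = logging.FileHandler(Path(log_dir) / f"{name}.log")
    errors = logging.FileHandler(Path(log_dir) / f"{name}_error.log")
    errors.setLevel(logging.ERROR)    # this handler ignores INFO and WARNING
    for handler in (main, errors):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return main, errors


with tempfile.TemporaryDirectory() as tmp:
    logger = logging.getLogger("split_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False          # keep the demo off the console
    handlers = attach_split_handlers(logger, tmp, "demo")
    logger.info("Loaded 10 rows")
    logger.error("Row 11 rejected")
    for handler in handlers:          # flush and release the files
        handler.close()
        logger.removeHandler(handler)
    main_lines = (Path(tmp) / "demo.log").read_text().splitlines()
    error_lines = (Path(tmp) / "demo_error.log").read_text().splitlines()
```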

Filename patterns:

# One log per run (default)
# filename_format: '%Y%m%d_%H%M%S'
# Creates: script_20251031_154505.log

# One log per day
# filename_format: '%Y%m%d'
# Creates: script_20251031.log

# Single log file
# filename_format: ''
# Creates: script.log (overwrites each run)
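
The three patterns follow directly from strftime. As a rough sketch (build_log_name is a hypothetical helper, and joining the name and timestamp with an underscore is inferred from the example filenames above):

```python
from datetime import datetime


def build_log_name(script, filename_format, now):
    """Compose a log filename from a script name and a strftime pattern."""
    stamp = now.strftime(filename_format) if filename_format else ""
    return f"{script}_{stamp}.log" if stamp else f"{script}.log"


run_time = datetime(2025, 10, 31, 15, 45, 5)
per_run = build_log_name("script", "%Y%m%d_%H%M%S", run_time)
per_day = build_log_name("script", "%Y%m%d", run_time)
single = build_log_name("script", "", run_time)
```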

Automatic log cleanup:

# Clean logs older than retention period (default: 30 days)
deleted = dbtk.cleanup_old_logs()
print(f"Deleted {len(deleted)} old log files")

# Custom retention
dbtk.cleanup_old_logs(retention_days=7)

# Dry run to see what would be deleted
would_delete = dbtk.cleanup_old_logs(dry_run=True)
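
Retention-based cleanup with a dry-run switch can be sketched with the standard library as follows. find_old_logs is a hypothetical function, and using the file modification time as the age criterion is an assumption; it is not a description of how cleanup_old_logs() decides what to delete.

```python
import os
import tempfile
import time
from pathlib import Path


def find_old_logs(log_dir, retention_days=30, dry_run=False):
    """Return *.log files older than the cutoff; delete them unless dry_run."""
    cutoff = time.time() - retention_days * 86400
    old = [p for p in sorted(Path(log_dir).glob("*.log")) if p.stat().st_mtime < cutoff]
    if not dry_run:
        for path in old:
            path.unlink()
    return old


with tempfile.TemporaryDirectory() as tmp:
    fresh = Path(tmp) / "etl_new.log"
    stale = Path(tmp) / "etl_old.log"
    fresh.write_text("recent run\n")
    stale.write_text("old run\n")
    forty_days_ago = time.time() - 40 * 86400
    os.utime(stale, (forty_days_ago, forty_days_ago))   # backdate the file

    preview = [p.name for p in find_old_logs(tmp, dry_run=True)]
    stale_still_there = stale.exists()                  # dry run deletes nothing
    deleted = [p.name for p in find_old_logs(tmp)]
    remaining = sorted(p.name for p in Path(tmp).iterdir())
```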

Error detection for notifications:

When running unattended integration scripts, you often want to send notification emails if errors occurred. The errors_logged() function makes this trivial:

import dbtk
import logging

# Set up logging with split_errors=True (default)
dbtk.setup_logging('fire_nation_etl')

logger = logging.getLogger(__name__)

# ... do your ETL work ...
try:
    process_data()
except Exception as e:
    logger.error(f"Processing failed: {e}")

# Check if any errors were logged
error_log = dbtk.errors_logged()
if error_log:
    print(f"Errors detected! See: {error_log}")
    # send_notification_email(subject="ETL errors", attachment=error_log)
else:
    print("Integration completed successfully")

How errors_logged() works:

  • Returns None if no errors were logged

  • Returns error log path if split_errors=True (separate _error.log file)

  • Returns main log path if split_errors=False (errors in combined log)

  • Automatically tracks ERROR and CRITICAL level messages

  • Works regardless of logging configuration
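
One way to track ERROR and CRITICAL messages is a logging handler whose level is set to ERROR. The class below is a hypothetical illustration of that technique with the standard library; whether DBTK uses this exact mechanism internally is an assumption.

```python
import logging


class ErrorFlagHandler(logging.Handler):
    """Hypothetical handler that remembers whether ERROR or worse was logged."""

    def __init__(self):
        super().__init__(level=logging.ERROR)   # ignore anything below ERROR
        self.error_count = 0

    def emit(self, record):
        self.error_count += 1


logger = logging.getLogger("toy_etl")
logger.setLevel(logging.INFO)
logger.propagate = False          # keep the demo quiet on the console
flag = ErrorFlagHandler()
logger.addHandler(flag)

logger.info("Starting")
logger.warning("Slow query")
before = flag.error_count         # still zero: INFO and WARNING are ignored
logger.error("Insert failed")
logger.critical("Connection lost")
after = flag.error_count          # ERROR and CRITICAL both counted
```

A script can then branch on the counter at exit, in the same way the example above branches on the return value of errors_logged().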

What DBTK logs automatically:

DBTK logs its own operations without any log statements on your part:

  • Database connections and queries

  • File reading operations and errors

  • Table operations (INSERT/UPDATE/MERGE counts, validation failures)

  • Data transformation errors

  • Parameter conversions and SQL generation

You only need to add custom logging for your specific business logic.

Complete integration script example:

#!/usr/bin/env python3
"""Fire Nation intelligence ETL with custom validation logging."""

import dbtk
import logging

# Set up logging - creates dated log files automatically
dbtk.setup_logging()

# Optional: Create logger only if you need custom log messages
logger = logging.getLogger(__name__)

def validate_combat_readiness(soldier_data):
    """Custom business rule - log only your specific logic."""
    if soldier_data['missions_completed'] < 5 and soldier_data['rank'] == 'General':
        logger.warning(f"General {soldier_data['name']} has insufficient mission experience")
        return False
    return True

def main():
    with dbtk.connect('fire_nation_db') as db:
        cursor = db.cursor()

        soldier_table = dbtk.etl.Table('soldiers', config, cursor)

        with dbtk.readers.get_reader('conscripts.csv') as reader:
            for record in reader:
                soldier_table.set_values(record)

                # Custom validation - log only when YOU need to
                if soldier_table.is_ready('insert') and not validate_combat_readiness(record):
                    continue  # Skip this record

                soldier_table.execute('insert', raise_error=False)
                # ↑ DBTK automatically logs all insert operations, errors, validation failures

        # Summary output (or log it if you prefer)
        print(f"Processed {soldier_table.counts['insert']} soldiers")
        print(f"Skipped {soldier_table.counts['incomplete']} incomplete records")

        db.commit()

if __name__ == '__main__':
    main()
    dbtk.cleanup_old_logs()

    # Check if errors occurred (DBTK tracked them automatically)
    error_log = dbtk.errors_logged()
    if error_log:
        print(f"Errors occurred - check {error_log}")
        # send_notification_email(subject="ETL Errors", attachment=error_log)

Key takeaway: DBTK does the logging heavy lifting. You only add custom log statements for your specific business logic, not for database operations, file reading, or ETL mechanics.

Benefits:

  • Automatic setup - Sample config created at ~/.config/dbtk.yml on first use

  • Timestamped files - Never overwrite important logs

  • Split error logs - Easy monitoring and alerting

  • Standard logging - Works with all Python logging features

  • Configurable - Control via config file or function arguments


See Also

Examples: