# ETL: Table, Transforms & Lookups
**The problem:** Production ETL pipelines need field mapping, data validation, type conversions, database function integration, and error handling. Building all of this from scratch for each pipeline is time-consuming and error-prone. **The solution:** DBTK's `Table` class provides everything you need for production data pipelines, from simple inserts to complex merge operations with validation and transformation. ## Quick Start Define a `Table`, map your source fields to database columns, and start loading data: ```python import dbtk cursor = dbtk.connect('mydb').cursor() titles_table = dbtk.etl.Table('imdb_titles', { 'tconst': {'field': 'tconst', 'primary_key': True}, 'title_type': {'field': 'titleType', 'fn': 'maxlen:50', 'required': True}, 'primary_title': {'field': 'primaryTitle', 'fn': 'maxlen:500', 'no_update': True}, 'original_title': {'field': 'originalTitle', 'fn': 'maxlen:500'}, 'is_adult': {'field': 'isAdult', 'fn': 'indicator:Y:N'}, 'start_year': {'field': 'startYear', 'fn': 'int'}, 'end_year': {'field': 'endYear', 'fn': 'int'}, 'runtime_minutes': {'field': 'runtimeMinutes', 'db_expr': 'MINUTES_TO_HRS(#)'}, 'first_genre': {'field': 'genres', 'fn': 'split_and_get:0'}, 'all_genres': {'field': 'genres', 'fn': 'str.split:,'}, }, cursor=cursor) with open('title.basics.tsv') as f: reader = dbtk.readers.CSVReader(f, delimiter='\t') for record in reader: titles_table.set_values(record) titles_table.execute('insert') ``` Each column maps a source field to a database column, with optional `fn` for transformation. The `'int'`, `'maxlen:50'`, `'indicator:Y:N'` values are **string shorthand** — see [String Shorthand](#string-shorthand-for-transformations) for the full reference. ## Column Configuration Schema Each database column is configured with a dictionary specifying how to source and transform its value. ```python { 'database_column_name': { # DATA SOURCE 'field': 'source_field_name', # Map from input record field 'default': 'static_value', # Use a static or callable default for all records 'fn': transform_function, # Python function (no parens!) to transform field value or supported string shorthand 'db_expr': 'DATABASE_FUNCTION(#)', # Call database function (e.g., CURRENT_TIMESTAMP, UPPER(#)) # VALIDATION - optional: 'nullable': False, # Column cannot be NULL (anti-alias of required=True) 'required': True, # Column is required (anti-alias of nullable=False) 'primary_key': True, # Mark as primary key (alias: key) 'key': True, # Alias for primary_key # UPDATE CONTROL - optional: 'no_update': True, # Exclude from UPDATE operations (or UPDATE branch of merge) (default: False) } } ``` **Column configuration examples:** ```python columns_config = { # Simple field mapping 'user_id': {'field': 'id', 'primary_key': True}, # Empty dict shorthand - field name matches column name 'first_name': {}, # Equivalent to {'field': 'first_name'} 'last_name': {}, 'email': {}, # Field with transformation 'email_clean': {'field': 'email_address', 'fn': email_clean}, # Field with validation 'full_name': {'field': 'name', 'nullable': False}, # Multiple transformations (compose your own function) 'phone': {'field': 'phone_number', 'fn': lambda x: phone_format(phone_clean(x))}, # same as above but with a pipeline 'phone': {'field': 'phone_number', 'fn': [phone_clean, phone_format]} # Whole record access for multi-field decisions 'vip_status': { 'field': '*', # Asterisk passes entire record to function 'fn': lambda record: 'VIP' if record.get('age', 0) > 65 or record.get('purchases', 0) > 100 else 'Regular' }, # Whole record in pipelines - first function gets record, rest get values 'discount': { 'field': '*', 'fn': [ lambda record: 0.25 if record.get('loyalty_years', 0) > 10 else 0.10, lambda x: round(x, 2) ] }, # Static value for all records 'status': {'default': 'active'}, # Callable default — resolved at set_values() time, not column-definition time. # Useful when the value comes from runtime context (CLI args, job config, etc.) # that isn't available yet when columns are defined. 'user_id': {'default': lambda: conf_vars['user_id']}, 'import_job': {'default': lambda: conf_vars.get('job_id', 'unknown')}, 'import_date': {'db_expr': 'CURRENT_DATE'}, # Database function with parameter (# is placeholder for field value) 'full_name_upper': {'field': 'name', 'db_expr': 'UPPER(#)'}, # Computed value using database function 'age': {'field': 'birthdate', 'db_expr': 'EXTRACT(YEAR FROM AGE(#))'}, # Primary key that never updates (redundant - primary keys never update) 'record_id': {'field': 'id', 'primary_key': True, 'no_update': True}, # Field that inserts but never updates (useful for created_at timestamps) 'created_at': {'db_expr': 'CURRENT_TIMESTAMP', 'no_update': True}, } ``` ## String Shorthand for Transformations **The problem:** Writing transformation functions for Table columns means imports, lambdas, and verbose syntax. **The solution:** DBTK supports **string shorthand** for transformations — just write `'fn': 'int'` and it works. No imports, no lambdas, just clean configuration. ```python import dbtk # OLD WAY - verbose, needs imports from dbtk.etl.transforms import get_int, Lookup table = dbtk.etl.Table('movies', { 'year': {'field': 'startYear', 'fn': lambda x: get_int(x)}, 'title_short': {'field': 'primaryTitle', 'fn': lambda x: str(x or '')[:255]}, 'first_genre': {'field': 'genres', 'fn': lambda x: x.split(',')[0] if x else None}, 'state_abbrev': {'field': 'location', 'fn': Lookup('states', 'name', 'abbrev')}, }, cursor=cur) # String shorthand - clean, no imports needed table = dbtk.etl.Table('movies', { 'year': {'field': 'startYear', 'fn': 'int'}, 'title_short': {'field': 'primaryTitle', 'fn': 'maxlen:255'}, 'first_genre': {'field': 'genres', 'fn': 'split_and_get:0'}, 'state_abbrev': {'field': 'location', 'fn': 'lookup:states:name:abbrev'}, }, cursor=cur) ``` **Supported shorthands:** | Shorthand | Function | Example | |------------------------------------|------------------------|----------------------------------------------------| | `'int'` | Parse integer | `'123'` → `123`, `''` → `None` | | `'float'` | Parse float | `'$1,234.56'` → `1234.56` | | `'bool'` | Parse boolean | `'yes'` → `True` | | `'digits'` | Extract digits only | `'(800) 123-4567'` → `'8001234567'` | | `'number'` | Convert to number | `'$42.35'` → `42.35` | | `'maxlen:n'` | Truncate to n chars | `'maxlen:10'` on `'Avatar Aang'` → `'Avatar Aan'` | | `'split_and_get:0'` | First delimited field | `'action,comedy,drama'` → `'action'` | | `'split_and_get:1:\t'` | Second tab field | `'a\tb\tc'` → `'b'` | | `'nth:0'` | First list element | `['action', 'comedy']` → `'action'` | | `'coalesce'` | First non-empty value | `[None, '', 'first']` → `'first'` | | `'indicator'` | Boolean → Y/None | `True` → `'Y'`, `False` → `None` | | `'indicator:Y:N'` | Custom true/false | `True` → `'Y'`, `False` → `'N'` | | `'indicator:None:Y'` | Inverted indicator | `False` → `'Y'`, `True` → `None` | | `'lookup:...'` | Database lookup | See [Database Lookups](#database-lookups-and-validation) ↓ | | `'validate:...'` | Database validation | See [Database Lookups](#database-lookups-and-validation) ↓ | **Cast-and-call (`type.method`) shorthand:** cast the value to a type then call any method on it. Pass arguments with `:` — prefix integers with `+` or `-` when the method expects an `int`. | Shorthand | Equivalent | |--------------------------------|--------------------------------------------| | `'str.lower'` | `str(val).lower()` | | `'str.upper'` | `str(val).upper()` | | `'str.strip'` | `str(val).strip()` | | `'str.strip:="'` | `str(val).strip('="')` | | `'str.lstrip:0'` | `str(val).lstrip('0')` | | `'str.split:,'` | `str(val).split(',')` | | `'str.rjust:+9:0'` | `str(val).rjust(9, '0')` | | `'str.ljust:+10: '` | `str(val).ljust(10, ' ')` | | `'datetime.strftime:%Y-%m-%d'` | `parse_datetime(val).strftime('%Y-%m-%d')` | | `'int.to_bytes:+4:big'` | `int(val).to_bytes(4, 'big')` | | `'datetime.isoformat'` | `parse_datetime(val).isoformat()` | | `'float.hex'` | `float(val).hex()` | Supported cast types: `int`, `float`, `str`, `datetime`. **Chaining transformations:** ```python # Works in lists - functions are applied in order table = dbtk.etl.Table('users', { 'username': {'field': 'email', 'fn': ['str.lower', 'str.strip', 'maxlen:50']}, 'is_admin': {'field': 'role', 'fn': ['str.upper', 'indicator:Y:N']}, }, cursor=cursor) ``` ## Data Transformations Built-in transformation functions handle common data cleaning tasks: ```python from dbtk.etl import transforms as tx # Date and time parsing with flexible formats tx.parse_date("20 May 2025") tx.parse_datetime("2025-05-20T18:13:27Z") # With timezone support # International phone number handling (requires phonenumbers library) tx.phone_clean("5551234567") # -> "(555) 123-4567" tx.phone_validate("+1-800-AVATAR") # Validation # For advanced phone operations, import from the submodule from dbtk.etl.transforms.phone import phone_format, phone_get_type, PhoneFormat phone_format("+44 20 7946 0958", PhoneFormat.NATIONAL) # UK format phone_get_type("+1-800-CABBAGES") # -> "toll_free" # Email validation and cleaning tx.email_validate("guru.pathik@eastern.air.temple") # -> True tx.email_clean(" TOPH@BEIFONG.EARTHKINGDOM ") # -> "toph@beifong.earthkingdom" # Utility functions tx.coalesce([None, "", "Jasmine Tea", "Ginseng Tea"]) # -> "Jasmine Tea" tx.indicator("Firebender", true_val="Fire Nation Citizen") # Conditional values tx.get_int("123.45 gold pieces") # -> 123 ``` ### Transform Functions Reference Complete list of built-in transform functions in `dbtk.etl.transforms`: #### Type Conversions | Function | Description | Example | Returns | |----------------------------------|------------------------------------------|--------------------------------|---------| | `get_int(value, default=None)` | Parse integer, return default if invalid | `get_int('123')` | `123` | | | | `get_int('abc', 0)` | `0` | | `get_float(value, default=None)` | Parse float, return default if invalid | `get_float('12.34')` | `12.34` | | | | `get_float('$1,234.56')` | `1234.56` | | `get_bool(value)` | Parse boolean from various formats | `get_bool('yes')` | `True` | | | | `get_bool('0')` | `False` | | `get_digits(value)` | Extract digits only | `get_digits('(555) 123-4567')` | `'5551234567'` | | `to_number(value)` | Auto-detect int/float | `to_number('42')` | `42` (int) | | | | `to_number('3.14')` | `3.14` (float) | #### String Operations | Function | Description | Example | Returns | |-------------------------------|--------------------------|------------------------------------|---------| | `capitalize(value)` | Capitalize first letter | `capitalize('john')` | `'John'` | | `normalize_whitespace(value)` | Collapse multiple spaces | `normalize_whitespace('a b c')` | `'a b c'` | #### Date and Time | Function | Description | Example | Notes | |--------------------------|--------------------------------------|------------------------------------------|-----------------------------| | `parse_date(value)` | Parse date from various formats | `parse_date('2024-01-15')` | Auto-detects format | | `parse_datetime(value)` | Parse datetime with timezone support | `parse_datetime('2024-01-15T10:30:00Z')` | Returns `datetime.datetime` | #### Email | Function | Description | Example | Returns | |-------------------------|----------------------------|--------------------------------------|-----------------------| | `email_validate(value)` | Validate email format | `email_validate('user@example.com')` | `True` | | | | `email_validate('invalid')` | `False` | | `email_clean(value)` | Clean and lowercase email | `email_clean(' USER@EXAMPLE.COM ')` | `'user@example.com'` | #### Phone Numbers Requires `phonenumbers` library: `pip install phonenumbers` | Function | Description | Example | Notes | |----------------------------------------------------|----------------------------|-----------------------------------------------------|--------------------| | `phone_validate(value, country='US')` | Validate phone number | `phone_validate('555-1234')` | Returns bool | | `phone_clean(value, country='US')` | Clean and format phone | `phone_clean('5551234567')` | `'(555) 123-4567'` | | `phone_format(value, format=PhoneFormat.NATIONAL)` | Format with specific style | `phone_format('+1-555-123-4567', PhoneFormat.E164)` | `'+15551234567'` | | `phone_get_type(value)` | Get phone type | `phone_get_type('+1-800-555-0100')` | `'toll_free'` | **PhoneFormat options:** `E164`, `INTERNATIONAL`, `NATIONAL`, `RFC3966` #### Address Validation Requires `usaddress` library: `pip install usaddress` | Function | Description | Example | Notes | |-------------------------------|-----------------------------|--------------------------------------------|-----------------| | `validate_us_address(value)` | Validate US address format | `validate_us_address('123 Main St')` | Returns bool | | `standardize_address(value)` | Standardize address format | `standardize_address('123 main street')` | `'123 Main St'` | #### Utilities | Function | Description | Example | Returns | |--------------------------------------------------|------------------------------------------------|----------------------------------------------|-----------------| | `coalesce(*values)` | Return first non-None value | `coalesce(None, '', 'first', 'second')` | `'first'` | | `indicator(value, true_val='Y', false_val=None)` | Boolean to indicator | `indicator(True)` | `'Y'` | | | | `indicator(False)` | `None` | | `format_digits(value, pattern)` | Extract digits and format according to pattern | `format_digits('123456789', '###-##-####')` | `'123-45-6789'` | | `split_and_get(val, index, delimiter=',')` | Split string and get nth item | `split_and_get('a,b,c', 1)` | `'b'` | #### Using in Table Definitions ```python from dbtk.etl import Table, transforms as tx # Direct function reference table = Table('users', { 'age': {'field': 'age_str', 'fn': tx.get_int}, 'email': {'field': 'email_raw', 'fn': tx.email_clean}, 'amount': {'field': 'price', 'fn': tx.get_float}, }, cursor=cursor) # Custom function combining transforms def clean_phone(value): digits = tx.get_digits(value) return tx.phone_format(digits) if digits else None table = Table('contacts', { 'phone': {'field': 'phone_raw', 'fn': clean_phone}, }, cursor=cursor) # Transform pipelines (executed in order) table = Table('users', { 'username': {'field': 'email', 'fn': [tx.email_clean, lambda x: x.split('@')[0]]}, }, cursor=cursor) ``` **Custom transformations:** Create your own transformation functions for domain-specific logic: ```python def standardize_nation(val): nation_map = { 'Fire Nation Colonies': 'Earth Kingdom', 'Foggy Swamp Tribe': 'Earth Kingdom', 'Kyoshi Warriors': 'Earth Kingdom' } return nation_map.get(val, val) four_nations_census = dbtk.etl.Table('population_registry', { 'nation': {'field': 'home_nation', 'fn': standardize_nation}, # ... other fields }) ``` ## Database Lookups and Validation **The power move:** `TableLookup` transforms any database table into a reusable lookup function with intelligent caching. Use it directly or via string shorthand for zero-boilerplate data enrichment and validation. `TableLookup` uses `PreparedStatement` internally, so queries are portable across databases — write the lookup once, run it anywhere. Use the high-level `Lookup()` and `Validate()` factories directly in your Table column definitions to resolve codes, enrich records, or enforce referential integrity with almost no code. ```python import dbtk from dbtk.etl.transforms import TableLookup, Lookup, Validate db = dbtk.connect('states_db') cur = db.cursor() # TableLookup requires an active cursor state_lookup = TableLookup(cursor=cur, table='states', key_cols='state', return_cols='abbrev', cache=TableLookup.CACHE_PRELOAD) state_lookup({'state': 'Pennsylvania'}) # -> 'PA' # Multiple return_cols - return type matches cursor type (Record, dict, namedtuple, list) state_details = TableLookup(cursor=cur, table='states', key_cols='code', return_cols=['state', 'capital', 'region']) state_details({'code': 'CA'}) # -> Record('California', 'Sacramento', 'West') # String shorthand makes lookups clean customer_etl = dbtk.etl.Table('customers', { # Enrich with state data 'state_code': {'field': 'state_name', 'fn': 'lookup:states:name:code'}, 'state_capital': {'field': 'state_name', 'fn': 'lookup:states:name:capital'}, 'state_region': {'field': 'state_name', 'fn': 'lookup:states:name:region'}, # Validate against reference tables 'country': {'field': 'country_name', 'fn': 'validate:countries:name'}, # Warns if invalid 'industry': {'field': 'industry_code', 'fn': 'validate:industries:code'}, # Multiple keys and caching strategies 'product_name': {'field': ['vendor_id', 'sku'], 'fn': 'lookup:products:vendor_id,sku:name:preload'}, }, cursor=cur) # OLD WAY (still supported): customer_etl = dbtk.etl.Table('customers', { 'state_code': {'field': 'state_name', 'fn': Lookup('states', 'name', 'code')}, 'country': {'field': 'country_name', 'fn': Validate('countries', 'name')}, }, cursor=cur) ``` **Lookup/Validate string syntax:** ``` # Lookup syntax 'lookup:table:key_col:return_col[:cache]' Examples: 'lookup:states:name:code' # Basic lookup 'lookup:states:name:code:preload' # With preloading (small tables) 'lookup:states:name:code:lazy' # Lazy caching (default) 'lookup:states:name:code:no_cache' # No caching (large tables) 'lookup:products:id,sku:name' # Multiple key columns (comma-separated) # Validate syntax 'validate:table:key_col[:cache]' Examples: 'validate:countries:country_code' # Basic validation 'validate:regions:name:preload' # With preloading 'validate:users:email,dept:no_cache' # Multiple keys, no caching ``` **Caching strategies:** - **`preload`** (CACHE_PRELOAD): Load entire table into memory upfront. Best for small lookup tables (<10k rows) - **`lazy`** (CACHE_LAZY): Cache results as encountered. Best for medium tables or selective lookups - **`no_cache`** (CACHE_NONE): Query database every time. Best for large tables or frequently changing data ```python # Practical example: Customer data enrichment orders_etl = dbtk.etl.Table('orders', { 'order_id': {'field': 'id', 'primary_key': True}, # Small table - preload it 'state_name': {'field': 'state_code', 'fn': 'lookup:states:code:name:preload'}, # Medium table - lazy cache 'customer_name': {'field': 'customer_id', 'fn': 'lookup:customers:id:name:lazy'}, # Large table - no cache 'product_desc': {'field': 'product_id', 'fn': 'lookup:products:id:description:no_cache'}, # Validate referential integrity 'category': {'field': 'category_code', 'fn': 'validate:categories:code:preload'}, }, cursor=cursor) # Missing lookup keys raise clear errors immediately: # ValueError: TableLookup for 'states' missing required keys: ['code']. Provided keys: ['state'] ``` ### Complex Lookups with QueryLookup When a lookup requires a join, subquery, or any SQL the `'lookup:...'` shorthand can't express, use `QueryLookup` — a deferred wrapper around `PreparedStatement` that plugs directly into the `fn` pipeline. Like `TableLookup`, cursor binding is deferred until the `Table` is initialized. ```python from dbtk.etl import QueryLookup # Multi-table join — impossible with the string shorthand person_sql = """ SELECT p.person_id FROM people p LEFT JOIN employees e ON e.person_id = p.person_id WHERE p.email = :email OR e.tax_id = :tax_id """ etl_table = dbtk.etl.Table('payroll', { # field='*' passes the entire source row as bind vars; # PreparedStatement uses only the params its SQL declares. 'person_id': { 'field': '*', 'fn': QueryLookup(query=person_sql, return_col='person_id') }, ... }, cursor=cur) ``` **Parameters:** | Parameter | Description | |-----------|-------------| | `query` | Inline SQL string | | `filename` | Path to a SQL file | | `return_col` | Column to extract from the result row. Omit to return the first column. Use `'*'` to pass the whole row to the next pipeline step. | | `missing` | Value returned when the query finds no row (default `None`) | **Return value rules:** - `return_col` omitted — returns `row[0]` (first column, the common single-column case) - `return_col='col_name'` — returns `row['col_name']` by name - `return_col='*'` — returns the full row object (when a subsequent pipeline step will extract fields) **Using a SQL file:** ```python {'field': '*', 'fn': QueryLookup(filename='sql/region_lookup.sql', return_col='region_code')} ``` **Chaining with other transforms:** ```python # QueryLookup returns a code; 'str.upper' normalizes it 'fn': [QueryLookup(filename='sql/region_lookup.sql', return_col='region_code'), 'str.upper'] ``` > **Note:** `QueryLookup` does not cache results. For simple single-table lookups, `TableLookup` / > `Lookup()` / `'lookup:...'` offer lazy and preload caching. ## Value Resolution Process For each column, `set_values()` processes data in this order: ### 1. Value Sourcing - **field**: Extract from source record. - If `field` is a list, value will also be a list. - If `field` is `'*'`, the entire record is passed to the transformation function instead of extracting a specific field. ### 2. Null Conversion The value matches any entries in table.null_values it will be set to `None`. This is configurable but the default is: `('', 'NULL', '', '\\N')` ### 3. Default Fallback If value is `None` or `''`, apply **default** if defined. If `default` is callable, it is called with no arguments at this point — the return value is used. This lets you bind a column to a runtime value (CLI arg, job context, etc.) without knowing it at column-definition time: ```python conf_vars = {} # create before column defs, populate later columns = { 'user_id': {'default': lambda: conf_vars['user_id']}, 'name': {'field': 'name'}, } table = Table('my_table', columns, cursor=cursor) # Populate conf_vars after arg parsing, before processing records conf_vars['user_id'] = args.user_id ``` ### 4. Transformation Apply **fn** if defined. Functions can: - Transform existing values - Generate new values from scratch - If `fn` is a list, execute in order (pipeline). - See [Provided Data Transformations](#data-transformations) and [String Shorthand](#string-shorthand-for-transformations) sections above. ### 5. Database Expression If **db_expr** is defined: - **With `#`**: Pass value from steps 1-4 as parameter Example: `{'field': 'name', 'db_expr': 'UPPER(#)'}` - **Without `#`**: Standalone function (ignores steps 1-4) Example: `{'db_expr': 'CURRENT_TIMESTAMP'}` **Key features:** - Field mapping and renaming - Data type transformations - Database function integration (`db_expr` lets you leverage database capabilities) - Required field validation with clear error messages - Primary key management - Automatic UPDATE exclusions - Support for INSERT, UPDATE, DELETE, MERGE operations - Incomplete record tracking with `counts['incomplete']` - `fetch()` method to retrieve existing record by primary key after `set_values()` - `last_error` attribute — set to an `ErrorDetail` on `DatabaseError` (when `raise_error=False`), `None` on success; useful for feeding errors directly into `IdentityManager.add_error()` ## Handling Incomplete Records DBTK supports three patterns for handling incomplete records: ```python # Pattern 1: Tables expected to be complete - let execute() handle validation # Use this when you want to track all incomplete records for record in records: soldier_table.set_values(record) # execute() automatically validates keys and required columns have values soldier_table.execute('insert', raise_error=False) # Track incomplete, don't raise print(f"Inserted: {soldier_table.counts['insert']}") print(f"Skipped (incomplete data): {soldier_table.counts['incomplete']}") # Pattern 2: "Optional" tables, check requirements before executing DML # Use this when missing data is expected and you want to skip incomplete records. # If you call execute(raise_error=False) with many incomplete records you will flood # your logs. for record in records: recruit_table.set_values(record) if recruit_table.is_ready('insert'): # Fast cached check recruit_table.execute('insert') # Records with missing data are silently skipped. print(f"Inserted: {recruit_table.counts['insert']}") # Pattern 3: Strict mode - raise errors on incomplete data # Use this when all data must be complete for record in records: critical_table.set_values(record) critical_table.execute('insert', raise_error=True) # Raises ValueError if incomplete ``` ### Readiness Checking Methods Table provides three methods for checking if a record is ready for execution: **`is_ready(operation)` → bool** - Fast O(1) cached readiness check (RECOMMENDED) ```python # ✅ Use this in loops and hot paths table.set_values(record) if table.is_ready('insert'): table.execute('insert') # Conditional operations based on data completeness if table.is_ready('update'): table.execute('update') elif table.is_ready('insert'): table.execute('insert') ``` - Returns True if all required columns have values for the operation - Calculated automatically every time `set_values()` is called - **Use this for performance-critical code** If you directly modify table.values, you must recalculate requirements **`refresh_readiness()`** Recalculates requirements and caches results **`reqs_met(operation)`** Recalculates requirements no caching ```python table.set_values(record) # reqs_met calculates requirements but does not cache results if table.reqs_met('insert'): table.execute('insert') else: table.values['status'] = 'active' # Direct modification table.refresh_readiness() # Recalculates requirements and caches results if table.is_ready('insert'): table.execute('insert') ``` **`reqs_missing(operation)` → Set[str]** - Get missing column names ```python # Useful for error reporting and diagnostics table.set_values(record) if not table.is_ready('insert'): missing = table.reqs_missing('insert') logger.warning(f"Cannot insert record {record.id}: missing {missing}") # Output: Cannot insert record 123: missing {'email', 'status'} # Debug incomplete records for record in reader: table.set_values(record) missing = table.reqs_missing('insert') if missing: print(f"Row {record._row_num}: missing required fields: {missing}") ``` - Returns set of column names that are missing values - Empty set means record is ready - Perfect for error messages and validation reporting **Complete example — update-or-insert with readiness checking:** ```python import dbtk from dbtk.etl import transforms cursor = dbtk.connect('intel_prod').cursor() phoenix_king_army = dbtk.etl.Table('fire_nation_soldiers', { 'soldier_id': {'field': 'recruit_number', 'primary_key': True}, 'name': {'field': 'full_name', 'nullable': False}, 'home_village': {'field': 'birthplace', 'nullable': False}, 'firebending_skill': {'field': 'flame_control_level', 'fn': 'int'}, 'enlistment_date': {'field': 'joined_army', 'fn': 'date'}, 'combat_name': {'field': 'full_name', 'db_expr': 'generate_fire_nation_callsign(#)'}, 'last_drill': {'db_expr': 'CURRENT_TIMESTAMP'}, 'conscription_source': {'default': 'Sozin Recruitment Drive'}}, cursor=cursor) with dbtk.readers.get_reader('fire_nation_conscripts.csv') as reader: for recruit in reader: phoenix_king_army.set_values(recruit) if phoenix_king_army.is_ready('insert'): existing_soldier = phoenix_king_army.fetch() if existing_soldier: phoenix_king_army.execute('update') else: phoenix_king_army.execute('insert') else: missing = phoenix_king_army.reqs_missing('insert') print(f"Recruit {phoenix_king_army.values['name']} rejected: missing {missing}") print(f"Processed {phoenix_king_army.counts['insert'] + phoenix_king_army.counts['update']} soldiers") ``` ### Fetching Existing Records The `fetch()` method retrieves an existing record from the database using the primary key values from the current `table.values`. This is essential for update-or-insert logic. ```python # Standard update-or-insert pattern for record in reader: table.set_values(record) existing = table.fetch() # SELECT using primary key if existing: # Record exists - update it table.execute('update') else: # Record doesn't exist - insert it table.execute('insert') # Check specific fields before deciding for record in reader: table.set_values(record) existing = table.fetch() if existing and existing.status == 'archived': logger.info(f"Skipping archived record: {existing.id}") continue elif existing: table.execute('update') else: table.execute('insert') ``` **Key behaviors:** - Returns a Record object if found, `None` if not found - Uses primary key columns from `table.values` for the SELECT - Raises `ValueError` if primary key values are missing or None - Executes immediately (not batched) - Returns the Record with all columns from the database **Common pattern with IdentityManager:** ```python from dbtk.etl import IdentityManager, EntityStatus im = IdentityManager('source_id', 'target_id', resolver=stmt) for record in reader: entity = im.resolve(record) # Gets target_id if entity['_status'] != EntityStatus.RESOLVED: continue table.set_values(record) # Now has target_id populated if table.fetch(): # Check if exists in database table.execute('update') else: table.execute('insert') ``` ### Error Tracking with last_error The `last_error` attribute is automatically set when database operations fail with `raise_error=False`. It contains an `ErrorDetail` object for structured error handling. ```python from dbtk.utils import ErrorDetail # Pattern 1: Simple error tracking for record in reader: table.set_values(record) if table.execute('insert', raise_error=False): # Returns 1 on error print(f"Insert failed: {table.last_error}") # Output: Insert failed: ErrorDetail(message='duplicate key', field='email', ...) # Pattern 2: Feed errors to IdentityManager from dbtk.etl import IdentityManager, EntityStatus im = IdentityManager('student_id', 'person_id', resolver=stmt) for record in reader: entity = im.resolve(record) if entity['_status'] != EntityStatus.RESOLVED: continue table.set_values(record) if table.execute('insert', raise_error=False): # On error: mark entity as ERROR and attach error details entity['_status'] = EntityStatus.ERROR im.add_error(record['student_id'], table.last_error) # Pattern 3: Collect errors for reporting errors = [] for record in reader: table.set_values(record) if table.execute('insert', raise_error=False): errors.append({ 'row': record._row_num, 'id': record.id, 'error': table.last_error }) # Generate error report for err in errors: print(f"Row {err['row']} (ID: {err['id']}): {err['error'].message}") ``` **ErrorDetail attributes:** - `message` - Error description - `field` - Column/field that caused the error (if applicable) - `code` - Error code (database-specific) - `value` - The value that caused the error (if applicable) **Key behaviors:** - Set to `ErrorDetail` object on DatabaseError - Set to `None` on successful execution - Only populated when `raise_error=False` - Preserved until next `execute()` call - Perfect for integration with IdentityManager error tracking ## See Also - [Database Connections](03-database-connections.md) - SQL file execution and PreparedStatement - [ETL: DataSurge & BulkSurge](08-datasurge.md) - High-performance bulk loading - [ETL: Tools & Logging](09-etl-tools.md) - IdentityManager, ValidationCollector, logging - [Record Objects](04-record.md) - DBTK's universal data structure