# dbtk/etl/table.py
"""
Schema-aware table operations and SQL generation.
Provides the Table class which manages table metadata and generates
parameterized SQL statements for common operations.
"""
import logging
from typing import Union, Tuple, Optional, Set, Dict, Any
from ..cursors import Cursor
from ..database import ParamStyle
from ..utils import wrap_at_comma, process_sql_parameters, validate_identifier, quote_identifier, sanitize_identifier, RecordLike, ErrorDetail
from .transforms.core import fn_resolver
from .transforms.database import _DeferredTransform
logger = logging.getLogger(__name__)
DB_CONSTANTS = frozenset(['sysdate', 'systimestamp', 'user', 'current_timestamp', 'current_date', 'current_time'])
class Table:
    """
    Stateful table class for ETL operations with schema-aware SQL generation.

    The Table class provides a high-level interface for database operations by
    maintaining table metadata and current record state. It automatically
    generates parameterized SQL statements and handles field mapping, data
    transformations, and requirement validation.

    Key features:

    - Field mapping: Map source record fields to database columns
    - Transformations: Apply functions to clean/transform data before database operations
    - Database functions: Use database-side functions (e.g., CURRENT_TIMESTAMP)
    - Default values: Provide constant values for columns
    - Requirement validation: Track required/nullable columns and validate before operations
    - Automatic SQL generation: Generate INSERT, UPDATE, SELECT, DELETE, MERGE statements
    - Operation tracking: Count successful operations and incomplete records via self.counts

    Column Configuration
    --------------------
    Each column in the columns dict is configured with a dict containing the
    options below. The dicts passed in are copied, so the caller's column
    definitions are never modified and can safely be reused.

    **Shorthand:** An empty dict ``{}`` defaults the field name to the column name.
    For example: ``'email': {}`` is equivalent to ``'email': {'field': 'email'}``.

    * **field** (str or list of str or '*'):
        Source field name(s) from input records. If list, extracts multiple fields
        as a list value. If '*', passes the entire record to the transformation
        function instead of a single field value. If omitted, column is populated
        via 'default' or 'db_expr'.
    * **default** (any or callable, optional):
        Default value for this column. Applied when the source field is missing,
        empty, or None. If callable (e.g. a zero-argument lambda), it is called
        at ``set_values()`` time so the value is resolved on each record rather
        than at column-definition time. Useful for values that come from runtime
        context (CLI args, job IDs, etc.) that aren't available when columns are
        defined::

            conf_vars = {}  # populated later after table is initialized
            columns = {
                'user_id': {'default': lambda: conf_vars['user_id']},
            }

    * **fn** (Union[callable, List[callable], str], optional):
        Transformation function(s) to apply to the source value.
        callable: applied directly; list: functions applied in order (pipeline);
        str: magic shorthand (e.g. 'int', 'maxlen:255', 'lookup:table:col:val').
        See ``dbtk.etl.transforms.core.fn_resolver()`` for full shorthand reference.
    * **db_expr** (str, optional):
        Database-side function call (e.g., 'CURRENT_TIMESTAMP', 'UPPER(#)').
        Use '#' as placeholder for the bind parameter. If specified without '#',
        no bind parameter is created (useful for CURRENT_TIMESTAMP, etc.).
    * **primary_key** (bool, optional, default False):
        Marks column as primary key. Automatically implies ``required=True``.
        Alias: ``key``.
    * **key** (bool, optional, default False):
        Alias for ``primary_key``. Either may be used interchangeably.
    * **auto_key** (bool, optional, default False):
        Convenience flag: sets both ``primary_key=True`` and ``auto_gen=True``.
        Ideal for typical auto-increment primary keys.
    * **nullable** (bool, optional, default True):
        Controls whether the column must have a value for INSERT/UPDATE/MERGE.
        ``nullable=False`` is the **anti-alias** of ``required=True`` — both
        mark the column as required.
    * **required** (bool, optional, default False):
        Explicitly marks column as required. Anti-alias of ``nullable=False``.
    * **auto_gen** (bool, optional, default False):
        If True, the column is omitted from INSERT statements.
        The database is expected to provide the value (e.g. AUTO_INCREMENT,
        DEFAULT CURRENT_TIMESTAMP, GENERATED ALWAYS, etc.).
        The column remains fully included in all other operations.
    * **no_update** (bool, optional, default False):
        If True, excludes column from UPDATE and MERGE operations.
    * **bind_name** (str, auto-generated):
        Sanitized parameter name for SQL bind variables. Automatically created from
        column name. Can not be specified in the column definition.

    Example
    -------
    ::

        import dbtk
        from dbtk.etl import Table

        with dbtk.connect('fire_nation_db') as db:
            cursor = db.cursor()
            soldiers = Table('fire_nation_army', {
                # Primary key from source 'recruit_id' field
                'soldier_id': {
                    'field': 'recruit_id',
                    'primary_key': True
                },
                # Required field with transformation
                'enlistment_date': {
                    'field': 'join_date',
                    'fn': 'date',
                    'nullable': False
                },
                # Optional field with chained transformations
                'firebending_level': {
                    'field': 'flame_skill',
                    'fn': [str.strip, 'int']  # Clean then convert
                },
                # Constant value for all records
                'status': {
                    'default': 'active'
                },
                # Database-side function with parameter
                'combat_name': {
                    'field': 'full_name',
                    'db_expr': 'generate_callsign(#)'
                },
                # Database-side function, no parameter
                'created_at': {
                    'db_expr': 'CURRENT_TIMESTAMP'
                },
                # Multiple source fields as list
                'contact_methods': {
                    'field': ['email', 'phone', 'pigeon']
                },
                # Empty dict shorthand - field name matches column name
                'rank': {},  # Equivalent to {'field': 'rank'}
                'division': {},
                # Whole record access for multi-field decisions
                'vip_status': {
                    'field': '*',
                    'fn': lambda record: 'VIP' if record.get('years_service', 0) > 10 else 'Regular'
                }
            }, cursor=cursor)

            # Set values from source record
            soldiers.set_values({
                'recruit_id': 'FN001',
                'join_date': '2024-03-15',
                'flame_skill': ' 7 ',
                'full_name': 'Zuko'
            })

            # Execute operations
            soldiers.execute('insert')  # Automatically validates requirements
            print(soldiers.counts)  # {'insert': 1, 'update': 0, ...}

    Attributes
    ----------
    values : dict
        Current record values keyed by bind name (populated by :meth:`set_values`).
    counts : dict
        Operation counters with keys: ``insert``, ``update``, ``delete``,
        ``select``, ``merge``, ``records``, ``incomplete``.
    last_error : ErrorDetail or None
        The error detail from the most recent :meth:`execute` call.
        Initialized to ``None``. Set to ``None`` on success, or an
        :class:`dbtk.utils.ErrorDetail` on ``DatabaseError`` (when
        ``raise_error=False``). Cleared on every successful execution and on
        :meth:`cursor` reassignment.
    """

    OPERATIONS = ('insert', 'select', 'update', 'delete', 'merge')

    def __init__(
            self,
            name: str,
            columns: Dict[str, Dict[str, Any]],
            cursor: "Cursor",
            null_values: Tuple[str, ...] = ('', 'NULL', '<null>', r'\N'),
            is_temp: bool = False,
    ):
        """
        Initialize Table with schema configuration and database cursor.

        Creates a Table instance that manages the mapping between source data fields
        and database columns, along with all metadata needed for SQL generation and
        data validation. The INSERT statement is generated eagerly; all other
        statements are generated lazily on first use.

        Args:
            name: Database table name. Must be a valid SQL identifier.
            columns: Dictionary mapping database column names to their configuration.
                Each column is configured with a dict containing options like 'field',
                'fn', 'default', 'db_expr', 'primary_key', 'nullable', etc.
                See class docstring for complete column configuration options.
                Each column dict is shallow-copied; the caller's dicts are not modified.
            cursor: Database cursor instance. Provides connection to database and
                determines SQL parameter style (qmark, named, format, pyformat, numeric).
            null_values: Tuple of string values that should be treated as NULL.
                When set_values() encounters these strings, they are converted to None.
                Default: ('', 'NULL', '<null>', '\\N')
            is_temp: If True, allows underscore or hash prefix for temporary table names.
                Default: False

        Raises:
            ValueError: If table name or column names are invalid SQL identifiers,
                or if every column is ``auto_gen`` (no INSERT can be generated).

        Example
        -------
        ::

            cursor = db.cursor()
            table = Table('users', {
                'user_id': {'field': 'id', 'primary_key': True},
                'email': {'field': 'email_address', 'nullable': False},
                'created': {'db_expr': 'CURRENT_TIMESTAMP'}
            }, cursor=cursor)
        """
        validate_identifier(name, allow_temp=is_temp)
        self._name = name
        self._cursor = cursor
        self._paramstyle = cursor.connection.driver.paramstyle

        validated_columns: Dict[str, Dict[str, Any]] = {}
        req_cols = []
        key_cols = []
        gen_cols = []
        for col, raw_def in columns.items():
            # Work on a copy: the flags, bind_name and cursor-bound transforms
            # added below must not leak back into the caller's definition,
            # which may be reused to build another Table on a different cursor.
            col_def = dict(raw_def)
            # Empty dict shorthand: default field to column name
            if not col_def:
                col_def['field'] = col
            validate_identifier(col)
            if col_def.get('auto_key'):
                col_def['primary_key'] = True
                col_def['auto_gen'] = True
            if col_def.get('primary_key'):
                col_def['key'] = True
            bind_name = sanitize_identifier(col)
            col_def['bind_name'] = bind_name
            is_key = bool(col_def.get('key'))
            if is_key:
                key_cols.append(bind_name)
            # Keys are always required; nullable=False and required=True are equivalent.
            if is_key or bool(col_def.get('nullable', True)) is False or col_def.get('required'):
                req_cols.append(bind_name)
            if col_def.get('auto_gen'):
                gen_cols.append(bind_name)
            validated_columns[col] = col_def

        self._columns = validated_columns
        self.null_values = tuple(null_values)
        self._req_cols = tuple(req_cols)
        self._key_cols = tuple(key_cols)
        self._gen_cols = tuple(gen_cols)
        self._bind_name_map = {
            col_def['bind_name']: col for col, col_def in validated_columns.items()
        }
        self._sql_statements: Dict[str, Optional[str]] = {op: None for op in self.OPERATIONS}
        self._param_config: Dict[str, Tuple[str, ...]] = {op: () for op in self.OPERATIONS}
        self.counts: Dict[str, int] = {op: 0 for op in self.OPERATIONS}
        self.counts['records'] = 0
        self.counts['incomplete'] = 0
        # Documented attribute: must exist before the first execute() call.
        self.last_error: Optional[ErrorDetail] = None
        self._record_fields = set()
        self._update_excludes: Set[str] = {
            col_def['bind_name']
            for col_def in validated_columns.values()
            if col_def.get('no_update')
        }
        self._update_excludes_calculated = False
        self.values: Dict[str, Any] = {}
        self._ops_ready: int = 0

        self._generate_sql('insert')
        for col_name, col_def in self._columns.items():
            self._prepare_transforms(col_name, col_def)

    def _prepare_transforms(self, col_name: str, col_def: Dict[str, Any]) -> None:
        """Resolve string shorthands in ``col_def['fn']`` and bind deferred transforms to the cursor."""
        fn = col_def.get('fn')
        if fn is None:
            return

        # Step 1: convert string shorthands into transform callables.
        if isinstance(fn, str):
            try:
                fn = fn_resolver(fn)
            except ValueError as e:
                # NOTE(review): the unresolved string is left in col_def['fn'], so
                # set_values() will fail when it tries to call it. Kept as-is
                # because raising here would change construction-time behavior.
                logger.debug(f"Column {col_name}: {e}")
                return
        elif isinstance(fn, (list, tuple)):
            resolved = []
            for step in fn:
                if not isinstance(step, str):
                    resolved.append(step)
                    continue
                try:
                    resolved.append(fn_resolver(step))
                except ValueError as e:
                    # NOTE(review): an unresolvable step is dropped from the pipeline.
                    logger.debug(f"Column {col_name}: {e}")
            fn = resolved

        # Step 2: bind any deferred transforms (lookup/validate) to the cursor.
        if callable(getattr(fn, 'bind', None)):
            fn = fn.bind(self._cursor)
        elif isinstance(fn, (list, tuple)):
            bound = []
            for step in fn:
                if isinstance(step, str) and step.startswith(('lookup:', 'validate:')):
                    bound.append(_DeferredTransform.from_string(step).bind(self._cursor))
                elif callable(getattr(step, 'bind', None)):
                    bound.append(step.bind(self._cursor))
                else:
                    bound.append(step)
            fn = bound
        col_def['fn'] = fn

    @property
    def name(self) -> str:
        """Table name as passed to the constructor."""
        return self._name

    @property
    def columns(self) -> dict:
        """Validated column definitions keyed by column name (includes ``bind_name``)."""
        return self._columns

    @property
    def paramstyle(self) -> str:
        """Parameter style currently used when generating SQL."""
        return self._paramstyle

    @property
    def param_config(self) -> Dict[str, Tuple[str, ...]]:
        """Per-operation tuple of bind names, in the order the generated SQL expects them."""
        return self._param_config

    @property
    def cursor(self) -> "Cursor":
        """Cursor used to execute statements."""
        return self._cursor

    @cursor.setter
    def cursor(self, value: "Cursor"):
        """
        Replace the cursor.

        If the new cursor's driver uses a different paramstyle, all cached SQL and
        record state are reset; otherwise only the counters and ``last_error`` are.

        NOTE(review): transforms bound to the previous cursor in ``__init__`` are
        not re-bound here.
        """
        old_paramstyle = self._paramstyle
        self._cursor = value
        self._paramstyle = value.connection.driver.paramstyle
        if old_paramstyle != self._paramstyle:
            self._reset()
            logger.info(
                f"Table {self._name}: paramstyle changed from {old_paramstyle} "
                f"to {self._paramstyle}, cache reset"
            )
        else:
            self._reset_counts()
            logger.info(f"Table {self._name}: cursor changed, counts reset")

    @property
    def req_cols(self) -> Tuple[str, ...]:
        """Bind names of required columns (keys, ``required=True`` or ``nullable=False``)."""
        return self._req_cols

    @property
    def key_cols(self) -> Tuple[str, ...]:
        """Bind names of key columns."""
        return self._key_cols

    @property
    def row_count(self) -> int:
        """Number of records passed to :meth:`set_values` since the last reset."""
        return self.counts['records']

    def _required_bind_names(self, operation: str) -> list:
        """Return the bind names that must be non-empty for ``operation``."""
        if operation == 'insert':
            # auto_gen columns are supplied by the database on insert
            return [col for col in self._req_cols if col not in self._gen_cols]
        if operation in ('update', 'merge'):
            return list(set(self._req_cols) | set(self._key_cols))
        if operation in ('select', 'delete'):
            return list(self._key_cols)
        raise ValueError(f"Invalid operation '{operation}'")

    def reqs_met(self, operation: str) -> bool:
        """
        Check whether every column required for ``operation`` has a value.

        A value of ``None`` or ``''`` counts as missing.

        Raises:
            ValueError: If ``operation`` is not a valid operation name.
        """
        required = self._required_bind_names(operation)
        return all(self.values.get(col) not in (None, '') for col in required)

    def reqs_missing(self, operation: str) -> Set[str]:
        """
        Return the bind names required for ``operation`` whose current value is ``None`` or ``''``.

        Raises:
            ValueError: If ``operation`` is not a valid operation name.
        """
        required = self._required_bind_names(operation)
        return {col for col in required if self.values.get(col) in (None, '')}

    def is_ready(self, operation: str) -> bool:
        """
        Fast check if the current record is ready for the given operation.

        Reads the bitmask computed by :meth:`refresh_readiness`; it does not
        re-inspect ``values``.

        Raises:
            ValueError: If ``operation`` is not a valid operation name.
        """
        if operation not in self.OPERATIONS:
            raise ValueError(f"Invalid operation '{operation}'")
        bit = 1 << self.OPERATIONS.index(operation)
        return bool(self._ops_ready & bit)

    def refresh_readiness(self) -> None:
        """Re-evaluate which operations can be executed based on current values."""
        # Update requirements are a superset of every other operation's
        # requirements, so satisfying them makes all operations ready.
        if self.reqs_met('update'):
            self._ops_ready = (1 << len(self.OPERATIONS)) - 1
            return

        # Merge shares update's requirements, so it cannot be ready past this point.
        ops = self.OPERATIONS
        ready = 0
        if self.reqs_met('insert'):
            ready |= 1 << ops.index('insert')
        if self.reqs_met('select'):
            ready |= (1 << ops.index('select')) | (1 << ops.index('delete'))
        self._ops_ready = ready

    def _wrap_db_expr(self, col_name: str, db_expr: Optional[str] = None) -> str:
        """
        Build the VALUES/SET/WHERE fragment for a column.

        Returns a plain named placeholder when there is no ``db_expr``; otherwise:
        ``#`` is replaced by the placeholder, known constants and expressions that
        already contain parentheses are used verbatim (no bind parameter), and a
        bare function name is wrapped around the placeholder.
        """
        placeholder = f':{col_name}'
        if not db_expr:
            return placeholder
        expr = db_expr.strip()
        if not expr:
            return placeholder
        if '#' in expr:
            return expr.replace('#', placeholder)
        if expr.lower() in DB_CONSTANTS:
            return expr
        if '(' in expr and ')' in expr:
            return expr
        return f'{expr}({placeholder})'

    def _finalize_sql(self, operation: str, sql: str) -> None:
        """Convert named-parameter SQL to the active paramstyle and cache SQL and bind-name order."""
        self._sql_statements[operation], self._param_config[operation] = process_sql_parameters(sql, self._paramstyle)

    def _create_select(self) -> str:
        """Generate SELECT statement with named parameters."""
        if not self._key_cols:
            raise ValueError(f"Cannot create SELECT for table {self._name}: no key columns defined")
        table_name = quote_identifier(self._name)
        quoted_cols = []
        conditions = []
        for col, col_def in self._columns.items():
            ident = quote_identifier(col)
            quoted_cols.append(ident)
            bind_name = col_def['bind_name']
            if bind_name in self._key_cols:
                placeholder = self._wrap_db_expr(bind_name, col_def.get('db_expr'))
                conditions.append(f"{ident} = {placeholder}")
        cols_str = ', '.join(quoted_cols)
        if len(quoted_cols) > 4:
            cols_str = wrap_at_comma(cols_str)
        sql = f"SELECT {cols_str} \nFROM {table_name}"
        if conditions:
            conditions_str = '\n AND '.join(conditions)
            sql += f"\nWHERE {conditions_str}"
        logger.debug(f"Generated select SQL for {self._name}:\n{sql}")
        return sql

    def _create_insert(self) -> str:
        """Generate INSERT statement with named parameters, omitting ``auto_gen`` columns."""
        table_name = quote_identifier(self._name)
        insert_cols = []
        placeholders = []
        for col, col_def in self._columns.items():
            bind_name = col_def['bind_name']
            if bind_name in self._gen_cols:
                continue
            insert_cols.append(col)
            placeholders.append(self._wrap_db_expr(bind_name, col_def.get('db_expr')))
        if not insert_cols:
            raise ValueError(f"Table {self._name} has no columns to insert (all auto_gen?)")
        cols_str = ', '.join(quote_identifier(col) for col in insert_cols)
        placeholders_str = ', '.join(placeholders)
        if len(insert_cols) > 4:
            cols_str = wrap_at_comma(cols_str)
            placeholders_str = wrap_at_comma(placeholders_str)
        sql = f"INSERT INTO {table_name} ({cols_str})\nVALUES\n({placeholders_str})"
        logger.debug(f"Generated insert SQL for {self._name}:\n{sql}")
        return sql

    def _create_update(self) -> str:
        """Generate UPDATE statement with named parameters."""
        if not self._key_cols:
            raise ValueError(f"Cannot create UPDATE for table {self._name}: no key columns defined")
        table_name = quote_identifier(self._name)
        update_cols = []
        conditions = []
        for col, col_def in self._columns.items():
            ident = quote_identifier(col)
            bind_name = col_def['bind_name']
            placeholder = self._wrap_db_expr(bind_name, col_def.get('db_expr'))
            if bind_name in self._key_cols:
                conditions.append(f'{ident} = {placeholder}')
            elif bind_name not in self._update_excludes:
                update_cols.append(f'{ident} = {placeholder}')
        set_clause_str = ', '.join(update_cols)
        if len(update_cols) > 4:
            set_clause_str = wrap_at_comma(set_clause_str)
        conditions_str = '\n AND '.join(conditions)
        sql = f"UPDATE {table_name} SET {set_clause_str} \nWHERE {conditions_str}"
        logger.debug(f"Generated update SQL for {self._name}:\n{sql}")
        return sql

    def _create_delete(self) -> str:
        """Generate DELETE statement with named parameters."""
        if not self._key_cols:
            raise ValueError(f"Cannot create DELETE for table {self._name}: no key columns defined")
        table_name = quote_identifier(self._name)
        conditions = []
        for col, col_def in self._columns.items():
            bind_name = col_def['bind_name']
            if bind_name not in self._key_cols:
                continue
            quoted_col = quote_identifier(col)
            placeholder = self._wrap_db_expr(bind_name, col_def.get('db_expr'))
            conditions.append(f"{quoted_col} = {placeholder}")
        conditions_str = '\n AND '.join(conditions)
        sql = f"DELETE FROM {table_name} \nWHERE {conditions_str}"
        logger.debug(f"Generated delete SQL for {self._name}:\n{sql}")
        return sql

    def _should_use_upsert(self) -> bool:
        """Determine whether to use upsert syntax vs MERGE statement."""
        return self._cursor.connection.dialect.use_upsert

    def _create_upsert(self) -> str:
        """Create INSERT ... ON DUPLICATE KEY/CONFLICT statement with named parameters."""
        table_name = quote_identifier(self._name)
        cols = []
        placeholders = []
        key_cols = []
        update_cols = []
        for col, col_def in self._columns.items():
            ident = quote_identifier(col)
            bind_name = col_def['bind_name']
            db_expr = col_def.get('db_expr')
            cols.append(ident)
            placeholders.append(self._wrap_db_expr(bind_name, db_expr))
            if bind_name in self._key_cols:
                key_cols.append(ident)
            elif bind_name not in self._update_excludes:
                update_cols.append((col, ident, bind_name, db_expr))
        cols_str = ', '.join(cols)
        placeholders_str = ', '.join(placeholders)
        if len(cols) > 4:
            cols_str = wrap_at_comma(cols_str)
            placeholders_str = wrap_at_comma(placeholders_str)
        sql = self._cursor.connection.dialect.upsert_sql(
            table_name, cols_str, placeholders_str, key_cols, update_cols
        )
        logger.debug(f"Generated upsert SQL for {self._name}:\n{sql}")
        return sql

    def _create_merge_statement(self) -> str:
        """Create traditional MERGE statement with named parameters."""
        table_name = quote_identifier(self._name)
        all_cols = []
        key_conditions = []
        update_cols = []
        for col, col_def in self._columns.items():
            ident = quote_identifier(col)
            bind_name = col_def['bind_name']
            placeholder = self._wrap_db_expr(bind_name, col_def.get('db_expr'))
            all_cols.append((col, ident, placeholder))
            if bind_name in self._key_cols:
                key_conditions.append(f"t.{ident} = s.{ident}")
            elif bind_name not in self._update_excludes:
                update_cols.append((col, ident))
        sql = self._cursor.connection.dialect.merge_sql(
            table_name, all_cols, key_conditions, update_cols
        )
        logger.debug(f"Generated merge SQL for {self._name}:\n{sql}")
        return sql

    def _create_merge(self) -> str:
        """Generate MERGE or upsert statement for the cursor's database type."""
        if not self._key_cols:
            raise ValueError(f"Cannot create MERGE for table {self._name}: no key columns defined")
        if self._should_use_upsert():
            return self._create_upsert()
        return self._create_merge_statement()

    def _generate_sql(self, operation: str) -> None:
        """
        Generate and cache the SQL statement (and bind-name order) for ``operation``.

        Raises:
            ValueError: If ``operation`` is invalid, or the operation needs key
                columns and none are defined.
        """
        if operation not in self.OPERATIONS:
            raise ValueError(f"Invalid operation '{operation}'. Must be one of {self.OPERATIONS}")
        if operation in ('select', 'update', 'delete', 'merge') and not self._key_cols:
            raise ValueError(f"Cannot generate {operation} SQL for table {self._name}: no key columns defined")
        builders = {
            'insert': self._create_insert,
            'select': self._create_select,
            'update': self._create_update,
            'delete': self._create_delete,
            'merge': self._create_merge,
        }
        self._finalize_sql(operation, builders[operation]())

    def get_sql(self, operation: str) -> str:
        """Returns SQL for an operation. Automatically generates SQL if it hasn't been created yet (lazy)."""
        if operation not in self.OPERATIONS:
            raise ValueError(f"Invalid operation '{operation}'. Must be one of {self.OPERATIONS}")
        if self._sql_statements[operation] is None:
            self._generate_sql(operation)
        return self._sql_statements[operation]

    def get_bind_params(self, operation: str, mode: str = None) -> Union[dict, tuple]:
        """
        Return the current record's bind parameters for ``operation``.

        Args:
            operation: One of ``OPERATIONS``.
            mode: ``'positional'`` for a tuple ordered like the statement's
                placeholders (missing values become ``None``), or ``'named'`` for a
                dict containing only the bind names present in ``values``. Any other
                value (including ``None``) selects the mode from the paramstyle.

        Returns:
            tuple or dict, depending on the effective mode.

        Raises:
            ValueError: If ``operation`` is invalid or requires key columns that
                are not defined.
        """
        if operation not in self.OPERATIONS:
            raise ValueError(f"Invalid operation '{operation}'. Must be one of {self.OPERATIONS}")
        if operation in ('update', 'delete', 'merge') and not self._key_cols:
            raise ValueError(f"Cannot get {operation} params: no key columns defined")

        # Make sure the SQL (and with it the bind-name order) is current; this is
        # a no-op when the statement is already cached.
        self.get_sql(operation)
        param_names = self._param_config[operation]

        if mode not in ('positional', 'named'):
            mode = 'positional' if self._paramstyle in ParamStyle.positional_styles() else 'named'

        if mode == 'positional':
            return tuple(self.values.get(param) for param in param_names)
        return {param: self.values[param] for param in param_names if param in self.values}

    def set_values(self, record: "RecordLike"):
        """
        Load a source record: extract, null-normalize, default and transform each column.

        The result is stored in ``self.values`` keyed by bind name, ``counts['records']``
        is incremented, and operation readiness is refreshed.

        On the first record only, missing source fields are reported: a warning is
        logged for optional columns, and ``ValueError`` is raised when a single
        (non-list) field feeding a required column is absent.

        Args:
            record: Mapping-like source record (must support ``keys()``, ``get()``
                and ``in``).

        Raises:
            ValueError: On the first record, if a required column's source field
                is not present in the record.
        """
        self.counts['records'] += 1
        warn_missing = self.counts['records'] == 1
        if not self._record_fields:
            # Cache fields so we can calculate missing fields to exclude from updates/merges.
            # Include both original and normalized field names for dual access support.
            try:
                self._record_fields = set(record.keys()) | set(record.keys(normalized=True))
            except TypeError:
                # Not a Record object or doesn't support normalized parameter
                self._record_fields = set(record.keys())

        values = {}
        for col_def in self._columns.values():
            field = col_def.get('field')
            whole_record = field == '*'
            val = None
            if whole_record:
                # Pass whole record to function - no field extraction
                val = record
            elif isinstance(field, list):
                val = []
                for f in field:
                    if f in record:
                        val.append(record.get(f))
                    elif warn_missing:
                        logger.warning(f'Table {self.name}: field "{f}" not found in record')
            elif field:
                if warn_missing and field not in record:
                    if col_def.get('bind_name') in self._req_cols:
                        raise ValueError(f'Table {self.name}: field "{field}" is required but was not found in record')
                    logger.warning(f'Table {self.name}: field "{field}" not in record')
                val = record.get(field)

            # Only apply null_values conversion if val is not the whole record
            if not whole_record and isinstance(val, str) and val in self.null_values:
                val = None

            if val in ('', None) and 'default' in col_def:
                default = col_def['default']
                val = default() if callable(default) else default

            # An explicit 'fn': None means "no transform", matching __init__.
            fn = col_def.get('fn')
            if fn is not None:
                if isinstance(fn, (list, tuple)):
                    for func in fn:
                        val = func(val)
                else:
                    val = fn(val)

            # Store values using bind_name as key (not column name)
            values[col_def['bind_name']] = val

        self.values = values
        # Automatically update readiness after normal record processing
        self.refresh_readiness()

    def _reset_counts(self):
        """Reset all operation counters and clear last_error."""
        self.counts = {op: 0 for op in self.OPERATIONS}
        self.counts['records'] = 0
        self.counts['incomplete'] = 0
        self.last_error: Optional[ErrorDetail] = None

    def _reset(self):
        """Drop cached SQL, bind-name order, update excludes, record state and counters."""
        self._sql_statements = {op: None for op in self.OPERATIONS}
        self._param_config = {op: () for op in self.OPERATIONS}
        self._update_excludes = {
            col_def['bind_name']
            for col_def in self._columns.values()
            if col_def.get('no_update')
        }
        self._update_excludes_calculated = False
        self._record_fields = set()
        self._reset_counts()
        self.values = {}
        self._ops_ready = 0

    def fetch(self) -> Optional[Dict[str, Any]]:
        """
        Execute the SELECT for the current key values and return the first row.

        Returns:
            The result of ``cursor.fetchone()``, or ``None`` when the select was
            skipped or failed (see :meth:`execute`).
        """
        if self.execute('select'):
            return None
        return self._cursor.fetchone()

    def get_column_definitions(self, all_cols: bool = False) -> list:
        """
        Introspect database table columns to get type information.

        Executes a ``SELECT * ... WHERE 1=0`` query against the database table and
        returns type information from the cursor description. Validates that all
        Table columns exist in the database (matched case-insensitively).

        Parameters:
            all_cols (bool): If True, return all columns from the database table.
                If False, return only columns on the Table object.

        Returns:
            List of tuples: (column_name, type_obj, internal_size, precision, scale, sql_type_def)
            where sql_type_def is the SQL type string like 'VARCHAR(100)' or 'NUMBER(10,2)'

        Raises:
            ValueError: If a column defined in this Table doesn't exist in the database

        Example:
            >>> table = Table('users', {'id': {}, 'email': {}}, cursor=cursor)
            >>> col_defs = table.get_column_definitions()
            >>> for name, type_obj, size, prec, scale, sql_type in col_defs:
            ...     print(f"{name}: {sql_type}")
        """
        # Query zero rows; only the cursor description is needed. The table name is
        # quoted the same way as in the generated DML statements.
        table_name = quote_identifier(self._name)
        self._cursor.execute(f"SELECT * FROM {table_name} WHERE 1=0")

        # Build case-insensitive map of column name to type info
        db_columns = {
            desc[0].upper(): (desc[0], desc[1], desc[3], desc[4], desc[5])
            for desc in self._cursor.description
        }
        dialect = self._cursor.connection.dialect

        col_list = db_columns.keys() if all_cols else self._columns.keys()
        result = []
        for col_name in col_list:
            col_name_upper = col_name.upper()
            if col_name_upper not in db_columns:
                raise ValueError(
                    f"Column '{col_name}' defined in Table but not found in database table '{self._name}'"
                )
            db_col_name, type_obj, internal_size, precision, scale = db_columns[col_name_upper]
            sql_type_def = dialect.sql_type(type_obj, internal_size, precision, scale)
            # Return with database's actual column name for accurate SQL generation
            result.append((db_col_name, type_obj, internal_size, precision, scale, sql_type_def))
        return result

    def bind_name_column(self, bind_name):
        """Return the column name for ``bind_name``, or ``None`` if it is unknown."""
        return self._bind_name_map.get(bind_name)

    def calc_update_excludes(self, record_fields: Optional[Set[str]] = None):
        """
        Recompute which columns are left out of UPDATE/MERGE statements.

        A column is excluded when it is flagged ``no_update`` or when any of its
        source fields is absent from ``record_fields``. If the resulting set differs
        from the current one, the cached update/merge SQL and bind-name order are
        discarded so they are regenerated on next use.

        Args:
            record_fields: Field names available in the source. Defaults to the
                fields cached from the first record seen by :meth:`set_values`.
                When empty, nothing is calculated.

        Raises:
            ValueError: If a key column's source field is missing from ``record_fields``.
        """
        if record_fields is None:
            record_fields = self._record_fields
        if not record_fields:
            logger.debug(f"No record_fields available for {self.name}, skipping missing-field exclude calculation")
            return

        excludes = []
        for col, col_def in self._columns.items():
            bind_name = col_def['bind_name']
            if col_def.get('no_update'):
                excludes.append(bind_name)
                continue
            field = col_def.get('field')
            if not field or field == '*':
                # default/db_expr-only and whole-record columns have no single source field
                continue
            if isinstance(field, list):
                missing_fields = [f for f in field if f not in record_fields]
                if not missing_fields:
                    continue
                if bind_name in self._key_cols:
                    raise ValueError(
                        f"A key column {col} is sourced from {field}, "
                        f"but {missing_fields} are missing from source."
                    )
                excludes.append(bind_name)
            elif field not in record_fields:
                if bind_name in self._key_cols:
                    raise ValueError(
                        f"A key column {col} is sourced from {field}, "
                        f"but is missing from source."
                    )
                excludes.append(bind_name)

        if excludes:
            logger.debug(
                f"Columns excluded from update/merge because source field is missing or no_update attribute was set:\n{excludes}"
            )
        previous_excludes = self._update_excludes
        self._update_excludes = set(excludes)
        self._update_excludes_calculated = True
        if previous_excludes != self._update_excludes:
            # Invalidate SQL and its bind-name order together so they never disagree.
            for op in ('update', 'merge'):
                self._sql_statements[op] = None
                self._param_config[op] = ()

    def _exec_sql(self, sql: str, params: Union[dict, tuple],
                  operation: str, raise_error: bool) -> int:
        """
        Execute a single SQL statement and update counts and last_error.

        On success: increments ``counts[operation]``, sets ``last_error = None``,
        returns 0.
        On ``DatabaseError``: logs the error, stores an :class:`dbtk.utils.ErrorDetail`
        in ``last_error``, re-raises if ``raise_error=True``, otherwise returns 1.

        Parameters
        ----------
        sql : str
            The SQL statement to execute.
        params : dict or tuple
            Bind parameters for the statement.
        operation : str
            One of ``OPERATIONS`` — used to update the correct counter.
        raise_error : bool
            If True, re-raise the ``DatabaseError`` after logging.

        Returns
        -------
        int
            0 on success, 1 on error (when raise_error=False).
        """
        try:
            self._cursor.execute(sql, params)
            self.counts[operation] += 1
            self.last_error = None
            return 0
        except self._cursor.connection.driver.DatabaseError as e:
            error_msg = f"SQL failed: {sql}\nParams: {params}\nError: {str(e)}"
            logger.error(error_msg)
            # 'pgcode' is read when the exception provides it; otherwise code is None.
            self.last_error = ErrorDetail(message=str(e), code=getattr(e, 'pgcode', None))
            if raise_error:
                raise
            return 1

    def execute(self, operation: str, raise_error: bool = False) -> int:
        """
        Execute the specified database operation using current record values.

        Parameters
        ----------
        operation : str
            One of ``'insert'``, ``'select'``, ``'update'``, ``'delete'``, ``'merge'``.
        raise_error : bool, default False
            If True, re-raise ``DatabaseError`` instead of swallowing it, and raise
            ``ValueError`` when the record does not meet the operation's requirements.
            If False, the error is captured in ``last_error`` and 1 is returned.

        Returns
        -------
        int
            0 on success; 1 when requirements are unmet (incomplete record)
            or when a ``DatabaseError`` occurs and ``raise_error=False``.

        Side Effects
        ------------
        * ``counts[operation]`` incremented on success.
        * ``counts['incomplete']`` incremented when requirements are unmet.
        * ``last_error`` set to ``None`` on success or an
          :class:`dbtk.utils.ErrorDetail` on database error.

        Raises
        ------
        ValueError
            If ``operation`` is not valid, or required key columns are missing.
        DatabaseError
            If the underlying execute fails and ``raise_error=True``.
        """
        if operation not in self.OPERATIONS:
            raise ValueError(f"Invalid operation '{operation}'")
        if operation in ('select', 'update', 'delete', 'merge') and not self._key_cols:
            msg = f"Cannot {operation} table {self._name}: no key columns defined"
            logger.error(msg)
            raise ValueError(msg)

        # Work out which columns to leave out of UPDATE/MERGE once the source
        # fields are known (i.e. after the first set_values call).
        if operation in ('update', 'merge') and self._record_fields and not self._update_excludes_calculated:
            self.calc_update_excludes(self._record_fields)

        if not self.is_ready(operation):
            missing = self.reqs_missing(operation)
            msg = f"{operation} requirements not met: columns {missing} are null"
            if raise_error:
                logger.error(f"Cannot {operation} table {self._name}: {msg}")
                raise ValueError(msg)
            logger.warning(f"Skipping {operation} on table {self._name}: {msg}")
            self.counts['incomplete'] += 1
            return 1

        sql = self.get_sql(operation)
        params = self.get_bind_params(operation)
        return self._exec_sql(sql, params, operation, raise_error)

    def force_positional(self):
        """
        If cursor paramstyle is a named style, switch to the corresponding positional style.

        Cached SQL and parameter maps are discarded (and regenerated lazily on next
        use); counters, current values and cached record fields are reset as well.
        Does nothing when the paramstyle is already positional.
        """
        if self._paramstyle in ParamStyle.positional_styles():
            return
        self._paramstyle = ParamStyle.get_positional_style(self._paramstyle)
        # discard SQL and parameter maps so they are rebuilt for the new style
        self._reset()

    def db_expr_cols(self) -> list:
        """
        Return list of all columns that use database expressions.

        Having db_expr on columns will make the table incompatible with some features such as all BulkSurge operations
        and DataSurge operations in pass_through mode.
        """
        return [col for col, info in self.columns.items() if info.get('db_expr') is not None]

    def __repr__(self) -> str:
        return f"Table('{self.name}', {len(self.columns)} columns, {self.paramstyle})"