# dbtk/etl/transforms/core.py
"""
Core transformation functions for basic data manipulation.
Includes text processing, type conversion, and basic utility functions.
"""
import re
from .datetime import parse_datetime, parse_date, parse_time, parse_timestamp
from typing import Any, List, Optional, Union, Callable
# Number regex patterns
intsOnlyPattern = re.compile(r'^[\-\+]?\d+$')
numbersOnlyPattern = re.compile(r'^[\-\+]?\d+(\.\d+)?$')
[docs]
def capitalize(val: Any) -> Any:
"""
Title case a string, but only if it's already in all uppercase or all lowercase.
Args:
val: String value to capitalize
Returns:
Title-cased string if applicable, otherwise original value
Example:
capitalize("HELLO WORLD") # "Hello World"
capitalize("hello world") # "Hello World"
capitalize("Hello World") # "Hello World" (unchanged)
"""
if val and hasattr(val, 'title'):
if val in (val.upper(), val.lower()):
return val.title()
return val
[docs]
def coalesce(vals: Union[List[Any], tuple]) -> Any:
"""
Returns the first non-empty and non-None value from a list or tuple.
Args:
vals: List or tuple of values to check
Returns:
First non-empty, non-None value, or the original input if not a list/tuple
Example:
coalesce([None, '', 'first', 'second']) # "first"
coalesce([0, 1, 2]) # 0
coalesce([None, None]) # None
"""
if isinstance(vals, (list, tuple)):
for v in vals:
if v not in ('', None):
return v
[docs]
def get_bool(val: Any) -> Optional[bool]:
"""
Parse a value as a boolean.
Args:
val: Value to parse
Returns:
True for truthy values, False for falsy values, None for None
Truthy: True, 'T', 'TRUE', 'YES', 'Y', '1', 1, 1.0, non-zero numbers
Falsy: False, 'F', 'FALSE', 'NO', 'N', '0', 0, 0.0, empty string
None: None
Example:
get_bool(True) # True
get_bool('yes') # True
get_bool('Y') # True
get_bool(1) # True
get_bool(1.0) # True
get_bool(False) # False
get_bool('no') # False
get_bool(0) # False
get_bool(None) # None
"""
if val is None:
return None
# Handle boolean type directly
if isinstance(val, bool):
return val
# Handle numeric types
if isinstance(val, (int, float)):
return bool(val)
# Handle string values
val_str = str(val).strip().upper()
if val_str in ('T', 'TRUE', 'YES', 'Y', '1'):
return True
elif val_str in ('F', 'FALSE', 'NO', 'N', '0', ''):
return False
# Default: treat non-empty strings as truthy
return bool(val_str)
[docs]
def indicator(val: Any, true_val: str = 'Y', false_val: Any = None) -> Any:
"""
Convert a value to a boolean indicator.
Uses get_bool() for consistent boolean interpretation.
Args:
val: Value to test for truthiness
true_val: Value to return if val is truthy (default: 'Y')
false_val: Value to return if val is falsy (default: None)
Returns:
true_val if val is truthy, false_val otherwise, or None if val is None
Example:
indicator(True) # "Y"
indicator('yes') # "Y"
indicator(1) # "Y"
indicator(False) # None
indicator(0, 'T', 'F') # "F"
indicator(None) # None
"""
parsed = get_bool(val)
if parsed is None:
return None
return true_val if parsed else false_val
[docs]
def get_digits(val: Any) -> str:
"""
Extract only numeric digits from a value, preserving leading zeros.
Removes all non-digit characters except leading +/- signs.
Returns a string to preserve leading zeros.
Args:
val: Value to extract digits from
Returns:
String containing only digits (and possibly leading +/-), or empty string if no value
Example:
get_digits("(800) 123-4567") # "8001234567"
get_digits("012-34-5678") # "012345678"
get_digits("$-42.35") # "4235"
get_digits("+1-555-123-4567") # "15551234567"
"""
if not val:
return None
val_str = str(val)
# Extract only digits
digits = re.sub(r'\D', '', val_str)
return digits if digits else None
[docs]
def to_number(val: Any) -> Optional[float]:
"""
Convert a value to a number by extracting digits and converting to float.
Strips out all non-numeric characters (except leading +/- and decimal point).
Args:
val: Value to convert to number
Returns:
Float value or None if no valid number can be extracted
Example:
to_number("$42.35") # 42.35
to_number("$-42.35") # -42.35
to_number("1,234.56") # 1234.56
to_number("N/A") # None
"""
if not val:
return None
val_str = str(val).strip()
if not val_str:
return None
# Remove everything except digits, +, -, and .
cleaned = re.sub(r'[^\d+\-.]', '', val_str)
# Extract first number (same pattern as numbersOnlyPattern but without anchors)
match = re.search(r'[\-\+]?\d+(\.\d+)?', cleaned)
if match:
try:
return float(match.group())
except ValueError:
return None
return None
[docs]
def get_int(val: Any) -> Optional[int]:
"""
Convert value to integer.
Handles string numbers with decimals and currency symbols by using to_number().
Args:
val: Value to convert
Returns:
Integer value or None if val is falsy or cannot be converted
Example:
get_int("123") # 123
get_int("123.45") # 123
get_int("$123.45") # 123
get_int(123.45) # 123
get_int(None) # None
"""
num = to_number(val)
if num is not None:
return int(num)
return None
[docs]
def get_float(val: Any) -> Optional[float]:
"""
Convert value to float.
Handles currency symbols and formatting by using to_number().
Args:
val: Value to convert
Returns:
Float value or None if val is falsy or cannot be converted
Example:
get_float("123.45") # 123.45
get_float("$123.45") # 123.45
get_float("123") # 123.0
get_float(None) # None
"""
return to_number(val)
def maxlen(val:Any, limit:int) -> str:
limit = int(limit)
if limit < 1:
raise ValueError('Maxlen must be at least 1.')
if val:
return str(val)[:limit]
def nth(val:Any, n:int) -> Any:
n = int(n)
if val and -len(val) <= n < len(val):
return val[n]
[docs]
def normalize_whitespace(val: Any) -> str:
"""
Normalize whitespace in a string.
- Strips leading and trailing whitespace
- Collapses multiple spaces/tabs/newlines into single spaces
Args:
val: Value to normalize
Returns:
String with normalized whitespace, or empty string if no value
Example:
normalize_whitespace(" hello world ") # "hello world"
normalize_whitespace("hello\\n\\nworld") # "hello world"
normalize_whitespace("hello\\t\\tworld") # "hello world"
"""
if not val:
return ''
val_str = str(val)
# Replace all whitespace sequences with single space
normalized = re.sub(r'\s+', ' ', val_str)
return normalized.strip()
[docs]
def split_and_get(val: str, index: int, delimiter: str = ',') -> Optional[str]:
"""
Get an item from a delimited string list.
Args:
val: Delimited string
index: Zero-based index of item to retrieve
delimiter: Delimiter character (default: ',')
Returns:
Item at specified index (stripped), or None if index out of range
Example:
split_and_get("a,b,c", 1) # "b"
split_and_get("a|b|c", 0, "|") # "a"
split_and_get("a,b", 5) # None
"""
if val:
index = int(index)
values = val.split(delimiter)
if -len(values) <= index < len(values):
return values[index].strip()
return None
def _parse_param(p):
_sentinals = {'True': True, 'False': False, 'None': None}
if p in _sentinals:
return _sentinals[p]
if p.startswith('+') and p[1:].isdigit():
return int(p[1:])
if p.startswith('-') and len(p) > 1 and p[1:].isdigit():
return int(p)
return p
[docs]
def fn_resolver(shorthand: str) -> Callable:
"""
Convert a concise string shorthand into a transformation function.
Syntax
------
Shorthands take one of three forms::
name simple name, no arguments e.g. 'int'
name:arg1:arg2 name with colon-separated args e.g. 'maxlen:200'
type.method:arg1 cast to type, call method e.g. 'str.rjust:+9:0'
Parameters passed after the first ``:`` are strings by default.
Prefix an integer with ``+`` for non-negative or ``-`` for negative to force
it to int (required where the called function expects an integer, e.g.
``str.rjust``). ``True``, ``False``, and ``None`` are
parsed to their Python equivalents.
Names
-----
Type conversion — None-safe (empty string or None returns None):
``int``, ``float``, ``bool``
``digits`` strip non-digit characters
``number`` parse formatted numbers (e.g. ``'1,234.56'``)
Date / time:
``date``, ``datetime``, ``time``, ``timestamp``
String:
``maxlen:50`` truncate to 50 characters
``split_and_get:0`` first comma-delimited field
``split_and_get:1:\t`` second tab-delimited field
``str.method`` all string methods (capitalize, upper, lower, split) are available using cast and call format, see below.
List / Tuple:
``nth:0`` first element of a sequence (``nth:-1`` → last)
``coalesce`` first non-empty, non-None value
Boolean indicators:
``indicator`` True → ``'Y'``, False/None → ``None``
``indicator:Y:N`` True → ``'Y'``, False → ``'N'``
``indicator:N:Y`` True → ``'N'``, False → ``'Y'`` (inverted)
``indicator:None:Y`` True → ``None``, False → ``'Y'`` (active-flag inversion)
``indicator:1:0`` True → ``'1'``, False → ``'0'``
Cast and call (``type.method``):
``str.upper`` ``str(val).upper()``
``str.strip:="`` strip ``=`` and ``"`` chars (e.g. Excel ``="val"``)
``str.lstrip:0`` strip leading zeros
``str.split:,`` split on comma → list
``str.rjust:+9:0`` right-justify to width 9, padded with ``'0'``
``str.ljust:+10: `` left-justify to width 10, padded with space
``datetime.strftime:%Y-%m-%d`` format a parsed datetime
``int.to_bytes:+4:big`` ``int(val).to_bytes(4, 'big')``
``float.hex`` ``float(val).hex()``
Supported cast types: ``int``, ``float``, ``str``, ``datetime``.
Database:
``lookup:table:key_col:val_col`` look up val_col from table by key_col
``validate:table:key_col`` assert key_col exists in table
Returns
-------
callable or _DeferredTransform
Single-argument function; or a _DeferredTransform for ``lookup:``/``validate:``
that must be bound to a cursor before use.
Raises
------
ValueError
If the shorthand is not recognized.
Examples
--------
>>> fn_resolver('int')('123')
123
>>> fn_resolver('maxlen:10')('supercalifragilistic')
'supercalif'
>>> fn_resolver('indicator:N:Y')(True)
'N'
>>> fn_resolver('str.rjust:+9:0')('123')
'000000123'
>>> fn_resolver('str.split:,')('a,b,c')
['a', 'b', 'c']
"""
shorthand = shorthand.lstrip() # lstrip only — strip would eat tab delimiters
if shorthand.startswith(('lookup:', 'validate:')):
from .database import _DeferredTransform
return _DeferredTransform.from_string(shorthand)
parts = shorthand.split(':')
casts = {
'int': int, 'float': float, 'str': str,
'bool': bool, 'datetime': parse_datetime,
}
direct = {
'int': get_int, 'float': get_float, 'bool': get_bool,
'digits': get_digits, 'number': to_number,
'date': parse_date, 'datetime': parse_datetime,
'time': parse_time, 'timestamp': parse_timestamp,
'maxlen': maxlen, 'split_and_get': split_and_get,
'nth': nth, 'coalesce': coalesce, 'indicator': indicator,
}
if '.' in parts[0]:
cast, fn = parts[0].split('.', 1)
else:
fn = direct.get(parts[0])
cast = None
params = [_parse_param(p) for p in parts[1:]] if len(parts) > 1 else []
if cast and cast in casts:
t = casts[cast]
return lambda x, _t=t, _m=fn: None if x is None else getattr(_t(x), _m)(*params)
elif callable(fn):
return lambda x: None if x is None else fn(x, *params)
else:
raise ValueError(f"Unrecognized fn shorthand: '{shorthand}'")