# dbtk/etl/transforms/datetime.py
"""
Date and time parsing and transformation functions.
Supports various date/time formats with timezone awareness.
Uses dateutil parser when available for robust parsing, with custom
parsing as fallback.
"""
import datetime as dt
import re
from typing import Any, Optional
from ...defaults import settings
# Check for optional dateutil library
try:
from dateutil import parser as dateutil_parser
HAS_DATEUTIL = True
except ImportError:
HAS_DATEUTIL = False
dateutil_parser = None
# Module-level default timezone.  Written by set_default_timezone(), returned
# by get_default_timezone(), and attached by parse_datetimetz() to naive
# datetimes.  None means no default is configured.
_default_timezone: Optional[dt.tzinfo] = None
# Numeric dates, either year-first (YYYY-M-D) or year-last (M-D-YYYY), using
# '-', '/' or '.' as the separator.  Inside a character class '|' is a literal
# character, not an alternation, so the class lists only the three separators.
datePattern = re.compile(
    r'((?P<y1>\d{4})[-/.](?P<m1>\d{1,2})[-/.](?P<d1>\d{1,2}))|'
    r'((?P<m2>\d{1,2})[-/.](?P<d2>\d{1,2})[-/.](?P<y2>\d{4}))'
)
# Dates with a month name (3-9 letters, case-insensitive):
#   * month-first:  "Jan 15, 2024", "January 15th, 2024", "Mar-3-2024"
#   * day-first:    "15 Jan 2024", "15th Jan 2024", or "Jan 2024" (day optional)
# The ordinal suffix is a real alternation (st|nd|rd|th) and is accepted after
# the day in both layouts; separators are plain character classes.
dateLongPattern = re.compile(
    r'((?P<m1>[a-z]{3,9})[ .-]+(?P<d1>\d{1,2})(?:st|nd|rd|th)?[ ,-]+(?P<y1>\d{4}))|'
    r'((?:(?P<d2>\d{1,2})(?:st|nd|rd|th)?)?[ .-]*(?P<m2>[a-z]{3,9})[ .,-]+(?P<y2>\d{4}))',
    re.I
)
# Clock time: H:MM with optional seconds, fractional seconds (up to 9 digits),
# AM/PM marker, and either a numeric UTC offset or a known zone abbreviation.
# The meridiem class is [AP]; a '|' inside a character class is a literal.
timePattern = re.compile(
    r'(?P<hr>[0-2]?\d):(?P<mi>[0-6]\d):?(?P<sec>[0-6]\d)?(?P<fsec>\.\d{1,9})?'
    r'(?P<am> ?[AP]M)?'
    r'(?P<tz>[ ]?(?P<offset>[+-]\d{2}:?\d{2})|[ ]?(?P<tzname>Z|UTC|GMT|EST|CST|MST|PST|EDT|CDT|MDT|PDT))?',
    re.I
)
# ISO 8601 datetime with optional fractional seconds and timezone.
# The fraction accepts any number of digits: consumers truncate it to
# microseconds, and capping it here would leave the extra digits in front of
# the zone designator so the timezone group could no longer match.
isoPattern = re.compile(
    r'(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})[T ]'
    r'(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})'
    r'(?P<microsecond>\.\d+)?'
    r'(?P<timezone>Z|[+-]\d{2}:?\d{2})?'
)
# Upper-case month names.  Index 0 holds an empty placeholder so that a
# name's list index equals its calendar month number.
MONTHS_LONG = [
    '',
    'JANUARY', 'FEBRUARY', 'MARCH', 'APRIL', 'MAY', 'JUNE',
    'JULY', 'AUGUST', 'SEPTEMBER', 'OCTOBER', 'NOVEMBER', 'DECEMBER',
]
# Three-letter abbreviations are the leading three letters of each full name.
MONTHS_SHORT = [full_name[:3] for full_name in MONTHS_LONG]
# Fixed-offset timezones keyed by upper-case abbreviation.  The three UTC
# aliases share the stdlib UTC singleton; the US abbreviations map to whole-hour
# offsets (standard and daylight variants are separate, fixed entries).
TIMEZONE_OFFSETS = dict.fromkeys(('Z', 'UTC', 'GMT'), dt.timezone.utc)
TIMEZONE_OFFSETS.update(
    (abbr, dt.timezone(dt.timedelta(hours=utc_hours)))
    for abbr, utc_hours in (
        ('EST', -5), ('EDT', -4),
        ('CST', -6), ('CDT', -5),
        ('MST', -7), ('MDT', -6),
        ('PST', -8), ('PDT', -7),
    )
)

# The same table as integer UTC offsets in seconds, passed to dateutil as its
# ``tzinfos`` argument.
TZINFOS = {
    abbr: int(zone.utcoffset(None).total_seconds())
    for abbr, zone in TIMEZONE_OFFSETS.items()
}
def _lookup_named_timezone(timezone_name):
    """Resolve a named zone via pytz, then dateutil; return None if neither can."""
    try:
        import pytz
    except ImportError:
        pytz = None
    if pytz is not None:
        try:
            return pytz.timezone(timezone_name)
        except pytz.UnknownTimeZoneError:
            # Not known to pytz; give dateutil a chance below.
            pass
    try:
        from dateutil.tz import gettz
    except ImportError:
        return None
    # gettz() signals an unknown name by returning None rather than raising.
    return gettz(timezone_name)


def set_default_timezone(timezone_name: str):
    """
    Set the default timezone for date/time parsing.

    Resolution order: numeric offset, known abbreviation (TIMEZONE_OFFSETS),
    then a named zone looked up through pytz or dateutil when installed.
    The current default is left untouched when resolution fails.

    Args:
        timezone_name: Timezone name (e.g., 'UTC', 'EST', 'America/New_York')
            or offset string (e.g., '+05:00', '-0800').  Surrounding
            whitespace is ignored.  A falsy value (None, '') clears the
            default.

    Raises:
        ValueError: If the value cannot be resolved to a timezone (unknown
            name, out-of-range offset, or no library available for named
            zones).

    Example:
        set_default_timezone('UTC')
        set_default_timezone('EST')
        set_default_timezone('+05:00')
        set_default_timezone('America/New_York')  # Requires pytz or dateutil
    """
    global _default_timezone

    if not timezone_name:
        _default_timezone = None
        return

    name = timezone_name.strip()
    if not name:
        # Whitespace only: never hand an empty name to the named-zone lookup.
        raise ValueError(f"Unknown timezone: {timezone_name}")

    # Numeric offset such as +05:00 or -0800.  The whole string must match so
    # that trailing garbage is not silently ignored.
    offset_match = re.fullmatch(r'([+-])(\d{2}):?(\d{2})', name)
    if offset_match:
        sign = 1 if offset_match.group(1) == '+' else -1
        total_minutes = sign * (int(offset_match.group(2)) * 60 + int(offset_match.group(3)))
        try:
            _default_timezone = dt.timezone(dt.timedelta(minutes=total_minutes))
        except ValueError:
            # datetime.timezone rejects offsets of 24 hours or more.
            raise ValueError(f"Unknown timezone format: {timezone_name}") from None
        return

    abbreviation = name.upper()
    if abbreviation in TIMEZONE_OFFSETS:
        _default_timezone = TIMEZONE_OFFSETS[abbreviation]
        return

    zone = _lookup_named_timezone(name)
    if zone is None:
        raise ValueError(f"Unknown timezone: {timezone_name}")
    _default_timezone = zone
def get_default_timezone():
    """
    Return the module-wide default timezone.

    Returns:
        The timezone object currently stored as the default, or None when
        no default is set.
    """
    current_default = _default_timezone
    return current_default
def _normalize_timezone(tz):
    """
    Reduce a timezone object to a stdlib fixed-offset ``datetime.timezone``.

    Args:
        tz: A tzinfo-like object (e.g. from dateutil), a stdlib timezone,
            or None.

    Returns:
        * None or a stdlib ``timezone`` is returned unchanged.
        * Otherwise the object is probed with ``utcoffset(None)``: a None
          offset yields None, a zero offset yields ``timezone.utc``, and any
          other offset yields ``timezone(offset)``.
        * If probing fails with AttributeError/TypeError the original object
          is returned as-is.
    """
    # Nothing to convert: absent, or already the stdlib type.
    if tz is None or isinstance(tz, dt.timezone):
        return tz

    try:
        delta = tz.utcoffset(None)
        if delta is None:
            return None
        # Collapse any zero-offset zone onto the UTC singleton.
        return dt.timezone.utc if delta.total_seconds() == 0 else dt.timezone(delta)
    except (AttributeError, TypeError):
        return tz
def _parse_timezone_offset(tz_str: str) -> Optional[dt.timezone]:
    """
    Parse a timezone designator into a fixed-offset timezone object.

    Args:
        tz_str: Timezone string (e.g., 'Z', 'UTC', '+05:00', '-0800').
            Surrounding whitespace and letter case are ignored.

    Returns:
        Timezone object, or None if the string is empty, unrecognized, or
        describes an offset that ``datetime.timezone`` rejects (24 hours or
        more).
    """
    if not tz_str:
        return None

    token = tz_str.strip().upper()

    # Numeric offset such as +05:00, -0800, +05:30.  The whole token must
    # match so trailing characters are not silently ignored.
    offset_match = re.fullmatch(r'([+-])(\d{2}):?(\d{2})', token)
    if offset_match:
        sign, hours, minutes = offset_match.groups()
        total_minutes = int(hours) * 60 + int(minutes)
        if sign == '-':
            total_minutes = -total_minutes
        try:
            return dt.timezone(dt.timedelta(minutes=total_minutes))
        except ValueError:
            # Out-of-range offset: report "could not parse", do not raise.
            return None

    # Known abbreviation (Z, UTC, EST, ...); None when not in the table.
    return TIMEZONE_OFFSETS.get(token)
def parse_date(val: Any, default_tz: Optional[str] = None) -> Optional[dt.date]:
    """
    Parse various date formats to date object.

    dateutil is tried first when it is installed; if it is missing or rejects
    the text, the module's own regex patterns are used instead.

    Args:
        val: Date string, date/datetime object, or other value.  Falsy
            values (None, '', 0) yield None; other non-string values are
            converted with ``str()`` before parsing.
        default_tz: Default timezone (not used for dates, kept for consistency)

    Returns:
        date object or None if parsing fails

    Example:
        parse_date("2024-01-15")      # -> date(2024, 1, 15)
        parse_date("01/15/2024")      # -> date(2024, 1, 15)
        parse_date("15 Jan 2024")     # -> date(2024, 1, 15)
    """
    if not val:
        return None

    # Check datetime first (it's a subclass of date)
    if isinstance(val, dt.datetime):
        return val.date()
    if isinstance(val, dt.date):
        return val

    val_str = str(val).strip()
    if not val_str:
        return None

    # Try dateutil first if available
    if HAS_DATEUTIL:
        try:
            # A fixed default keeps missing fields deterministic.
            return dateutil_parser.parse(val_str, default=dt.datetime(1900, 1, 1)).date()
        except (ValueError, TypeError, OverflowError):
            # dateutil documents OverflowError for values too large for a C
            # integer; treat it like any other parse failure and fall back.
            pass

    # Fall back to custom parsing: numeric layouts first, then month names.
    match = datePattern.search(val_str) or dateLongPattern.search(val_str)
    if match:
        mdict = match.groupdict()
        yr = int(mdict.get('y1') or mdict.get('y2'))
        mon_token = str(mdict.get('m1') or mdict.get('m2')).upper()
        # The day is optional in "Mon YYYY" style input; default to the 1st.
        dy = int(mdict.get('d1') or mdict.get('d2') or 1)

        if mon_token.isdigit():
            mon = int(mon_token)
        elif mon_token in MONTHS_SHORT:
            mon = MONTHS_SHORT.index(mon_token)
        elif mon_token in MONTHS_LONG:
            mon = MONTHS_LONG.index(mon_token)
        else:
            return None

        if yr and mon and dy:
            try:
                return dt.date(yr, mon, dy)
            except ValueError:
                return None

    # Try ISO format
    iso_match = isoPattern.search(val_str)
    if iso_match:
        try:
            return dt.date(
                int(iso_match.group('year')),
                int(iso_match.group('month')),
                int(iso_match.group('day')),
            )
        except ValueError:
            return None

    return None
def parse_time(val: Any) -> Optional[dt.time]:
    """
    Parse various time formats to time object.

    dateutil is tried first when it is installed; if it is missing or rejects
    the text, the module's own ``timePattern`` regex is used instead.

    Args:
        val: Time string, time/datetime object, or other value.  Falsy
            values (None, '', 0) yield None.

    Returns:
        time object or None if parsing fails.  A timezone found in the text
        is attached as a fixed-offset tzinfo.

    Example:
        parse_time("14:30:00")        # -> time(14, 30, 0)
        parse_time("2:30 PM")         # -> time(14, 30, 0)
        parse_time("14:30:00-05:00")  # -> time(14, 30, 0, tzinfo=...)
        parse_time("14:30:00 EST")    # -> time(14, 30, 0, tzinfo=EST)
    """
    if not val:
        return None

    if isinstance(val, dt.time):
        return val
    if isinstance(val, dt.datetime):
        # time() (not timetz()): an aware datetime yields a naive time here.
        return val.time()

    val_str = str(val).strip()
    if not val_str:
        return None

    # Try dateutil first if available
    if HAS_DATEUTIL:
        try:
            parsed = dateutil_parser.parse(
                val_str, default=dt.datetime(1900, 1, 1), tzinfos=TZINFOS
            )
            return parsed.time().replace(tzinfo=_normalize_timezone(parsed.tzinfo))
        except (ValueError, TypeError, OverflowError):
            # dateutil documents OverflowError for values too large for a C
            # integer; treat it like any other parse failure and fall back.
            pass

    # Fall back to custom parsing
    match = timePattern.search(val_str)
    if not match:
        return None

    mdict = match.groupdict()
    hr = int(mdict.get('hr') or 0)
    mi = int(mdict.get('mi') or 0)
    sec = int(mdict.get('sec') or 0)

    # Fractional seconds: drop the leading dot, then pad or truncate to the
    # six digits that make up microseconds.
    fsec_str = mdict.get('fsec') or '0'
    if fsec_str.startswith('.'):
        fsec_str = fsec_str[1:]
    usec = int(fsec_str.ljust(6, '0')[:6])

    # 12-hour clock: PM adds 12 except at noon, and 12 AM is midnight.
    am_pm = (mdict.get('am') or '').strip().upper()
    if am_pm == 'PM' and hr < 12:
        hr += 12
    elif am_pm == 'AM' and hr == 12:
        hr = 0

    tz_str = mdict.get('tz')
    try:
        # Timezone parsing stays inside the try so that an invalid offset
        # results in None, like any other unparsable component.
        tz_info = _parse_timezone_offset(tz_str) if tz_str else None
        return dt.time(hr, mi, sec, usec, tzinfo=tz_info)
    except ValueError:
        return None
def parse_datetime(val: Any) -> Optional[dt.datetime]:
    """
    Parse various datetime formats to datetime object.

    Preserves timezone if present in the input string, otherwise returns naive datetime.
    Use parse_datetimetz() to automatically apply default timezone from settings.

    dateutil is tried first when it is installed; if it is missing or rejects
    the text, the ISO 8601 regex is tried, then parse_date()/parse_time()
    are combined.

    Args:
        val: Datetime string, date/datetime object, or other value.  Falsy
            values (None, '', 0) yield None; a plain date becomes midnight
            of that day.

    Returns:
        datetime object or None if parsing fails

    Example:
        parse_datetime("2024-01-15 14:30:00")      # -> naive datetime
        parse_datetime("2024-01-15T14:30:00Z")     # -> datetime with UTC
        parse_datetime("01/15/2024 2:30 PM EST")   # -> datetime with EST
    """
    if not val:
        return None

    if isinstance(val, dt.datetime):
        return val
    if isinstance(val, dt.date):
        return dt.datetime.combine(val, dt.time.min)

    val_str = str(val).strip()
    if not val_str:
        return None

    # Try dateutil first if available
    if HAS_DATEUTIL:
        try:
            # NOTE(review): no ``default=`` is passed here (parse_date and
            # parse_time pin 1900-01-01), so per the dateutil docs missing
            # fields come from the current date.  Confirm that is intended.
            parsed = dateutil_parser.parse(val_str, tzinfos=TZINFOS)
            if parsed.tzinfo is not None:
                fixed_tz = _normalize_timezone(parsed.tzinfo)
                if fixed_tz is None:
                    # _normalize_timezone probes with utcoffset(None); a zone
                    # that cannot answer without a concrete datetime returns
                    # None, which would silently strip the timezone.  Ask the
                    # parsed value for its own offset instead.
                    offset = parsed.utcoffset()
                    if offset is not None:
                        fixed_tz = dt.timezone(offset)
                parsed = parsed.replace(tzinfo=fixed_tz)
            return parsed
        except (ValueError, TypeError, OverflowError):
            # dateutil documents OverflowError for values too large for a C
            # integer; treat it like any other parse failure and fall back.
            pass

    # Fall back to custom parsing: ISO 8601 first.
    iso_match = isoPattern.search(val_str)
    if iso_match:
        try:
            microsecond = 0
            fraction = iso_match.group('microsecond')
            if fraction:
                # Drop the dot, then pad or truncate to six digits.
                microsecond = int(fraction[1:].ljust(6, '0')[:6])

            tz_info = None
            if iso_match.group('timezone'):
                tz_info = _parse_timezone_offset(iso_match.group('timezone'))

            return dt.datetime(
                int(iso_match.group('year')),
                int(iso_match.group('month')),
                int(iso_match.group('day')),
                int(iso_match.group('hour')),
                int(iso_match.group('minute')),
                int(iso_match.group('second')),
                microsecond,
                tzinfo=tz_info,
            )
        except ValueError:
            # Out-of-range component; let the generic parsers have a try.
            pass

    # Last resort: parse the date and time portions independently.
    d = parse_date(val_str)
    if not d:
        return None
    t = parse_time(val_str)
    if t is None:
        return dt.datetime.combine(d, dt.time.min)
    # combine() carries the time's tzinfo (if any) over to the result.
    return dt.datetime.combine(d, t)
def parse_timestamp(val: Any) -> Optional[dt.datetime]:
    """
    Parse timestamp with timezone support.

    Purely numeric input is interpreted as a Unix timestamp in UTC:
    10 digits (optionally with a fractional part) as seconds, 13 digits as
    milliseconds.  Everything else is delegated to parse_datetime().

    Args:
        val: Timestamp string, number, or other value.  Falsy values
            (None, '', 0) yield None.

    Returns:
        UTC-aware datetime for Unix timestamps; otherwise whatever
        parse_datetime() returns (aware only if the input carried a
        timezone), or None if parsing fails

    Example:
        parse_timestamp("2024-01-15 14:30:00+00:00")  # -> datetime with UTC
        parse_timestamp("2024-01-15T14:30:00Z")       # -> datetime with UTC
        parse_timestamp("1642262200")                 # -> datetime from Unix timestamp
        parse_timestamp("1642262200123")              # -> same, millisecond precision
    """
    if not val:
        return None

    val_str = str(val).strip()

    if re.fullmatch(r'\d{10}(\.\d+)?', val_str):
        # Unix timestamp in seconds, optionally fractional.
        try:
            return dt.datetime.fromtimestamp(float(val_str), tz=dt.timezone.utc)
        except (ValueError, OSError, OverflowError):
            pass
    elif re.fullmatch(r'\d{13}', val_str):
        # Unix timestamp in milliseconds.  Integer timedelta arithmetic keeps
        # the millisecond part exact (no float rounding).
        try:
            epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
            return epoch + dt.timedelta(milliseconds=int(val_str))
        except OverflowError:
            pass

    # Use parse_datetime for everything else
    return parse_datetime(val)
def parse_datetimetz(val: Any) -> Optional[dt.datetime]:
    """
    Parse datetime and apply default timezone from settings if datetime is naive.

    This is a convenience function that applies the timezone set via
    set_default_timezone() to naive datetimes. If the datetime already has
    a timezone, it is preserved.

    Args:
        val: Datetime string, date object, or other value

    Returns:
        datetime object or None if parsing fails.  The result stays naive
        when no default timezone is configured.

    Example:
        set_default_timezone('UTC')
        parse_datetimetz("2024-01-15 14:30:00")   # -> datetime with UTC
        parse_datetimetz("2024-01-15T14:30:00Z")  # -> datetime with UTC (preserved)
    """
    dt_obj = parse_datetime(val)
    default_tz = _default_timezone

    # Nothing to do: parse failed, value is already aware, or no default set.
    if dt_obj is None or dt_obj.tzinfo is not None or default_tz is None:
        return dt_obj

    # set_default_timezone() may install a pytz zone.  pytz zones must be
    # attached with localize(); replace(tzinfo=...) would pick the zone's
    # first historical (LMT) offset instead of the one valid for this date.
    localize = getattr(default_tz, 'localize', None)
    if callable(localize):
        return localize(dt_obj)
    return dt_obj.replace(tzinfo=default_tz)
# At import time, adopt the 'default_timezone' entry from settings (if one is
# configured) as the module's default timezone.
_default_tz_setting = settings.get('default_timezone', None)
if _default_tz_setting:
    set_default_timezone(_default_tz_setting)