Source code for dbtk.logging_utils

# dbtk/logging_utils.py
"""
Logging utilities for integration scripts.

Provides convenient logging setup for ETL and integration scripts that follow
the pattern of creating timestamped log files like script_name_YYYYMMDD_HHMMSS.log
"""

import logging
import sys
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, Tuple, List

logger = logging.getLogger(__name__)

# Module-level state for error tracking
_error_handler: Optional['ErrorCountHandler'] = None
_main_log_path: Optional[str] = None
_error_log_path: Optional[str] = None
_split_errors: bool = False


[docs] class ErrorCountHandler(logging.Handler): """Custom handler that counts ERROR and CRITICAL level messages and lazily creates error log."""
[docs] def __init__(self, error_log_path: Optional[str] = None, formatter: Optional[logging.Formatter] = None): super().__init__() self.error_count = 0 self.error_log_path = error_log_path self.formatter = formatter self._error_file_handler = None
[docs] def emit(self, record): """Count errors and lazily create error log file on first error.""" if record.levelno >= logging.ERROR: self.error_count += 1 # Lazily create error file handler on first error if self.error_log_path and self._error_file_handler is None: try: self._error_file_handler = logging.FileHandler(self.error_log_path, encoding='utf-8') self._error_file_handler.setLevel(logging.ERROR) if self.formatter: self._error_file_handler.setFormatter(self.formatter) # Add to root logger logging.getLogger().addHandler(self._error_file_handler) logger.debug(f"Created error log file: {self.error_log_path}") except Exception as e: logger.warning(f"Failed to create error log file: {e}")
[docs] def setup_logging( script_name: Optional[str] = None, log_dir: Optional[str] = None, level: Optional[str] = None, split_errors: Optional[bool] = None, console: Optional[bool] = None ) -> Tuple[str, Optional[str]]: """ Configure logging for integration scripts. Creates log files with pattern: {script_name}_{datetime}.log Optionally creates separate error log: {script_name}_{datetime}_error.log Args: script_name: Base name for log files (defaults to script filename without extension) log_dir: Directory for log files (defaults to config setting or './logs') level: Logging level string - DEBUG, INFO, WARNING, ERROR (defaults to config or 'INFO') split_errors: Create separate error log file (defaults to config or True) console: Also log to console/stdout (defaults to config or True) Returns: Tuple of (log_file_path, error_log_path or None) Example ------- :: import dbtk # Simple - uses defaults from config dbtk.setup_logging('fire_nation_etl') # Custom settings dbtk.setup_logging('my_script', log_dir='/var/log/etl', level='DEBUG') # Single log file per day (set filename_format in config to '%Y%m%d') dbtk.setup_logging('daily_job') # Single rolling log file (set filename_format in config to '') dbtk.setup_logging('rolling_log') Note: To customize filename patterns, set 'logging.filename_format' in dbtk.yml: - '%Y%m%d_%H%M%S' - One log per run with date and time (default) - '%Y%m%d' - One log per day - '' - Single rolling log file (overwrites) """ from dbtk.config import get_setting # Get script name from command line if not provided if script_name is None: script_name = Path(sys.argv[0]).stem # Load logging config dict logging_config = get_setting('logging', {}) # Get settings with fallbacks log_dir = log_dir or logging_config.get('directory', './logs') level = level or logging_config.get('level', 'INFO') split_errors = split_errors if split_errors is not None else logging_config.get('split_errors', True) console = console if console is not None else logging_config.get('console', True) log_format = logging_config.get('format', '%(asctime)s [%(levelname)s] %(name)s: %(message)s') timestamp_format = logging_config.get('timestamp_format', '%Y-%m-%d %H:%M:%S') filename_format = logging_config.get('filename_format', '%Y%m%d_%H%M%S') # Create log directory log_dir_path = Path(log_dir) log_dir_path.mkdir(parents=True, exist_ok=True) # Generate filename with optional timestamp if filename_format: timestamp = datetime.now().strftime(filename_format) log_file = log_dir_path / f"{script_name}_{timestamp}.log" error_file = log_dir_path / f"{script_name}_{timestamp}_error.log" if split_errors else None else: # No timestamp - single rolling log file log_file = log_dir_path / f"{script_name}.log" error_file = log_dir_path / f"{script_name}_error.log" if split_errors else None # Configure root logger root_logger = logging.getLogger() root_logger.setLevel(getattr(logging, level.upper())) # Remove existing handlers to avoid duplicates # Close handlers first to release file handles (critical for Windows) for handler in root_logger.handlers[:]: handler.close() root_logger.removeHandler(handler) # Create formatter formatter = logging.Formatter(log_format, datefmt=timestamp_format) # Create and add error counting handler with lazy error log creation global _error_handler _error_handler = ErrorCountHandler( error_log_path=str(error_file) if split_errors and error_file else None, formatter=formatter ) _error_handler.setLevel(logging.ERROR) root_logger.addHandler(_error_handler) # Main log file handler (all messages) file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) # Note: Error log file handler is created lazily by ErrorCountHandler on first error # Console handler if console: console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(getattr(logging, level.upper())) console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) # Log startup message logging.info(f"Logging initialized: {log_file}") if split_errors and error_file: logging.info(f"Error log will be created at: {error_file} (if errors occur)") # Store state for errors_logged() function global _main_log_path, _error_log_path, _split_errors _main_log_path = str(log_file) _error_log_path = str(error_file) if error_file else None _split_errors = split_errors return (str(log_file), str(error_file) if error_file else None)
[docs] def errors_logged() -> Optional[str]: """ Check if any ERROR or CRITICAL messages were logged during this run. Returns the path to the log file containing errors if any were logged, None otherwise. This allows integration scripts to easily detect if errors occurred and take action (e.g., send notification emails). Returns ------- str or None Path to error log (if split_errors=True) or main log (if split_errors=False) when errors were logged. Returns None if no errors were logged or setup_logging() was not called. Example ------- :: import dbtk import logging dbtk.setup_logging('my_integration') # ... do ETL work ... try: process_data() except Exception as e: logging.error(f"Processing failed: {e}") # Check if errors occurred error_log = dbtk.errors_logged() if error_log: print(f"Errors detected! See: {error_log}") # send_notification_email(subject="Integration errors", attachment=error_log) else: print("Integration completed successfully") """ if _error_handler is None: logger.warning("errors_logged() called but setup_logging() was not called") return None if _error_handler.error_count == 0: return None # Errors were logged - return the appropriate log file if _split_errors and _error_log_path: return _error_log_path else: return _main_log_path
[docs] def cleanup_old_logs( log_dir: Optional[str] = None, retention_days: Optional[int] = None, pattern: str = "*.log", dry_run: bool = False ) -> List[str]: """ Remove log files older than retention period. Args: log_dir: Directory to clean (defaults to config setting or './logs') retention_days: Keep logs newer than this many days (defaults to config or 30) pattern: Glob pattern for log files (default: ``'*.log'``) dry_run: If True, only report what would be deleted without actually deleting Returns: List of deleted (or would-be-deleted if dry_run) file paths Example ------- :: import dbtk # Clean logs older than 30 days (from config) deleted = dbtk.cleanup_old_logs() print(f"Deleted {len(deleted)} old log files") # Custom retention deleted = dbtk.cleanup_old_logs(retention_days=7) # Dry run to see what would be deleted would_delete = dbtk.cleanup_old_logs(dry_run=True) print(f"Would delete: {would_delete}") # Clean specific pattern deleted = dbtk.cleanup_old_logs(pattern="error_*.log") """ from dbtk.config import get_setting # Load logging config dict logging_config = get_setting('logging', {}) # Get settings with fallbacks log_dir = log_dir or logging_config.get('directory', './logs') retention_days = retention_days or logging_config.get('retention_days', 30) log_dir_path = Path(log_dir) if not log_dir_path.exists(): logger.warning(f"Log directory does not exist: {log_dir_path}") return [] cutoff = datetime.now() - timedelta(days=retention_days) deleted = [] for log_file in log_dir_path.glob(pattern): if not log_file.is_file(): continue file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime) if file_mtime < cutoff: if dry_run: logger.info(f"Would delete: {log_file}") else: try: log_file.unlink() logger.info(f"Deleted old log: {log_file}") except Exception as e: logger.warning(f"Failed to delete {log_file}: {e}") continue deleted.append(str(log_file)) if not dry_run and deleted: logger.info(f"Cleaned up {len(deleted)} old log files") return deleted