Source code for dbtk.writers.excel

# dbtk/writers/excel.py
"""
Excel writer for database results using openpyxl.
"""
import logging
from typing import Any, Callable, Union, List, Optional, Iterable, Dict, TYPE_CHECKING
from pathlib import Path
from datetime import datetime, date, time
from zipfile import BadZipFile
import fnmatch
import hashlib

from .base import BatchWriter, RecordLike

if TYPE_CHECKING:
    from openpyxl.worksheet.worksheet import Worksheet

try:
    from openpyxl import Workbook, load_workbook
    from openpyxl.styles import Font, NamedStyle, Alignment
    from openpyxl.comments import Comment
    from openpyxl.utils.exceptions import InvalidFileException
    from openpyxl.utils import get_column_letter
    HAS_OPENPYXL = True
except ImportError:
    HAS_OPENPYXL = False
    InvalidFileException = Exception  # fallback so except clause is valid

logger = logging.getLogger(__name__)

MIDNIGHT = time(0, 0)

_BUILTIN_STYLE_NAMES = frozenset({
    'date_style', 'datetime_style', 'hyperlink_style',
    'bold_style', 'header_vert_style',
    'currency_style', 'percent_style', 'comma_style',
})


[docs] class ColumnRule: """Per-column formatting rule for use with :class:`ExcelFormat`. Interchangeable with a plain ``dict`` — ``ExcelWriter`` accepts either. The order of rules in ``ExcelFormat.columns`` is significant: later rules override earlier ones on a per-property basis. Parameters ---------- style : str or dict, optional Static style applied to every data cell in the column. Either a registered style name (str) or an inline property dict (``bg_color``, ``font``, ``number_format``, ``alignment``). header_style : str or dict, optional Static style applied to the header cell only (owns the cell entirely — include ``font={'bold': True}`` if you still want bold). width : float, optional Explicit column width in Excel units; bypasses auto-sizing. hidden : bool, default False Hide the column. comment : str, optional Comment/note added to the header cell. filter : bool, default False Show the auto-filter dropdown on this column only; hides dropdowns on all other columns. group_label : str, optional Label for a group super-header spanning this column range. Only valid on range patterns (``'col_a:col_z'``). conditional_style : callable or list[callable], optional ``lambda record: style_name_or_None`` — evaluated per row; overrides ``style`` and row styles when non-``None``. Multiple callables are composed in order. """
[docs] def __init__(self, style=None, header_style=None, width=None, hidden=False, comment=None, filter=False, group_label=None, conditional_style=None): self.style = style self.header_style = header_style self.width = width self.hidden = hidden self.comment = comment self.filter = filter self.group_label = group_label self.conditional_style = conditional_style
[docs] def to_dict(self): d: dict = {} if self.style is not None: d['style'] = self.style if self.header_style is not None: d['header_style'] = self.header_style if self.width is not None: d['width'] = self.width if self.hidden: d['hidden'] = self.hidden if self.comment is not None: d['comment'] = self.comment if self.filter: d['filter'] = self.filter if self.group_label is not None: d['group_label'] = self.group_label if self.conditional_style is not None: d['conditional_style'] = self.conditional_style return d
[docs] class ExcelFormat: """Formatting configuration for :class:`ExcelWriter`. Interchangeable with a plain ``dict`` — ``ExcelWriter`` accepts either. Using ``ExcelFormat`` gives full IDE autocomplete and type checking. The ``columns`` dict is **order-sensitive**: patterns are applied in definition order and later rules override earlier ones per property. Parameters ---------- styles : dict, optional Named style definitions. Keys are style names, values are property dicts (``bg_color``, ``font``, ``number_format``, ``alignment``). Styles defined here take priority over the built-in styles, so you can redefine ``comma_style``, ``currency_style``, etc. columns : dict, optional Pattern → :class:`ColumnRule` (or plain dict). Patterns are case-insensitive fnmatch globs, literals, or ``'start:end'`` ranges. rows : dict, optional Row-type formatting. Recognised keys: * ``'*'`` — applied to **all** rows (lowest priority). Supports ``height`` and ``style`` (str or dict). * ``'group_header'`` — the group-label row (only present when column ranges carry ``group_label``). Supports ``height`` and ``style`` (str or dict). * ``'header'`` — the column-header row. Supports ``height``. * ``'data'`` — data rows. Nested keys: - ``'style'``: str or dict applied to all data rows. - ``'odd'`` / ``'even'``: ``{'style': style_name}`` for striping. - ``'conditional_style'``: callable or list of callables ``lambda record: style_name_or_None``. Multiple callables are composed in order. - ``'height'``: uniform row height for data rows. Cascade (lowest → highest priority): ``'*'`` → ``'odd'``/``'even'`` → ``'conditional_style'`` callable(s). min_column_width : float, default 6 Minimum auto-sized column width. max_column_width : float, default 60 Maximum auto-sized column width. auto_filter : bool, default False Enable Excel auto-filter dropdowns on the header row. freeze : str or False, optional Freeze-panes cell reference (e.g. ``'D2'``). ``None`` uses the automatic default (``'A2'`` or ``'A3'`` when group headers are present). Pass ``False`` to disable freezing entirely. header_auto_rotate : float or dict, optional Auto-rotate column headers that are significantly wider than their data. Pass a float ratio or ``{'ratio': 1.5, 'min_length': 8, 'height_factor': 6.5}``. tab_color : str, optional Worksheet tab color as a hex string (``'#FF0000'`` or ``'FF0000'``). """
[docs] def __init__(self, styles=None, columns=None, rows=None, min_column_width=6, max_column_width=60, auto_filter=False, freeze=None, header_auto_rotate=None, tab_color=None): self.styles = styles if styles is not None else {} self.columns = columns if columns is not None else {} self.rows = rows if rows is not None else {} self.min_column_width = min_column_width self.max_column_width = max_column_width self.auto_filter = auto_filter self.freeze = freeze self.header_auto_rotate = header_auto_rotate self.tab_color = tab_color
[docs] def to_dict(self): d: dict = { 'min_column_width': self.min_column_width, 'max_column_width': self.max_column_width, } if self.styles: d['styles'] = self.styles if self.columns: d['columns'] = { k: v.to_dict() if isinstance(v, ColumnRule) else v for k, v in self.columns.items() } if self.rows: d['rows'] = self.rows if self.auto_filter: d['auto_filter'] = self.auto_filter if self.freeze is not None: d['freeze'] = self.freeze if self.header_auto_rotate is not None: d['header_auto_rotate'] = self.header_auto_rotate if self.tab_color is not None: d['tab_color'] = self.tab_color return d
[docs] class ExcelWriter(BatchWriter): """ Stateful Excel writer using openpyxl. Keeps the workbook open across multiple write_batch() calls and saves only on context exit. Designed for both single-sheet legacy use and multi-sheet reports. Supports all 3 BatchWriter modes: 1. Complete write from __init__ + write() 2. Batch write (no data on init) + write_batch() 3. Hybrid: data on init + write() + write_batch() Usage examples:: # Mode 1: Traditional single-shot write ExcelWriter(cursor, 'report.xlsx').write() # Mode 2: Pure streaming with write_batch() with ExcelWriter(file='report.xlsx') as writer: writer.write_batch(cursor) # goes to sheet 'Data' # Mode 3: Hybrid - initial data + streaming with ExcelWriter(first_batch, 'report.xlsx') as writer: writer.write() # Write initial batch writer.write_batch(second_batch) # Stream additional batches # Multi-sheet report with ExcelWriter(file='report.xlsx', sheet_name='Summary') as writer: writer.write_batch(summary_data, sheet_name='Summary') writer.write_batch(users_data, sheet_name='Users') writer.write_batch(orders_data, sheet_name='Orders') # Streaming / batch mode with ExcelWriter(file='large.xlsx') as writer: for batch in large_generator: writer.write_batch(batch, sheet_name='Data') # appends to 'Data' """ accepts_file_handle = False preserve_types = True
[docs] def __init__( self, data: Optional[Iterable[RecordLike]] = None, file: Optional[Union[str, Path]] = None, sheet_name: Optional[str] = None, headers: Optional[List[str]] = None, write_headers: bool = True, formatting: Optional[Union[Dict, ExcelFormat]] = None, ): """ Initialize the Excel writer. Parameters ---------- data : Iterable[RecordLike], optional Initial data to write. If None, use write_batch() for streaming mode. file : str or Path, optional Output Excel file (.xlsx). Required for Excel output. sheet_name : str, optional Default/active sheet name to use for write_batch() calls without an explicit sheet_name. headers : List[str], optional Header row text. If None, checks data.description for original column names, then falls back to detected column names. write_headers : bool, default True Whether to write column headers (only when the sheet is empty). formatting : ExcelFormat or dict, optional Worksheet formatting rules. Prefer :class:`ExcelFormat` for IDE autocomplete; a plain ``dict`` is also accepted. See :class:`ExcelFormat` for the full reference. """ if not HAS_OPENPYXL: raise ImportError("ExcelWriter requires openpyxl: pip install openpyxl") if file is None: raise ValueError("ExcelWriter requires an output file path") if isinstance(formatting, ExcelFormat): formatting = formatting.to_dict() super().__init__(data=data, file=file, headers=headers, write_headers=write_headers) self.output_path = Path(file) self.active_sheet: Optional[str] = sheet_name self.workbook: Optional[Workbook] = None self._sheets_written_this_session: set = set() self.formatting: dict = formatting or {} self._link_mapping: dict = {} self._style_cache: dict = {} self._named_style_objects: dict = {} self._col_rules: list = [ (k.lower(), k, v.to_dict() if isinstance(v, ColumnRule) else v) for k, v in self.formatting.get('columns', {}).items() ] self._load_or_create_workbook()
def _load_or_create_workbook(self) -> None: """Load existing workbook or create a new one.""" try: if self.output_path.exists(): with open(self.output_path, mode='r+b'): # make sure it is actually writable (not open in Excel) pass self.workbook = load_workbook(self.output_path) logger.info(f"Loaded existing workbook: {self.output_path}") else: # make sure path is writable with open(self.output_path, mode='wb'): pass # clean up empty file that was just created so an exception doesn't leave us with an invalid Excel file # self.output_path.unlink() self.workbook = Workbook() if 'Sheet' in self.workbook.sheetnames: self.workbook.remove(self.workbook['Sheet']) except (InvalidFileException, BadZipFile, ValueError) as e: raise ValueError( f"File '{self.output_path}' exists but is not a valid Excel workbook. " f"Original error: {e}" ) from e except PermissionError: raise PermissionError( f"Cannot write to '{self.output_path}' - file may be open in Excel or another application. " "Please close the file and try again." ) self._register_styles() def _add_named_style(self, style: 'NamedStyle') -> None: """Register a NamedStyle if not already in the workbook, and cache the object.""" if style.name not in self.workbook.named_styles: self.workbook.add_named_style(style) self._named_style_objects[style.name] = style def _register_styles(self) -> None: """Register styles for this workbook. User-defined styles in ``formatting['styles']`` are registered first so they take precedence over the built-ins. Built-in styles that already exist (e.g. because the user redefined ``comma_style``) are silently skipped by :meth:`_add_named_style`. Built-in styles --------------- date_style Date number format: ``YYYY-MM-DD``. datetime_style Datetime number format: ``YYYY-MM-DD HH:MM:SS``. hyperlink_style Blue underlined font (used automatically by LinkedExcelWriter). bold_style Bold font. Useful as a ``header_style`` when you only want emphasis. header_vert_style Bold font + 90° text rotation. Pair with ``rows={'header': {'height': 120}}`` for narrow rotated headers. currency_style Number format: ``#,##0.00``. percent_style Number format: ``0.00%``. comma_style Number format: ``#,##0``. """ if self.workbook is None: return # User styles first — allows overriding built-ins for style_name, props in self.formatting.get('styles', {}).items(): self._add_named_style(self._build_named_style(style_name, props)) # Built-ins — skipped silently if user already registered the name from openpyxl.styles import Alignment as _Alignment _bold_font = Font(bold=True) for style in [ NamedStyle(name='date_style', number_format='YYYY-MM-DD'), NamedStyle(name='datetime_style', number_format='YYYY-MM-DD HH:MM:SS'), NamedStyle(name='hyperlink_style', font=Font(color="0000FF", underline="single")), NamedStyle(name='bold_style', font=_bold_font), NamedStyle(name='header_vert_style', font=_bold_font, alignment=_Alignment(text_rotation=90, horizontal='center')), NamedStyle(name='currency_style', number_format='#,##0.00'), NamedStyle(name='percent_style', number_format='0.00%'), NamedStyle(name='comma_style', number_format='#,##0'), ]: self._add_named_style(style) @staticmethod def _build_named_style(name: str, props: dict) -> 'NamedStyle': """Build a NamedStyle from a properties dict.""" from openpyxl.styles import PatternFill, Alignment style = NamedStyle(name=name) if 'bg_color' in props: color = props['bg_color'].lstrip('#') style.fill = PatternFill(fill_type='solid', fgColor=color) if 'font' in props: style.font = Font(**props['font']) if 'number_format' in props: style.number_format = props['number_format'] if 'alignment' in props: style.alignment = Alignment(**props['alignment']) return style def _ensure_style(self, props: dict) -> str: """Register an inline format dict as a NamedStyle; return its name.""" key = tuple(sorted((k, str(v)) for k, v in props.items())) name = 'fmt_' + hashlib.md5(str(key).encode()).hexdigest()[:8] if name not in self._named_style_objects: self._add_named_style(self._build_named_style(name, props)) return name def _register_call_styles(self, fmt: dict) -> None: """Register any user-defined styles from a per-call formatting dict.""" for name, props in fmt.get('styles', {}).items(): self._add_named_style(self._build_named_style(name, props)) def _build_col_fmt_map(self, columns: List[str], col_rules: list = None) -> list: """Build per-column formatting list from wildcard pattern rules. Returns a list (indexed by 0-based column position) of property dicts. Patterns are applied in definition order; later rules win per property. Three pattern forms are supported (all case-insensitive): - fnmatch glob: ``'*_fee*'``, ``'resv_*'`` - range: ``'start:end'``, ``':end'`` (from first), ``'start:'`` (to last) A pattern is only treated as a range if it contains ``:`` with no wildcard characters and does not itself match a column name literally. - literal: any exact column name (handled by fnmatch as a degenerate glob) """ if col_rules is None: col_rules = self._col_rules if not col_rules: return [] result: List[dict] = [{} for _ in columns] cols_lower = [c.lower() for c in columns] def _apply(dest: dict, props: dict) -> None: """Merge props into dest, composing overlapping 'style' values.""" for k, v in props.items(): if k == 'style' and 'style' in dest: existing = dest[k] dest[k] = existing if isinstance(existing, list) else [existing] dest[k].append(v) else: dest[k] = v for pattern_lower, pattern, props in col_rules: is_range = ':' in pattern_lower and not any(c in pattern_lower for c in '*?[') and pattern_lower not in cols_lower if 'group_label' in props and not is_range: logger.warning( f"'group_label' is only supported on range patterns (e.g. 'col_a:col_b'); " f"ignoring for pattern '{pattern}'" ) props = {k: v for k, v in props.items() if k != 'group_label'} if is_range: # Range syntax: 'start:end', ':end', 'start:' start_str, end_str = pattern_lower.split(':', 1) start_str, end_str = start_str.strip(), end_str.strip() if start_str: if start_str not in cols_lower: raise ValueError(f"Range start column '{start_str}' not found in result set") start_idx = cols_lower.index(start_str) else: start_idx = 0 if end_str: if end_str not in cols_lower: raise ValueError(f"Range end column '{end_str}' not found in result set") end_idx = cols_lower.index(end_str) else: end_idx = len(columns) - 1 if start_idx > end_idx: raise ValueError( f"Range '{pattern}': '{start_str}' (col {start_idx + 1}) " f"comes after '{end_str}' (col {end_idx + 1})" ) for i in range(start_idx, end_idx + 1): _apply(result[i], props) else: # fnmatch glob (also handles literal column names) for col_idx, col_name_lower in enumerate(cols_lower): if fnmatch.fnmatch(col_name_lower, pattern_lower): _apply(result[col_idx], props) for col_props in result: fmt = col_props.get('style') if isinstance(fmt, list): resolved = [self._ensure_style(f) if isinstance(f, dict) else f for f in fmt] col_props['style'] = self._compose_styles(*resolved) elif isinstance(fmt, dict): col_props['style'] = self._ensure_style(fmt) hfmt = col_props.get('header_style') if isinstance(hfmt, dict): col_props['header_style'] = self._ensure_style(hfmt) return result def _finalize_headers( self, worksheet: 'Worksheet', header_widths: list, data_widths: list, col_fmt: list, rows_fmt: dict, link_mapping: Optional[dict] = None, effective_fmt: Optional[dict] = None, ) -> None: """Apply column widths, auto-rotate, header height, and freeze panes. Called once after data has been sampled, so auto-rotate decisions have accurate data-width information. ``link_mapping`` is LinkedExcelWriter's column → (LinkSource, mode) map; LinkSource display widths substitute for sampled data widths on linked columns. """ link_mapping = link_mapping or {} effective_fmt = effective_fmt if effective_fmt is not None else self.formatting # Determine whether group headers are present has_groups = bool(col_fmt and any(p.get('group_label') for p in col_fmt)) header_row = 2 if has_groups else 1 # Substitute LinkSource display widths for linked columns effective_data_widths = list(data_widths) for col_idx, col_name in enumerate(self.columns, 1): if col_name in link_mapping: source, _ = link_mapping[col_name] if source.max_display_width > 0: effective_data_widths[col_idx - 1] = source.max_display_width # Auto-rotate: detect columns whose header is significantly longer than their data auto_rotated: set = set() har_height_factor = 6.5 har = effective_fmt.get('header_auto_rotate') if har: if isinstance(har, dict): har_min = har.get('min_length', 8) har_ratio = har.get('ratio', 1.5) har_height_factor = har.get('height_factor', 6.5) else: har_min = 8 har_ratio = float(har) har_height_factor = 6.5 for col_idx, (hw, dw) in enumerate(zip(header_widths, effective_data_widths), 1): col_props = col_fmt[col_idx - 1] if col_fmt else {} if col_props.get('header_style'): continue # explicit header_style takes precedence if hw >= har_min and hw > dw * har_ratio: auto_rotated.add(col_idx) worksheet.cell(header_row, col_idx).style = 'header_vert_style' # Column widths: auto-rotated columns use data width only; others use max of both min_col_width = effective_fmt.get('min_column_width', 6) max_col_width = effective_fmt.get('max_column_width', 60) for col_idx, (hw, dw) in enumerate(zip(header_widths, effective_data_widths), 1): raw = dw if col_idx in auto_rotated else max(hw, dw) adjusted = min(max(raw + 2, min_col_width), max_col_width) worksheet.column_dimensions[get_column_letter(col_idx)].width = adjusted # User column-rule overrides (width, hidden, comment) for col_idx, col_props in enumerate(col_fmt, 1): if col_props: col_letter = get_column_letter(col_idx) if 'width' in col_props: worksheet.column_dimensions[col_letter].width = col_props['width'] if 'hidden' in col_props: worksheet.column_dimensions[col_letter].hidden = bool(col_props['hidden']) if 'comment' in col_props: worksheet.cell(header_row, col_idx).comment = Comment(col_props['comment'], '') # Row heights — cascade: '*' (all rows) overridden by specific row type rows_fmt_d = rows_fmt if isinstance(rows_fmt, dict) else {} all_h = rows_fmt_d.get('*', {}).get('height') # Header row height: '*' → 'header' → auto-rotate header_h = rows_fmt_d.get('header', {}).get('height') or all_h if header_h is not None: worksheet.row_dimensions[header_row].height = header_h elif auto_rotated: max_rotated_len = max(header_widths[i - 1] for i in auto_rotated) worksheet.row_dimensions[header_row].height = max_rotated_len * har_height_factor # Group header row (row 1): merged cells spanning each labelled column range if has_groups: group_h = rows_fmt_d.get('group_header', {}).get('height') or all_h if group_h is not None: worksheet.row_dimensions[1].height = group_h group_font = Font(bold=True) group_align = Alignment(horizontal='center', vertical='center') i = 0 while i < len(col_fmt): group = col_fmt[i].get('group_label') if group: j = i while j < len(col_fmt) and col_fmt[j].get('group_label') == group: j += 1 start_col, end_col = i + 1, j # 1-based, inclusive if start_col < end_col: worksheet.merge_cells( start_row=1, start_column=start_col, end_row=1, end_column=end_col ) cell = worksheet.cell(1, start_col) cell.value = group cell.font = group_font cell.alignment = group_align i = j else: i += 1 # Freeze panes freeze = effective_fmt.get('freeze', 'A3' if has_groups else 'A2') if freeze: worksheet.freeze_panes = freeze # Auto-filter on header row filter_cols = {col_idx for col_idx, col_props in enumerate(col_fmt, 1) if col_props.get('filter')} if filter_cols or effective_fmt.get('auto_filter'): last_col = get_column_letter(len(self.columns)) filter_row = header_row worksheet.auto_filter.ref = f"A{filter_row}:{last_col}{filter_row}" if filter_cols: from openpyxl.worksheet.filters import FilterColumn for col_idx in range(1, len(self.columns) + 1): if col_idx not in filter_cols: worksheet.auto_filter.filterColumn.append( FilterColumn(colId=col_idx - 1, hiddenButton=True) ) def _get_or_create_worksheet(self, sheet_name: str) -> 'Worksheet': """Get existing worksheet or create new one.""" from openpyxl.worksheet.worksheet import Worksheet if sheet_name in self.workbook.sheetnames: return self.workbook[sheet_name] else: return self.workbook.create_sheet(sheet_name) def _clear_worksheet(self, worksheet: 'Worksheet') -> None: """Clear all rows from a worksheet.""" # Simply delete all rows - openpyxl will handle this correctly if worksheet.max_row >= 1: worksheet.delete_rows(1, worksheet.max_row) def _get_named_style(self, name: str) -> 'NamedStyle': try: return self._named_style_objects[name] except KeyError: raise KeyError(f"Named style '{name}' not found") def _decompose_style(self, name: str) -> dict: """Extract non-default properties from a named style.""" style = self._get_named_style(name) props = {} if style.number_format and style.number_format != 'General': props['number_format'] = style.number_format fill = style.fill if fill is not None and getattr(fill, 'fill_type', None) not in (None, 'none'): props['fill'] = fill font = style.font if font is not None and any([ font.bold, font.italic, font.underline, font.color, font.size, font.name, font.strike, ]): props['font'] = font aln = style.alignment if aln is not None and any([ aln.horizontal, aln.vertical, aln.text_rotation, aln.wrap_text, aln.shrink_to_fit, aln.indent, ]): props['alignment'] = aln return props def _compose_styles(self, *names) -> Optional[str]: """Compose multiple named styles into one, with later names taking precedence. Results are cached by style-name tuple so composition happens at most once per unique combination per workbook. Returns None when no names are given. """ valid = tuple(n for n in names if n) if not valid: return None if len(valid) == 1: return valid[0] if valid in self._style_cache: return self._style_cache[valid] merged: dict = {} for name in valid: merged.update(self._decompose_style(name)) if not merged: result = valid[-1] self._style_cache[valid] = result return result new_name = '_c_' + hashlib.md5(str(valid).encode()).hexdigest()[:10] if new_name not in self._named_style_objects: style = NamedStyle(name=new_name) if 'number_format' in merged: style.number_format = merged['number_format'] if 'font' in merged: style.font = merged['font'] if 'fill' in merged: style.fill = merged['fill'] if 'alignment' in merged: style.alignment = merged['alignment'] self._add_named_style(style) self._style_cache[valid] = new_name return new_name def _apply_cell_overrides( self, cell, record, col_name: str, col_idx: int, row_idx: int, style_names: list, ) -> None: """Hook called per cell after value and base styles are determined. Subclasses can set cell properties (e.g. hyperlinks) and append style names to ``style_names`` to include them in the composed result. """ def _post_row(self, record, row_idx: int) -> None: """Hook called after all cells in a row have been written.""" def _write_rows( self, worksheet: 'Worksheet', data_start_row: int, col_fmt: list, col_conditional_styles: list, rows_fmt, header_widths: list, data_widths: list, ) -> int: """Write data rows, applying the style cascade and calling subclass hooks. Style cascade per cell (lowest → highest priority): date/datetime base → column ``style`` → ``'*'`` style → ``'data'`` style → ``'odd'``/``'even'`` style → ``conditional_style`` callable(s) → column ``conditional_style`` callable(s) → ``_apply_cell_overrides`` hook. All active styles are composed once per unique combination via ``_compose_styles`` and cached for reuse. Returns the number of rows written. """ self._col_index_map = {name: idx + 1 for idx, name in enumerate(self.columns)} rows_fmt_d = rows_fmt if isinstance(rows_fmt, dict) else {} # Extract row-level formatting config once all_props = rows_fmt_d.get('*', {}) data_props = rows_fmt_d.get('data', {}) all_row_style = all_props.get('style') all_height = all_props.get('height') data_row_style = data_props.get('style') data_height = data_props.get('height') odd_style = data_props.get('odd', {}).get('style') even_style = data_props.get('even', {}).get('style') conditional_styles = data_props.get('conditional_style') if conditional_styles is not None and not isinstance(conditional_styles, list): conditional_styles = [conditional_styles] width_sample_size = 15 row_count = 0 for row_idx, record in enumerate(self.data_iterator, data_start_row): values = self._row_to_tuple(record) data_row_num = row_idx - data_start_row + 1 # Row height: data.height overrides *.height row_h = data_height if data_height is not None else all_height if row_h is not None: worksheet.row_dimensions[row_idx].height = row_h # Per-row styles evaluated once, reused for every cell in this row alt = odd_style if data_row_num % 2 else even_style fn_results = [fn(record) for fn in (conditional_styles or [])] for col_idx, value in enumerate(values, 1): col_name = self.columns[col_idx - 1] cell = worksheet.cell(row=row_idx, column=col_idx) if isinstance(value, (datetime, date)) and value.year < 1900: # Excel doesn't handle dates < 1900-01-01, convert to string cell.value = self.to_string(value) base_style = None elif isinstance(value, datetime) and value.time() != MIDNIGHT: cell.value = value if row_count < width_sample_size: data_widths[col_idx - 1] = max(data_widths[col_idx - 1], 19) base_style = 'datetime_style' elif isinstance(value, (date, datetime)): cell.value = value if row_count < width_sample_size: data_widths[col_idx - 1] = max(data_widths[col_idx - 1], 10) base_style = 'date_style' elif value is None: cell.value = '' base_style = None elif hasattr(value, 'read'): # handle LOBs cell.value = value.read() else: cell.value = value if row_count < width_sample_size: data_widths[col_idx - 1] = max(data_widths[col_idx - 1], len(str(value))) base_style = None col_style = col_fmt[col_idx - 1].get('style') if col_fmt else None col_fn = col_conditional_styles[col_idx - 1] if col_conditional_styles else None cell_style = col_fn(record) if col_fn else None style_names = [ s for s in [base_style, col_style, all_row_style, data_row_style, alt, *fn_results, cell_style] if s ] self._apply_cell_overrides(cell, record, col_name, col_idx, row_idx, style_names) composed = self._compose_styles(*style_names) if composed: cell.style = composed self._post_row(record, row_idx) row_count += 1 return row_count def _write_to_worksheet( self, data: Iterable[RecordLike], worksheet: 'Worksheet', columns: Optional[List[str]] = None, write_headers: bool = True, headers: Optional[List[str]] = None, effective_fmt: Optional[dict] = None, col_rules: Optional[list] = None, ) -> int: """Write data to an already-selected worksheet. Returns number of rows written.""" effective_fmt = effective_fmt if effective_fmt is not None else self.formatting self.data_iterator, detected_columns = self._get_data_iterator(data, columns) self.columns = detected_columns if not self.columns: raise ValueError("Could not determine columns from data") col_fmt = self._build_col_fmt_map(self.columns, col_rules=col_rules) col_conditional_styles = [p.get('conditional_style') for p in col_fmt] if col_fmt else [] rows_fmt = effective_fmt.get('rows', {}) display_headers = headers if headers is not None else self._get_headers(data) if len(display_headers) != len(self.columns): raise ValueError( f"headers has {len(display_headers)} name(s) but data has {len(self.columns)} column(s)" ) header_widths = [len(h) for h in display_headers] data_widths = [0] * len(self.columns) header_font = Font(bold=True) has_groups = bool(col_fmt and any(p.get('group_label') for p in col_fmt)) header_row = 2 if has_groups else 1 should_write_headers = write_headers and worksheet.cell(header_row, 1).value is None data_start_row = (header_row + 1) if should_write_headers else worksheet.max_row + 1 if should_write_headers: for col_idx, column_name in enumerate(display_headers, 1): cell = worksheet.cell(row=header_row, column=col_idx, value=column_name) hfmt = col_fmt[col_idx - 1].get('header_style') if col_fmt else None if hfmt: cell.style = hfmt else: cell.font = header_font row_count = self._write_rows( worksheet, data_start_row, col_fmt, col_conditional_styles, rows_fmt, header_widths, data_widths, ) if should_write_headers: self._finalize_headers(worksheet, header_widths, data_widths, col_fmt, rows_fmt, link_mapping=self._link_mapping, effective_fmt=effective_fmt) return row_count
[docs] def write_batch( self, data: Iterable[RecordLike], sheet_name: Optional[str] = None, headers: Optional[List[str]] = None, formatting: Optional[Dict] = None, ) -> None: """ Write a batch of data to a sheet. If this is the first write to this sheet in the current session, the sheet is cleared first. Subsequent writes to the same sheet append data. Parameters ---------- data : Iterable[RecordLike] The data batch sheet_name : str, optional Target sheet. If None, uses active_sheet or defaults to 'Data' headers : list of str, optional Display names for the header row. Overrides the writer-level ``headers`` set at initialisation for this batch only. Must match the column count. formatting : ExcelFormat or dict, optional Per-call formatting override. ``None`` (default) uses the writer-level formatting. Pass ``{}`` to write this sheet with no formatting. """ if self.workbook is None: raise RuntimeError("Workbook not initialized") if isinstance(formatting, ExcelFormat): formatting = formatting.to_dict() effective_fmt = self.formatting if formatting is None else formatting call_col_rules = [ (k.lower(), k, v.to_dict() if isinstance(v, ColumnRule) else v) for k, v in effective_fmt.get('columns', {}).items() ] self._register_call_styles(effective_fmt) target_sheet = sheet_name or self.active_sheet or 'Data' if sheet_name: self.active_sheet = sheet_name worksheet = self._get_or_create_worksheet(target_sheet) # Clear sheet if this is the first write to it in this session if target_sheet not in self._sheets_written_this_session: self._clear_worksheet(worksheet) self._sheets_written_this_session.add(target_sheet) tab_color = effective_fmt.get('tab_color') if tab_color: worksheet.sheet_properties.tabColor = tab_color.lstrip('#') row_count = self._write_to_worksheet( data=data, worksheet=worksheet, write_headers=self.write_headers, headers=headers, effective_fmt=effective_fmt, col_rules=call_col_rules, ) self._row_num += row_count logger.info(f"Wrote {row_count} rows to sheet '{target_sheet}' (total: {self._row_num})")
def _write_data(self, file_obj: Any) -> None: """BatchWriter contract: write data_iterator to the active sheet. Called by write() when data was provided at initialisation. """ if self.data_iterator is None: raise RuntimeError("No data provided") if not self.columns: raise RuntimeError("Columns not initialized") if self.workbook is None: raise RuntimeError("Workbook not initialized") col_fmt = self._build_col_fmt_map(self.columns) col_conditional_styles = [p.get('conditional_style') for p in col_fmt] if col_fmt else [] rows_fmt = self.formatting.get('rows', {}) target_sheet = self.active_sheet or 'Data' worksheet = self._get_or_create_worksheet(target_sheet) if target_sheet not in self._sheets_written_this_session: self._clear_worksheet(worksheet) self._sheets_written_this_session.add(target_sheet) has_groups = bool(col_fmt and any(p.get('group_label') for p in col_fmt)) header_row = 2 if has_groups else 1 should_write_headers = ( self.write_headers and not self._headers_written and worksheet.cell(header_row, 1).value is None ) data_start_row = (header_row + 1) if should_write_headers else worksheet.max_row + 1 display_headers = self._get_headers() if len(display_headers) != len(self.columns): raise ValueError( f"headers has {len(display_headers)} name(s) but data has {len(self.columns)} column(s)" ) header_widths = [len(h) for h in display_headers] data_widths = [0] * len(self.columns) if should_write_headers: header_font = Font(bold=True) for col_idx, column_name in enumerate(display_headers, 1): cell = worksheet.cell(row=header_row, column=col_idx, value=column_name) hfmt = col_fmt[col_idx - 1].get('header_style') if col_fmt else None if hfmt: cell.style = hfmt else: cell.font = header_font self._headers_written = True row_count = self._write_rows( worksheet, data_start_row, col_fmt, col_conditional_styles, rows_fmt, header_widths, data_widths, ) if should_write_headers: self._finalize_headers(worksheet, header_widths, data_widths, col_fmt, rows_fmt, link_mapping=self._link_mapping) self._row_num += row_count logger.info(f"Wrote {row_count} rows to sheet '{target_sheet}' (total: {self._row_num})") def _save_workbook(self): """Save the workbook. Idempotent - safe to call multiple times.""" if self.workbook is not None: try: self.workbook.save(self.output_path) logger.info(f"Saved workbook: {self.output_path}") except Exception as e: logger.error(f"Failed to save workbook: {e}") raise finally: self.workbook = None # Mark as saved to prevent duplicate saves
[docs] def close(self): """Close the writer and save the workbook.""" self._save_workbook()
def __exit__(self, exc_type, exc_val, exc_tb): """Save workbook on context exit.""" try: self.close() finally: if self.output_path.exists() and self.output_path.stat().st_size == 0: # remove invalid empty Excel file self.output_path.unlink() return False
[docs] def to_excel( data, file: Union[str, Path], sheet: str = 'Data', headers: Optional[List[str]] = None, write_headers: bool = True, ) -> None: """ Legacy convenience function — writes a single sheet. Parameters ---------- data : Iterable[RecordLike] Data to write (cursor, list of Records, etc.) file : str or Path Output Excel file (.xlsx) sheet : str, default 'Data' Sheet name to write to headers : List[str], optional Header row text. If None, uses cursor.description or detected column names write_headers : bool, default True Whether to write column headers Examples -------- # Write cursor with original database column names to_excel(cursor, 'report.xlsx') # Override header names to_excel(cursor, 'report.xlsx', headers=['User ID', 'Full Name', 'Email']) For multi-sheet or advanced reports, use ExcelWriter as a context manager with write_batch(). """ with ExcelWriter(data=None, file=file, headers=headers, write_headers=write_headers) as writer: writer.write_batch(data=data, sheet_name=sheet)
[docs] class LinkSource: """ Defines a linkable data source for creating internal and external hyperlinks in Excel. LinkSource enables rich hyperlinking between worksheets and to external URLs. Each LinkSource is registered once with LinkedExcelWriter and can be used across multiple sheets to create consistent, navigable Excel reports. As the source sheet is written, LinkSource caches each record's cell reference and generates formatted link text and URLs using Python's str.format_map(). Later sheets can reference these cached records to create hyperlinks. Link Types ---------- * **Internal links** - Excel cell references like ``#Students!A5`` for navigation * **External links** - HTTP/HTTPS URLs like ``https://crm.example.com/contact/123`` * **Hybrid mode** - Stores both internal and external, defaults to external if available Parameters ---------- name : str Unique identifier for this link source. Used when registering links in write_batch() (e.g., ``links={"student_name": "student"}``). source_sheet : str, optional The worksheet name that serves as the authoritative source for this entity. Internal links will point to rows in this sheet. This sheet must be written before any sheets that reference it. Not required when external_only=True. key_column : str, optional The column name containing unique identifiers for records (e.g., "student_id"). This column must exist in both the source sheet data and any sheets that create links to it. Not required when external_only=True. url_template : str, optional Python format string for generating external URLs. Uses str.format_map() with the full record dict as context. Example: ``"https://app.com/users/{user_id}"`` text_template : str, optional Python format string for generating link display text. Uses str.format_map() with the full record dict. Example: ``"{last_name}, {first_name} ({dept})"`` If not provided, uses the column value. missing_text : str, optional Fallback text to display when a link target cannot be resolved. If None, displays the raw value from the detail row. external_only : bool, default False If True, this LinkSource generates external links directly from current row data without caching. Can be reused across multiple sheets. source_sheet and key_column are not required. If False (default), caches records for cross-sheet linking. Attributes ---------- _records : dict Internal cache mapping key values to link metadata (ref, display_text, url). Populated automatically as the source sheet is written. Examples -------- **Internal links only (sheet navigation)**:: student_link = LinkSource( name="student", source_sheet="Students", key_column="student_id" ) **External links with custom text**:: employee_link = LinkSource( name="employee", source_sheet="Employees", key_column="employee_id", url_template="https://hr.company.com/profile/{employee_id}", text_template="{last_name}, {first_name} ({department})" ) **Hybrid with missing value handling**:: customer_link = LinkSource( name="customer", source_sheet="Customers", key_column="customer_id", url_template="https://crm.company.com/customers/{crm_id}", text_template="{company_name} - {contact_name}", missing_text="[Unknown Customer]" ) **External-only links (reusable across sheets)**:: # For external links - reusable on any sheet with required columns imdb_link = LinkSource( name="imdb", url_template="https://imdb.com/title/{tconst}", text_template="{primary_title} ({start_year})", external_only=True # No source_sheet needed - works on any sheet ) # Use on multiple sheets with same columns: writer.write_batch(movies, "Movies", links={"primary_title": "imdb"}) writer.write_batch(top_rated, "Top Rated", links={"primary_title": "imdb"}) Notes ----- * The source sheet MUST be written before sheets that reference it * All template fields must exist in the record data or KeyError will be logged * Key values are converted to strings for cache lookups * Templates use Python's str.format_map() - use double braces {{}} to escape See Also -------- LinkedExcelWriter : Writer that uses LinkSource for hyperlinking LinkedExcelWriter.register_link_source : Method to register a LinkSource """
[docs] def __init__(self, name: str, source_sheet: str = None, key_column: str = None, url_template: str = None, text_template: str = None, missing_text: str = None, external_only: bool = False): self.name = name self.source_sheet = source_sheet self.key_column = key_column self.url_template = url_template self.text_template = text_template self.missing_text = missing_text self.external_only = external_only self._records = {} # Validation if external_only: if not url_template: raise ValueError(f"url_template is required when external_only=True for LinkSource '{name}'") else: if not source_sheet: raise ValueError(f"source_sheet is required when external_only=False for LinkSource '{name}'") if not key_column: raise ValueError(f"key_column is required when external_only=False for LinkSource '{name}'") # Track display width for column sizing (sample first 100 rows, cap at 50 chars) self._max_display_width: int = 0 self._sample_count: int = 0
[docs] def cache_record(self, key_value: Any, row_dict: Dict[str, Any], ref: str) -> None: """Cache a record for cross-sheet linking (unless external_only=True).""" # Skip caching if this LinkSource is external-only (self-linking only) if self.external_only: # Still track display width for column sizing if self._sample_count < 100: if self.text_template: try: display_text = self.text_template.format_map(row_dict) except KeyError: display_text = str(key_value) else: display_text = str(key_value) display_len = min(len(display_text), 50) self._max_display_width = max(self._max_display_width, display_len) self._sample_count += 1 return key_str = str(key_value) if self.text_template: try: display_text = self.text_template.format_map(row_dict) except KeyError as e: logger.warning(f"Missing key {e} in text_template for {self.name}") display_text = f"{key_value} ({self.missing_text})" else: # No template → use the raw key value (clean, expected) display_text = str(key_value) # Track max display width for first 100 records (capped at 50 chars) if self._sample_count < 100: display_len = min(len(display_text), 50) self._max_display_width = max(self._max_display_width, display_len) self._sample_count += 1 record = { "ref": ref, "display_text": display_text, } if self.url_template: try: record["url"] = self.url_template.format_map(row_dict) except KeyError as e: logger.warning(f"Missing key {e} in url_template for {self.name}") if key_str not in self._records: self._records[key_str] = record
@property def max_display_width(self) -> int: """ Maximum display text width observed across sampled links. Samples first 100 records, capped at 50 characters to handle outliers. Used for automatic column width sizing. """ return self._max_display_width
[docs] class LinkedExcelWriter(ExcelWriter): """ Advanced Excel writer with internal and external hyperlink management. LinkedExcelWriter extends ExcelWriter to enable rich, bidirectional hyperlinking within Excel workbooks and to external systems. It automatically caches source records as they're written and creates formatted hyperlinks in detail sheets that reference those sources. This is particularly powerful for creating navigable multi-sheet reports with master-detail relationships, drill-through capabilities, and integration with external CRM, ticketing, or web applications. Key Features ------------ * **Internal navigation** - Links between worksheets (e.g., ``#Students!B5``) * **External integration** - Deep links to web applications * **Hybrid linking** - Store both internal and external, choose which to display * **Template-based formatting** - Use Python format strings for link text and URLs * **Automatic caching** - Source records cached as written, no manual tracking * **Mode control** - Force internal or external links per column via ``source:internal`` Workflow -------- 1. Create LinkSource definitions for each linkable entity 2. Register them with LinkedExcelWriter 3. Write source sheets first (e.g., Students, Products) 4. Write detail sheets with link specifications (e.g., Enrollments, Orders) 5. Links are resolved from cache and applied automatically Parameters ---------- file : str or Path Output Excel file (.xlsx) data : Iterable[RecordLike], optional Initial data to write. If None, use write_batch() for streaming mode. sheet_name : str, optional Default sheet name for write_batch() calls write_headers : bool, default True Whether to write column headers Examples -------- **Basic internal linking between sheets**:: with LinkedExcelWriter(file='school_report.xlsx') as writer: # Define linkable entity student_link = LinkSource( name="student", source_sheet="Students", key_column="student_id" ) writer.register_link_source(student_link) # Write source sheet writer.write_batch(students_data, sheet_name="Students") # Write detail sheet with internal links writer.write_batch( enrollments_data, sheet_name="Enrollments", links={"student_name": "student:internal"} ) **External links to CRM system**:: with LinkedExcelWriter(file='sales_report.xlsx') as writer: customer_link = LinkSource( name="customer", source_sheet="Customers", key_column="customer_id", url_template="https://crm.company.com/customers/{crm_id}", text_template="{company_name} ({customer_id})" ) writer.register_link_source(customer_link) writer.write_batch(customers_data, sheet_name="Customers") writer.write_batch( orders_data, sheet_name="Orders", links={"customer": "customer"} # Uses external URL ) **Hybrid mode with internal and external links**:: with LinkedExcelWriter(file='support_tickets.xlsx') as writer: ticket_link = LinkSource( name="ticket", source_sheet="Tickets", key_column="ticket_id", url_template="https://support.company.com/ticket/{ticket_id}", text_template="#{ticket_id} - {subject}" ) writer.register_link_source(ticket_link) writer.write_batch(tickets_data, sheet_name="Tickets") writer.write_batch( comments_data, sheet_name="Comments", links={ "ticket_link": "ticket", # External to support system "ticket_ref": "ticket:internal" # Internal sheet navigation } ) **Multiple link sources in one sheet**:: with LinkedExcelWriter(file='class_roster.xlsx') as writer: student_link = LinkSource( name="student", source_sheet="Students", key_column="student_id", text_template="{last_name}, {first_name}" ) course_link = LinkSource( name="course", source_sheet="Courses", key_column="course_id", text_template="{course_code} - {title}" ) writer.register_link_source(student_link) writer.register_link_source(course_link) writer.write_batch(students_data, sheet_name="Students") writer.write_batch(courses_data, sheet_name="Courses") writer.write_batch( enrollments_data, sheet_name="Enrollments", links={ "student_name": "student:internal", "course_name": "course:internal" } ) Notes ----- * Source sheets MUST be written before detail sheets that reference them * The key_column must exist in both source and detail datasets * Missing links display missing_text if set, otherwise show raw value * Link mode syntax: ``"source_name"`` (external) or ``"source_name:internal"`` * Templates use str.format_map() - all fields must exist in record data * Hyperlink styling (blue, underlined) is applied automatically See Also -------- LinkSource : Link definition class ExcelWriter : Base writer without linking capabilities """
[docs] def __init__( self, data: Optional[Iterable[RecordLike]] = None, file: Optional[Union[str, Path]] = None, sheet_name: Optional[str] = None, headers: Optional[List[str]] = None, write_headers: bool = True, formatting: Optional[Dict] = None, ): super().__init__(data=data, file=file, sheet_name=sheet_name, headers=headers, write_headers=write_headers, formatting=formatting) #: Registered LinkSource instances, keyed by name. self.link_sources: Dict[str, LinkSource] = {}
[docs] def write_batch( self, data: Iterable[RecordLike], sheet_name: Optional[str] = None, headers: Optional[List[str]] = None, links: Optional[Dict[str, str]] = None, formatting: Optional[Dict] = None, ) -> None: """ Write a batch with optional hyperlinking. If this is the first write to this sheet in the current session, the sheet is cleared first. Subsequent writes to the same sheet append data. headers: display names for this batch, overriding the writer-level default links: dict column_name → "source_name" or "source_name:internal" formatting: per-call formatting override; None uses writer-level formatting, {} writes with no formatting """ if isinstance(formatting, ExcelFormat): formatting = formatting.to_dict() effective_fmt = self.formatting if formatting is None else formatting call_col_rules = [ (k.lower(), k, v.to_dict() if isinstance(v, ColumnRule) else v) for k, v in effective_fmt.get('columns', {}).items() ] self._register_call_styles(effective_fmt) target_sheet = sheet_name or self.active_sheet or 'Data' if sheet_name: self.active_sheet = target_sheet worksheet = self._get_or_create_worksheet(target_sheet) # Clear sheet if this is the first write to it in this session if target_sheet not in self._sheets_written_this_session: self._clear_worksheet(worksheet) self._sheets_written_this_session.add(target_sheet) tab_color = effective_fmt.get('tab_color') if tab_color: worksheet.sheet_properties.tabColor = tab_color.lstrip('#') # Parse links dict into resolved mapping: column → (source, mode) link_mapping = {} if links: for col, spec in links.items(): if ':' in spec: source_name, mode_str = spec.split(':', 1) mode = mode_str.lower() if mode not in {'internal', 'external'}: raise ValueError(f"Invalid mode '{mode_str}' in link spec '{spec}'") else: source_name = spec mode = "external" if source_name not in self.link_sources: raise ValueError(f"Unknown LinkSource '{source_name}'") link_mapping[col] = (self.link_sources[source_name], mode) # Determine if this sheet is a source sheet for any registered LinkSource # Skip external_only sources since they don't cache source_for_this_sheet = [ src for src in self.link_sources.values() if not src.external_only and src.source_sheet == target_sheet ] row_count = self._write_to_worksheet( data=data, worksheet=worksheet, write_headers=self.write_headers, headers=headers, link_mapping=link_mapping, source_for_this_sheet=source_for_this_sheet, target_sheet=target_sheet, effective_fmt=effective_fmt, col_rules=call_col_rules, ) self._row_num += row_count logger.info(f"Wrote {row_count} rows to sheet '{target_sheet}' with linking")
def _write_to_worksheet( self, data: Iterable[RecordLike], worksheet: 'Worksheet', columns: Optional[List[str]] = None, write_headers: bool = True, headers: Optional[List[str]] = None, link_mapping: Optional[Dict[str, tuple]] = None, source_for_this_sheet: Optional[list] = None, target_sheet: Optional[str] = None, effective_fmt: Optional[dict] = None, col_rules: Optional[list] = None, ) -> int: self._link_mapping = link_mapping or {} self._source_for_this_sheet = source_for_this_sheet or [] self._target_sheet = target_sheet return super()._write_to_worksheet(data, worksheet, columns, write_headers, headers, effective_fmt=effective_fmt, col_rules=col_rules) def _apply_cell_overrides(self, cell, record, col_name, col_idx, row_idx, style_names): link_spec = self._link_mapping.get(col_name) if not link_spec: return source, mode = link_spec if source.external_only: link_info = source.generate_link_from_row( record, ref="", mode="external", column_value=cell.value ) elif source.source_sheet == self._target_sheet: key_col_letter = get_column_letter(self._col_index_map[source.key_column]) ref = f"#{self._target_sheet}!{key_col_letter}{row_idx}" link_info = source.generate_link_from_row(record, ref, mode=mode) else: key_value = cell.value link_info = source.get_link(key_value, mode=mode) if key_value is not None else None if link_info: cell.hyperlink = link_info["target"] cell.value = link_info["display_text"] style_names.append('hyperlink_style') elif source.missing_text is not None: cell.value = source.missing_text def _post_row(self, record, row_idx: int) -> None: for source in self._source_for_this_sheet: key_col_letter = get_column_letter(self._col_index_map[source.key_column]) ref = f"#{self._target_sheet}!{key_col_letter}{row_idx}" key_value = record.get(source.key_column) if key_value is not None: source.cache_record(key_value, record, ref)
[docs] def check_dependencies(): if not HAS_OPENPYXL: logger.error('Openpyxl is not available. Excel files not supported.')
check_dependencies()