# dbtk/writers/excel.py
"""
Excel writer for database results using openpyxl.
"""
import logging
from typing import Any, Callable, Union, List, Optional, Iterable, Dict, TYPE_CHECKING
from pathlib import Path
from datetime import datetime, date, time
from zipfile import BadZipFile
import fnmatch
import hashlib
from .base import BatchWriter, RecordLike
if TYPE_CHECKING:
from openpyxl.worksheet.worksheet import Worksheet
try:
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, NamedStyle, Alignment
from openpyxl.comments import Comment
from openpyxl.utils.exceptions import InvalidFileException
from openpyxl.utils import get_column_letter
HAS_OPENPYXL = True
except ImportError:
HAS_OPENPYXL = False
InvalidFileException = Exception # fallback so except clause is valid
logger = logging.getLogger(__name__)
MIDNIGHT = time(0, 0)
_BUILTIN_STYLE_NAMES = frozenset({
'date_style', 'datetime_style', 'hyperlink_style',
'bold_style', 'header_vert_style',
'currency_style', 'percent_style', 'comma_style',
})
[docs]
class ColumnRule:
"""Per-column formatting rule for use with :class:`ExcelFormat`.
Interchangeable with a plain ``dict`` — ``ExcelWriter`` accepts either.
The order of rules in ``ExcelFormat.columns`` is significant: later rules
override earlier ones on a per-property basis.
Parameters
----------
style : str or dict, optional
Static style applied to every data cell in the column. Either a
registered style name (str) or an inline property dict
(``bg_color``, ``font``, ``number_format``, ``alignment``).
header_style : str or dict, optional
Static style applied to the header cell only (owns the cell entirely —
include ``font={'bold': True}`` if you still want bold).
width : float, optional
Explicit column width in Excel units; bypasses auto-sizing.
hidden : bool, default False
Hide the column.
comment : str, optional
Comment/note added to the header cell.
filter : bool, default False
Show the auto-filter dropdown on this column only; hides dropdowns on
all other columns.
group_label : str, optional
Label for a group super-header spanning this column range.
Only valid on range patterns (``'col_a:col_z'``).
conditional_style : callable or list[callable], optional
``lambda record: style_name_or_None`` — evaluated per row; overrides
``style`` and row styles when non-``None``. Multiple callables
are composed in order.
"""
[docs]
def __init__(self, style=None, header_style=None, width=None, hidden=False,
comment=None, filter=False, group_label=None, conditional_style=None):
self.style = style
self.header_style = header_style
self.width = width
self.hidden = hidden
self.comment = comment
self.filter = filter
self.group_label = group_label
self.conditional_style = conditional_style
[docs]
def to_dict(self):
d: dict = {}
if self.style is not None:
d['style'] = self.style
if self.header_style is not None:
d['header_style'] = self.header_style
if self.width is not None:
d['width'] = self.width
if self.hidden:
d['hidden'] = self.hidden
if self.comment is not None:
d['comment'] = self.comment
if self.filter:
d['filter'] = self.filter
if self.group_label is not None:
d['group_label'] = self.group_label
if self.conditional_style is not None:
d['conditional_style'] = self.conditional_style
return d
[docs]
class ExcelWriter(BatchWriter):
"""
Stateful Excel writer using openpyxl.
Keeps the workbook open across multiple write_batch() calls and saves only on context exit.
Designed for both single-sheet legacy use and multi-sheet reports.
Supports all 3 BatchWriter modes:
1. Complete write from __init__ + write()
2. Batch write (no data on init) + write_batch()
3. Hybrid: data on init + write() + write_batch()
Usage examples::
# Mode 1: Traditional single-shot write
ExcelWriter(cursor, 'report.xlsx').write()
# Mode 2: Pure streaming with write_batch()
with ExcelWriter(file='report.xlsx') as writer:
writer.write_batch(cursor) # goes to sheet 'Data'
# Mode 3: Hybrid - initial data + streaming
with ExcelWriter(first_batch, 'report.xlsx') as writer:
writer.write() # Write initial batch
writer.write_batch(second_batch) # Stream additional batches
# Multi-sheet report
with ExcelWriter(file='report.xlsx', sheet_name='Summary') as writer:
writer.write_batch(summary_data, sheet_name='Summary')
writer.write_batch(users_data, sheet_name='Users')
writer.write_batch(orders_data, sheet_name='Orders')
# Streaming / batch mode
with ExcelWriter(file='large.xlsx') as writer:
for batch in large_generator:
writer.write_batch(batch, sheet_name='Data') # appends to 'Data'
"""
accepts_file_handle = False
preserve_types = True
[docs]
def __init__(
self,
data: Optional[Iterable[RecordLike]] = None,
file: Optional[Union[str, Path]] = None,
sheet_name: Optional[str] = None,
headers: Optional[List[str]] = None,
write_headers: bool = True,
formatting: Optional[Union[Dict, ExcelFormat]] = None,
):
"""
Initialize the Excel writer.
Parameters
----------
data : Iterable[RecordLike], optional
Initial data to write. If None, use write_batch() for streaming mode.
file : str or Path, optional
Output Excel file (.xlsx). Required for Excel output.
sheet_name : str, optional
Default/active sheet name to use for write_batch() calls without
an explicit sheet_name.
headers : List[str], optional
Header row text. If None, checks data.description for original
column names, then falls back to detected column names.
write_headers : bool, default True
Whether to write column headers (only when the sheet is empty).
formatting : ExcelFormat or dict, optional
Worksheet formatting rules. Prefer :class:`ExcelFormat` for
IDE autocomplete; a plain ``dict`` is also accepted.
See :class:`ExcelFormat` for the full reference.
"""
if not HAS_OPENPYXL:
raise ImportError("ExcelWriter requires openpyxl: pip install openpyxl")
if file is None:
raise ValueError("ExcelWriter requires an output file path")
if isinstance(formatting, ExcelFormat):
formatting = formatting.to_dict()
super().__init__(data=data, file=file, headers=headers, write_headers=write_headers)
self.output_path = Path(file)
self.active_sheet: Optional[str] = sheet_name
self.workbook: Optional[Workbook] = None
self._sheets_written_this_session: set = set()
self.formatting: dict = formatting or {}
self._link_mapping: dict = {}
self._style_cache: dict = {}
self._named_style_objects: dict = {}
self._col_rules: list = [
(k.lower(), k, v.to_dict() if isinstance(v, ColumnRule) else v)
for k, v in self.formatting.get('columns', {}).items()
]
self._load_or_create_workbook()
def _load_or_create_workbook(self) -> None:
"""Load existing workbook or create a new one."""
try:
if self.output_path.exists():
with open(self.output_path, mode='r+b'):
# make sure it is actually writable (not open in Excel)
pass
self.workbook = load_workbook(self.output_path)
logger.info(f"Loaded existing workbook: {self.output_path}")
else:
# make sure path is writable
with open(self.output_path, mode='wb'):
pass
# clean up empty file that was just created so an exception doesn't leave us with an invalid Excel file
# self.output_path.unlink()
self.workbook = Workbook()
if 'Sheet' in self.workbook.sheetnames:
self.workbook.remove(self.workbook['Sheet'])
except (InvalidFileException, BadZipFile, ValueError) as e:
raise ValueError(
f"File '{self.output_path}' exists but is not a valid Excel workbook. "
f"Original error: {e}"
) from e
except PermissionError:
raise PermissionError(
f"Cannot write to '{self.output_path}' - file may be open in Excel or another application. "
"Please close the file and try again."
)
self._register_styles()
def _add_named_style(self, style: 'NamedStyle') -> None:
"""Register a NamedStyle if not already in the workbook, and cache the object."""
if style.name not in self.workbook.named_styles:
self.workbook.add_named_style(style)
self._named_style_objects[style.name] = style
def _register_styles(self) -> None:
"""Register styles for this workbook.
User-defined styles in ``formatting['styles']`` are registered first so
they take precedence over the built-ins. Built-in styles that already
exist (e.g. because the user redefined ``comma_style``) are silently
skipped by :meth:`_add_named_style`.
Built-in styles
---------------
date_style
Date number format: ``YYYY-MM-DD``.
datetime_style
Datetime number format: ``YYYY-MM-DD HH:MM:SS``.
hyperlink_style
Blue underlined font (used automatically by LinkedExcelWriter).
bold_style
Bold font. Useful as a ``header_style`` when you only want emphasis.
header_vert_style
Bold font + 90° text rotation. Pair with
``rows={'header': {'height': 120}}`` for narrow rotated headers.
currency_style
Number format: ``#,##0.00``.
percent_style
Number format: ``0.00%``.
comma_style
Number format: ``#,##0``.
"""
if self.workbook is None:
return
# User styles first — allows overriding built-ins
for style_name, props in self.formatting.get('styles', {}).items():
self._add_named_style(self._build_named_style(style_name, props))
# Built-ins — skipped silently if user already registered the name
from openpyxl.styles import Alignment as _Alignment
_bold_font = Font(bold=True)
for style in [
NamedStyle(name='date_style', number_format='YYYY-MM-DD'),
NamedStyle(name='datetime_style', number_format='YYYY-MM-DD HH:MM:SS'),
NamedStyle(name='hyperlink_style', font=Font(color="0000FF", underline="single")),
NamedStyle(name='bold_style', font=_bold_font),
NamedStyle(name='header_vert_style', font=_bold_font,
alignment=_Alignment(text_rotation=90, horizontal='center')),
NamedStyle(name='currency_style', number_format='#,##0.00'),
NamedStyle(name='percent_style', number_format='0.00%'),
NamedStyle(name='comma_style', number_format='#,##0'),
]:
self._add_named_style(style)
@staticmethod
def _build_named_style(name: str, props: dict) -> 'NamedStyle':
"""Build a NamedStyle from a properties dict."""
from openpyxl.styles import PatternFill, Alignment
style = NamedStyle(name=name)
if 'bg_color' in props:
color = props['bg_color'].lstrip('#')
style.fill = PatternFill(fill_type='solid', fgColor=color)
if 'font' in props:
style.font = Font(**props['font'])
if 'number_format' in props:
style.number_format = props['number_format']
if 'alignment' in props:
style.alignment = Alignment(**props['alignment'])
return style
def _ensure_style(self, props: dict) -> str:
"""Register an inline format dict as a NamedStyle; return its name."""
key = tuple(sorted((k, str(v)) for k, v in props.items()))
name = 'fmt_' + hashlib.md5(str(key).encode()).hexdigest()[:8]
if name not in self._named_style_objects:
self._add_named_style(self._build_named_style(name, props))
return name
def _register_call_styles(self, fmt: dict) -> None:
"""Register any user-defined styles from a per-call formatting dict."""
for name, props in fmt.get('styles', {}).items():
self._add_named_style(self._build_named_style(name, props))
def _build_col_fmt_map(self, columns: List[str], col_rules: list = None) -> list:
"""Build per-column formatting list from wildcard pattern rules.
Returns a list (indexed by 0-based column position) of property dicts.
Patterns are applied in definition order; later rules win per property.
Three pattern forms are supported (all case-insensitive):
- fnmatch glob: ``'*_fee*'``, ``'resv_*'``
- range: ``'start:end'``, ``':end'`` (from first), ``'start:'`` (to last)
A pattern is only treated as a range if it contains ``:`` with no wildcard
characters and does not itself match a column name literally.
- literal: any exact column name (handled by fnmatch as a degenerate glob)
"""
if col_rules is None:
col_rules = self._col_rules
if not col_rules:
return []
result: List[dict] = [{} for _ in columns]
cols_lower = [c.lower() for c in columns]
def _apply(dest: dict, props: dict) -> None:
"""Merge props into dest, composing overlapping 'style' values."""
for k, v in props.items():
if k == 'style' and 'style' in dest:
existing = dest[k]
dest[k] = existing if isinstance(existing, list) else [existing]
dest[k].append(v)
else:
dest[k] = v
for pattern_lower, pattern, props in col_rules:
is_range = ':' in pattern_lower and not any(c in pattern_lower for c in '*?[') and pattern_lower not in cols_lower
if 'group_label' in props and not is_range:
logger.warning(
f"'group_label' is only supported on range patterns (e.g. 'col_a:col_b'); "
f"ignoring for pattern '{pattern}'"
)
props = {k: v for k, v in props.items() if k != 'group_label'}
if is_range:
# Range syntax: 'start:end', ':end', 'start:'
start_str, end_str = pattern_lower.split(':', 1)
start_str, end_str = start_str.strip(), end_str.strip()
if start_str:
if start_str not in cols_lower:
raise ValueError(f"Range start column '{start_str}' not found in result set")
start_idx = cols_lower.index(start_str)
else:
start_idx = 0
if end_str:
if end_str not in cols_lower:
raise ValueError(f"Range end column '{end_str}' not found in result set")
end_idx = cols_lower.index(end_str)
else:
end_idx = len(columns) - 1
if start_idx > end_idx:
raise ValueError(
f"Range '{pattern}': '{start_str}' (col {start_idx + 1}) "
f"comes after '{end_str}' (col {end_idx + 1})"
)
for i in range(start_idx, end_idx + 1):
_apply(result[i], props)
else:
# fnmatch glob (also handles literal column names)
for col_idx, col_name_lower in enumerate(cols_lower):
if fnmatch.fnmatch(col_name_lower, pattern_lower):
_apply(result[col_idx], props)
for col_props in result:
fmt = col_props.get('style')
if isinstance(fmt, list):
resolved = [self._ensure_style(f) if isinstance(f, dict) else f for f in fmt]
col_props['style'] = self._compose_styles(*resolved)
elif isinstance(fmt, dict):
col_props['style'] = self._ensure_style(fmt)
hfmt = col_props.get('header_style')
if isinstance(hfmt, dict):
col_props['header_style'] = self._ensure_style(hfmt)
return result
def _finalize_headers(
self,
worksheet: 'Worksheet',
header_widths: list,
data_widths: list,
col_fmt: list,
rows_fmt: dict,
link_mapping: Optional[dict] = None,
effective_fmt: Optional[dict] = None,
) -> None:
"""Apply column widths, auto-rotate, header height, and freeze panes.
Called once after data has been sampled, so auto-rotate decisions have
accurate data-width information. ``link_mapping`` is LinkedExcelWriter's
column → (LinkSource, mode) map; LinkSource display widths substitute for
sampled data widths on linked columns.
"""
link_mapping = link_mapping or {}
effective_fmt = effective_fmt if effective_fmt is not None else self.formatting
# Determine whether group headers are present
has_groups = bool(col_fmt and any(p.get('group_label') for p in col_fmt))
header_row = 2 if has_groups else 1
# Substitute LinkSource display widths for linked columns
effective_data_widths = list(data_widths)
for col_idx, col_name in enumerate(self.columns, 1):
if col_name in link_mapping:
source, _ = link_mapping[col_name]
if source.max_display_width > 0:
effective_data_widths[col_idx - 1] = source.max_display_width
# Auto-rotate: detect columns whose header is significantly longer than their data
auto_rotated: set = set()
har_height_factor = 6.5
har = effective_fmt.get('header_auto_rotate')
if har:
if isinstance(har, dict):
har_min = har.get('min_length', 8)
har_ratio = har.get('ratio', 1.5)
har_height_factor = har.get('height_factor', 6.5)
else:
har_min = 8
har_ratio = float(har)
har_height_factor = 6.5
for col_idx, (hw, dw) in enumerate(zip(header_widths, effective_data_widths), 1):
col_props = col_fmt[col_idx - 1] if col_fmt else {}
if col_props.get('header_style'):
continue # explicit header_style takes precedence
if hw >= har_min and hw > dw * har_ratio:
auto_rotated.add(col_idx)
worksheet.cell(header_row, col_idx).style = 'header_vert_style'
# Column widths: auto-rotated columns use data width only; others use max of both
min_col_width = effective_fmt.get('min_column_width', 6)
max_col_width = effective_fmt.get('max_column_width', 60)
for col_idx, (hw, dw) in enumerate(zip(header_widths, effective_data_widths), 1):
raw = dw if col_idx in auto_rotated else max(hw, dw)
adjusted = min(max(raw + 2, min_col_width), max_col_width)
worksheet.column_dimensions[get_column_letter(col_idx)].width = adjusted
# User column-rule overrides (width, hidden, comment)
for col_idx, col_props in enumerate(col_fmt, 1):
if col_props:
col_letter = get_column_letter(col_idx)
if 'width' in col_props:
worksheet.column_dimensions[col_letter].width = col_props['width']
if 'hidden' in col_props:
worksheet.column_dimensions[col_letter].hidden = bool(col_props['hidden'])
if 'comment' in col_props:
worksheet.cell(header_row, col_idx).comment = Comment(col_props['comment'], '')
# Row heights — cascade: '*' (all rows) overridden by specific row type
rows_fmt_d = rows_fmt if isinstance(rows_fmt, dict) else {}
all_h = rows_fmt_d.get('*', {}).get('height')
# Header row height: '*' → 'header' → auto-rotate
header_h = rows_fmt_d.get('header', {}).get('height') or all_h
if header_h is not None:
worksheet.row_dimensions[header_row].height = header_h
elif auto_rotated:
max_rotated_len = max(header_widths[i - 1] for i in auto_rotated)
worksheet.row_dimensions[header_row].height = max_rotated_len * har_height_factor
# Group header row (row 1): merged cells spanning each labelled column range
if has_groups:
group_h = rows_fmt_d.get('group_header', {}).get('height') or all_h
if group_h is not None:
worksheet.row_dimensions[1].height = group_h
group_font = Font(bold=True)
group_align = Alignment(horizontal='center', vertical='center')
i = 0
while i < len(col_fmt):
group = col_fmt[i].get('group_label')
if group:
j = i
while j < len(col_fmt) and col_fmt[j].get('group_label') == group:
j += 1
start_col, end_col = i + 1, j # 1-based, inclusive
if start_col < end_col:
worksheet.merge_cells(
start_row=1, start_column=start_col,
end_row=1, end_column=end_col
)
cell = worksheet.cell(1, start_col)
cell.value = group
cell.font = group_font
cell.alignment = group_align
i = j
else:
i += 1
# Freeze panes
freeze = effective_fmt.get('freeze', 'A3' if has_groups else 'A2')
if freeze:
worksheet.freeze_panes = freeze
# Auto-filter on header row
filter_cols = {col_idx for col_idx, col_props in enumerate(col_fmt, 1) if col_props.get('filter')}
if filter_cols or effective_fmt.get('auto_filter'):
last_col = get_column_letter(len(self.columns))
filter_row = header_row
worksheet.auto_filter.ref = f"A{filter_row}:{last_col}{filter_row}"
if filter_cols:
from openpyxl.worksheet.filters import FilterColumn
for col_idx in range(1, len(self.columns) + 1):
if col_idx not in filter_cols:
worksheet.auto_filter.filterColumn.append(
FilterColumn(colId=col_idx - 1, hiddenButton=True)
)
def _get_or_create_worksheet(self, sheet_name: str) -> 'Worksheet':
"""Get existing worksheet or create new one."""
from openpyxl.worksheet.worksheet import Worksheet
if sheet_name in self.workbook.sheetnames:
return self.workbook[sheet_name]
else:
return self.workbook.create_sheet(sheet_name)
def _clear_worksheet(self, worksheet: 'Worksheet') -> None:
"""Clear all rows from a worksheet."""
# Simply delete all rows - openpyxl will handle this correctly
if worksheet.max_row >= 1:
worksheet.delete_rows(1, worksheet.max_row)
def _get_named_style(self, name: str) -> 'NamedStyle':
try:
return self._named_style_objects[name]
except KeyError:
raise KeyError(f"Named style '{name}' not found")
def _decompose_style(self, name: str) -> dict:
"""Extract non-default properties from a named style."""
style = self._get_named_style(name)
props = {}
if style.number_format and style.number_format != 'General':
props['number_format'] = style.number_format
fill = style.fill
if fill is not None and getattr(fill, 'fill_type', None) not in (None, 'none'):
props['fill'] = fill
font = style.font
if font is not None and any([
font.bold, font.italic, font.underline, font.color,
font.size, font.name, font.strike,
]):
props['font'] = font
aln = style.alignment
if aln is not None and any([
aln.horizontal, aln.vertical, aln.text_rotation,
aln.wrap_text, aln.shrink_to_fit, aln.indent,
]):
props['alignment'] = aln
return props
def _compose_styles(self, *names) -> Optional[str]:
"""Compose multiple named styles into one, with later names taking precedence.
Results are cached by style-name tuple so composition happens at most once
per unique combination per workbook. Returns None when no names are given.
"""
valid = tuple(n for n in names if n)
if not valid:
return None
if len(valid) == 1:
return valid[0]
if valid in self._style_cache:
return self._style_cache[valid]
merged: dict = {}
for name in valid:
merged.update(self._decompose_style(name))
if not merged:
result = valid[-1]
self._style_cache[valid] = result
return result
new_name = '_c_' + hashlib.md5(str(valid).encode()).hexdigest()[:10]
if new_name not in self._named_style_objects:
style = NamedStyle(name=new_name)
if 'number_format' in merged:
style.number_format = merged['number_format']
if 'font' in merged:
style.font = merged['font']
if 'fill' in merged:
style.fill = merged['fill']
if 'alignment' in merged:
style.alignment = merged['alignment']
self._add_named_style(style)
self._style_cache[valid] = new_name
return new_name
def _apply_cell_overrides(
self,
cell,
record,
col_name: str,
col_idx: int,
row_idx: int,
style_names: list,
) -> None:
"""Hook called per cell after value and base styles are determined.
Subclasses can set cell properties (e.g. hyperlinks) and append style names
to ``style_names`` to include them in the composed result.
"""
def _post_row(self, record, row_idx: int) -> None:
"""Hook called after all cells in a row have been written."""
def _write_rows(
self,
worksheet: 'Worksheet',
data_start_row: int,
col_fmt: list,
col_conditional_styles: list,
rows_fmt,
header_widths: list,
data_widths: list,
) -> int:
"""Write data rows, applying the style cascade and calling subclass hooks.
Style cascade per cell (lowest → highest priority):
date/datetime base → column ``style`` → ``'*'`` style →
``'data'`` style → ``'odd'``/``'even'`` style → ``conditional_style``
callable(s) → column ``conditional_style`` callable(s) →
``_apply_cell_overrides`` hook.
All active styles are composed once per unique combination via
``_compose_styles`` and cached for reuse.
Returns the number of rows written.
"""
self._col_index_map = {name: idx + 1 for idx, name in enumerate(self.columns)}
rows_fmt_d = rows_fmt if isinstance(rows_fmt, dict) else {}
# Extract row-level formatting config once
all_props = rows_fmt_d.get('*', {})
data_props = rows_fmt_d.get('data', {})
all_row_style = all_props.get('style')
all_height = all_props.get('height')
data_row_style = data_props.get('style')
data_height = data_props.get('height')
odd_style = data_props.get('odd', {}).get('style')
even_style = data_props.get('even', {}).get('style')
conditional_styles = data_props.get('conditional_style')
if conditional_styles is not None and not isinstance(conditional_styles, list):
conditional_styles = [conditional_styles]
width_sample_size = 15
row_count = 0
for row_idx, record in enumerate(self.data_iterator, data_start_row):
values = self._row_to_tuple(record)
data_row_num = row_idx - data_start_row + 1
# Row height: data.height overrides *.height
row_h = data_height if data_height is not None else all_height
if row_h is not None:
worksheet.row_dimensions[row_idx].height = row_h
# Per-row styles evaluated once, reused for every cell in this row
alt = odd_style if data_row_num % 2 else even_style
fn_results = [fn(record) for fn in (conditional_styles or [])]
for col_idx, value in enumerate(values, 1):
col_name = self.columns[col_idx - 1]
cell = worksheet.cell(row=row_idx, column=col_idx)
if isinstance(value, (datetime, date)) and value.year < 1900:
# Excel doesn't handle dates < 1900-01-01, convert to string
cell.value = self.to_string(value)
base_style = None
elif isinstance(value, datetime) and value.time() != MIDNIGHT:
cell.value = value
if row_count < width_sample_size:
data_widths[col_idx - 1] = max(data_widths[col_idx - 1], 19)
base_style = 'datetime_style'
elif isinstance(value, (date, datetime)):
cell.value = value
if row_count < width_sample_size:
data_widths[col_idx - 1] = max(data_widths[col_idx - 1], 10)
base_style = 'date_style'
elif value is None:
cell.value = ''
base_style = None
elif hasattr(value, 'read'):
# handle LOBs
cell.value = value.read()
else:
cell.value = value
if row_count < width_sample_size:
data_widths[col_idx - 1] = max(data_widths[col_idx - 1], len(str(value)))
base_style = None
col_style = col_fmt[col_idx - 1].get('style') if col_fmt else None
col_fn = col_conditional_styles[col_idx - 1] if col_conditional_styles else None
cell_style = col_fn(record) if col_fn else None
style_names = [
s for s in [base_style, col_style, all_row_style, data_row_style, alt, *fn_results, cell_style]
if s
]
self._apply_cell_overrides(cell, record, col_name, col_idx, row_idx, style_names)
composed = self._compose_styles(*style_names)
if composed:
cell.style = composed
self._post_row(record, row_idx)
row_count += 1
return row_count
def _write_to_worksheet(
self,
data: Iterable[RecordLike],
worksheet: 'Worksheet',
columns: Optional[List[str]] = None,
write_headers: bool = True,
headers: Optional[List[str]] = None,
effective_fmt: Optional[dict] = None,
col_rules: Optional[list] = None,
) -> int:
"""Write data to an already-selected worksheet. Returns number of rows written."""
effective_fmt = effective_fmt if effective_fmt is not None else self.formatting
self.data_iterator, detected_columns = self._get_data_iterator(data, columns)
self.columns = detected_columns
if not self.columns:
raise ValueError("Could not determine columns from data")
col_fmt = self._build_col_fmt_map(self.columns, col_rules=col_rules)
col_conditional_styles = [p.get('conditional_style') for p in col_fmt] if col_fmt else []
rows_fmt = effective_fmt.get('rows', {})
display_headers = headers if headers is not None else self._get_headers(data)
if len(display_headers) != len(self.columns):
raise ValueError(
f"headers has {len(display_headers)} name(s) but data has {len(self.columns)} column(s)"
)
header_widths = [len(h) for h in display_headers]
data_widths = [0] * len(self.columns)
header_font = Font(bold=True)
has_groups = bool(col_fmt and any(p.get('group_label') for p in col_fmt))
header_row = 2 if has_groups else 1
should_write_headers = write_headers and worksheet.cell(header_row, 1).value is None
data_start_row = (header_row + 1) if should_write_headers else worksheet.max_row + 1
if should_write_headers:
for col_idx, column_name in enumerate(display_headers, 1):
cell = worksheet.cell(row=header_row, column=col_idx, value=column_name)
hfmt = col_fmt[col_idx - 1].get('header_style') if col_fmt else None
if hfmt:
cell.style = hfmt
else:
cell.font = header_font
row_count = self._write_rows(
worksheet, data_start_row, col_fmt, col_conditional_styles, rows_fmt,
header_widths, data_widths,
)
if should_write_headers:
self._finalize_headers(worksheet, header_widths, data_widths, col_fmt, rows_fmt,
link_mapping=self._link_mapping, effective_fmt=effective_fmt)
return row_count
[docs]
def write_batch(
self,
data: Iterable[RecordLike],
sheet_name: Optional[str] = None,
headers: Optional[List[str]] = None,
formatting: Optional[Dict] = None,
) -> None:
"""
Write a batch of data to a sheet.
If this is the first write to this sheet in the current session, the sheet
is cleared first. Subsequent writes to the same sheet append data.
Parameters
----------
data : Iterable[RecordLike]
The data batch
sheet_name : str, optional
Target sheet. If None, uses active_sheet or defaults to 'Data'
headers : list of str, optional
Display names for the header row. Overrides the writer-level ``headers``
set at initialisation for this batch only. Must match the column count.
formatting : ExcelFormat or dict, optional
Per-call formatting override. ``None`` (default) uses the writer-level
formatting. Pass ``{}`` to write this sheet with no formatting.
"""
if self.workbook is None:
raise RuntimeError("Workbook not initialized")
if isinstance(formatting, ExcelFormat):
formatting = formatting.to_dict()
effective_fmt = self.formatting if formatting is None else formatting
call_col_rules = [
(k.lower(), k, v.to_dict() if isinstance(v, ColumnRule) else v)
for k, v in effective_fmt.get('columns', {}).items()
]
self._register_call_styles(effective_fmt)
target_sheet = sheet_name or self.active_sheet or 'Data'
if sheet_name:
self.active_sheet = sheet_name
worksheet = self._get_or_create_worksheet(target_sheet)
# Clear sheet if this is the first write to it in this session
if target_sheet not in self._sheets_written_this_session:
self._clear_worksheet(worksheet)
self._sheets_written_this_session.add(target_sheet)
tab_color = effective_fmt.get('tab_color')
if tab_color:
worksheet.sheet_properties.tabColor = tab_color.lstrip('#')
row_count = self._write_to_worksheet(
data=data,
worksheet=worksheet,
write_headers=self.write_headers,
headers=headers,
effective_fmt=effective_fmt,
col_rules=call_col_rules,
)
self._row_num += row_count
logger.info(f"Wrote {row_count} rows to sheet '{target_sheet}' (total: {self._row_num})")
def _write_data(self, file_obj: Any) -> None:
"""BatchWriter contract: write data_iterator to the active sheet.
Called by write() when data was provided at initialisation.
"""
if self.data_iterator is None:
raise RuntimeError("No data provided")
if not self.columns:
raise RuntimeError("Columns not initialized")
if self.workbook is None:
raise RuntimeError("Workbook not initialized")
col_fmt = self._build_col_fmt_map(self.columns)
col_conditional_styles = [p.get('conditional_style') for p in col_fmt] if col_fmt else []
rows_fmt = self.formatting.get('rows', {})
target_sheet = self.active_sheet or 'Data'
worksheet = self._get_or_create_worksheet(target_sheet)
if target_sheet not in self._sheets_written_this_session:
self._clear_worksheet(worksheet)
self._sheets_written_this_session.add(target_sheet)
has_groups = bool(col_fmt and any(p.get('group_label') for p in col_fmt))
header_row = 2 if has_groups else 1
should_write_headers = (
self.write_headers and not self._headers_written
and worksheet.cell(header_row, 1).value is None
)
data_start_row = (header_row + 1) if should_write_headers else worksheet.max_row + 1
display_headers = self._get_headers()
if len(display_headers) != len(self.columns):
raise ValueError(
f"headers has {len(display_headers)} name(s) but data has {len(self.columns)} column(s)"
)
header_widths = [len(h) for h in display_headers]
data_widths = [0] * len(self.columns)
if should_write_headers:
header_font = Font(bold=True)
for col_idx, column_name in enumerate(display_headers, 1):
cell = worksheet.cell(row=header_row, column=col_idx, value=column_name)
hfmt = col_fmt[col_idx - 1].get('header_style') if col_fmt else None
if hfmt:
cell.style = hfmt
else:
cell.font = header_font
self._headers_written = True
row_count = self._write_rows(
worksheet, data_start_row, col_fmt, col_conditional_styles, rows_fmt,
header_widths, data_widths,
)
if should_write_headers:
self._finalize_headers(worksheet, header_widths, data_widths, col_fmt, rows_fmt,
link_mapping=self._link_mapping)
self._row_num += row_count
logger.info(f"Wrote {row_count} rows to sheet '{target_sheet}' (total: {self._row_num})")
def _save_workbook(self):
"""Save the workbook. Idempotent - safe to call multiple times."""
if self.workbook is not None:
try:
self.workbook.save(self.output_path)
logger.info(f"Saved workbook: {self.output_path}")
except Exception as e:
logger.error(f"Failed to save workbook: {e}")
raise
finally:
self.workbook = None # Mark as saved to prevent duplicate saves
[docs]
def close(self):
"""Close the writer and save the workbook."""
self._save_workbook()
def __exit__(self, exc_type, exc_val, exc_tb):
"""Save workbook on context exit."""
try:
self.close()
finally:
if self.output_path.exists() and self.output_path.stat().st_size == 0:
# remove invalid empty Excel file
self.output_path.unlink()
return False
[docs]
def to_excel(
data,
file: Union[str, Path],
sheet: str = 'Data',
headers: Optional[List[str]] = None,
write_headers: bool = True,
) -> None:
"""
Legacy convenience function — writes a single sheet.
Parameters
----------
data : Iterable[RecordLike]
Data to write (cursor, list of Records, etc.)
file : str or Path
Output Excel file (.xlsx)
sheet : str, default 'Data'
Sheet name to write to
headers : List[str], optional
Header row text. If None, uses cursor.description or detected column names
write_headers : bool, default True
Whether to write column headers
Examples
--------
# Write cursor with original database column names
to_excel(cursor, 'report.xlsx')
# Override header names
to_excel(cursor, 'report.xlsx', headers=['User ID', 'Full Name', 'Email'])
For multi-sheet or advanced reports, use ExcelWriter as a context manager with write_batch().
"""
with ExcelWriter(data=None, file=file, headers=headers, write_headers=write_headers) as writer:
writer.write_batch(data=data, sheet_name=sheet)
[docs]
class LinkSource:
"""
Defines a linkable data source for creating internal and external hyperlinks in Excel.
LinkSource enables rich hyperlinking between worksheets and to external URLs. Each
LinkSource is registered once with LinkedExcelWriter and can be used across multiple
sheets to create consistent, navigable Excel reports.
As the source sheet is written, LinkSource caches each record's cell reference and
generates formatted link text and URLs using Python's str.format_map(). Later sheets
can reference these cached records to create hyperlinks.
Link Types
----------
* **Internal links** - Excel cell references like ``#Students!A5`` for navigation
* **External links** - HTTP/HTTPS URLs like ``https://crm.example.com/contact/123``
* **Hybrid mode** - Stores both internal and external, defaults to external if available
Parameters
----------
name : str
Unique identifier for this link source. Used when registering links in
write_batch() (e.g., ``links={"student_name": "student"}``).
source_sheet : str, optional
The worksheet name that serves as the authoritative source for this entity.
Internal links will point to rows in this sheet. This sheet must be written
before any sheets that reference it. Not required when external_only=True.
key_column : str, optional
The column name containing unique identifiers for records (e.g., "student_id").
This column must exist in both the source sheet data and any sheets that
create links to it. Not required when external_only=True.
url_template : str, optional
Python format string for generating external URLs. Uses str.format_map() with
the full record dict as context. Example: ``"https://app.com/users/{user_id}"``
text_template : str, optional
Python format string for generating link display text. Uses str.format_map()
with the full record dict. Example: ``"{last_name}, {first_name} ({dept})"``
If not provided, uses the column value.
missing_text : str, optional
Fallback text to display when a link target cannot be resolved. If None,
displays the raw value from the detail row.
external_only : bool, default False
If True, this LinkSource generates external links directly from current row data
without caching. Can be reused across multiple sheets. source_sheet and key_column
are not required. If False (default), caches records for cross-sheet linking.
Attributes
----------
_records : dict
Internal cache mapping key values to link metadata (ref, display_text, url).
Populated automatically as the source sheet is written.
Examples
--------
**Internal links only (sheet navigation)**::
student_link = LinkSource(
name="student",
source_sheet="Students",
key_column="student_id"
)
**External links with custom text**::
employee_link = LinkSource(
name="employee",
source_sheet="Employees",
key_column="employee_id",
url_template="https://hr.company.com/profile/{employee_id}",
text_template="{last_name}, {first_name} ({department})"
)
**Hybrid with missing value handling**::
customer_link = LinkSource(
name="customer",
source_sheet="Customers",
key_column="customer_id",
url_template="https://crm.company.com/customers/{crm_id}",
text_template="{company_name} - {contact_name}",
missing_text="[Unknown Customer]"
)
**External-only links (reusable across sheets)**::
# For external links - reusable on any sheet with required columns
imdb_link = LinkSource(
name="imdb",
url_template="https://imdb.com/title/{tconst}",
text_template="{primary_title} ({start_year})",
external_only=True # No source_sheet needed - works on any sheet
)
# Use on multiple sheets with same columns:
writer.write_batch(movies, "Movies", links={"primary_title": "imdb"})
writer.write_batch(top_rated, "Top Rated", links={"primary_title": "imdb"})
Notes
-----
* The source sheet MUST be written before sheets that reference it
* All template fields must exist in the record data or KeyError will be logged
* Key values are converted to strings for cache lookups
* Templates use Python's str.format_map() - use double braces {{}} to escape
See Also
--------
LinkedExcelWriter : Writer that uses LinkSource for hyperlinking
LinkedExcelWriter.register_link_source : Method to register a LinkSource
"""
[docs]
def __init__(self,
name: str,
source_sheet: str = None,
key_column: str = None,
url_template: str = None,
text_template: str = None,
missing_text: str = None,
external_only: bool = False):
self.name = name
self.source_sheet = source_sheet
self.key_column = key_column
self.url_template = url_template
self.text_template = text_template
self.missing_text = missing_text
self.external_only = external_only
self._records = {}
# Validation
if external_only:
if not url_template:
raise ValueError(f"url_template is required when external_only=True for LinkSource '{name}'")
else:
if not source_sheet:
raise ValueError(f"source_sheet is required when external_only=False for LinkSource '{name}'")
if not key_column:
raise ValueError(f"key_column is required when external_only=False for LinkSource '{name}'")
# Track display width for column sizing (sample first 100 rows, cap at 50 chars)
self._max_display_width: int = 0
self._sample_count: int = 0
[docs]
def cache_record(self, key_value: Any, row_dict: Dict[str, Any], ref: str) -> None:
"""Cache a record for cross-sheet linking (unless external_only=True)."""
# Skip caching if this LinkSource is external-only (self-linking only)
if self.external_only:
# Still track display width for column sizing
if self._sample_count < 100:
if self.text_template:
try:
display_text = self.text_template.format_map(row_dict)
except KeyError:
display_text = str(key_value)
else:
display_text = str(key_value)
display_len = min(len(display_text), 50)
self._max_display_width = max(self._max_display_width, display_len)
self._sample_count += 1
return
key_str = str(key_value)
if self.text_template:
try:
display_text = self.text_template.format_map(row_dict)
except KeyError as e:
logger.warning(f"Missing key {e} in text_template for {self.name}")
display_text = f"{key_value} ({self.missing_text})"
else:
# No template → use the raw key value (clean, expected)
display_text = str(key_value)
# Track max display width for first 100 records (capped at 50 chars)
if self._sample_count < 100:
display_len = min(len(display_text), 50)
self._max_display_width = max(self._max_display_width, display_len)
self._sample_count += 1
record = {
"ref": ref,
"display_text": display_text,
}
if self.url_template:
try:
record["url"] = self.url_template.format_map(row_dict)
except KeyError as e:
logger.warning(f"Missing key {e} in url_template for {self.name}")
if key_str not in self._records:
self._records[key_str] = record
[docs]
def generate_link_from_row(
self,
row_dict: Dict[str, Any],
ref: str,
mode: str = "external",
column_value: Any = None
) -> Optional[dict]:
"""
Generate link info directly from row data (for self-linking or external-only).
Used when writing the source sheet itself to create links from current row
instead of looking up from cache.
Parameters
----------
row_dict : dict
The current row's data as a dictionary
ref : str
The cell reference for internal links (e.g., "#Movies!A5")
mode : str
"external" or "internal"
column_value : Any, optional
For external_only sources, the value from the linked column
Returns
-------
dict or None
Dict with "target" and "display_text", or None if link cannot be generated
"""
# Generate display text
if self.text_template:
try:
display_text = self.text_template.format_map(row_dict)
except KeyError as e:
logger.warning(f"Missing key {e} in text_template for {self.name}")
return None
else:
# No template → use column_value for external_only, or key_column for others
if self.external_only:
# For external_only: use the linked column's value
if column_value is None:
return None
display_text = str(column_value)
elif self.key_column:
# For self-linking: use key_column
key_value = row_dict.get(self.key_column)
if key_value is None:
return None
display_text = str(key_value)
else:
return None
# Track max display width for first 100 self-links (capped at 50 chars)
if self._sample_count < 100:
display_len = min(len(display_text), 50)
self._max_display_width = max(self._max_display_width, display_len)
self._sample_count += 1
# Generate target
if mode == "external":
if self.url_template:
try:
target = self.url_template.format_map(row_dict)
except KeyError as e:
logger.warning(f"Missing key {e} in url_template for {self.name}")
# Fallback to internal ref if URL template fails
target = ref
else:
# No URL template, fall back to internal
target = ref
else:
# Internal mode
target = ref
return {
"target": target,
"display_text": display_text
}
[docs]
def get_link(
self,
key_value: Any,
mode: str = "external",
) -> Optional[dict]:
"""
Resolve link for a key.
mode: "external" (default) or "internal"
Returns dict with "target" and "display_text" or None if missing.
"""
record = self._records.get(str(key_value))
if not record:
return None
if mode == "external":
target = record.get("url") or record.get("ref")
else:
target = record.get("ref")
if not target:
return None
return {
"target": target,
"display_text": record["display_text"],
}
@property
def max_display_width(self) -> int:
"""
Maximum display text width observed across sampled links.
Samples first 100 records, capped at 50 characters to handle outliers.
Used for automatic column width sizing.
"""
return self._max_display_width
[docs]
class LinkedExcelWriter(ExcelWriter):
"""
Advanced Excel writer with internal and external hyperlink management.
LinkedExcelWriter extends ExcelWriter to enable rich, bidirectional hyperlinking
within Excel workbooks and to external systems. It automatically caches source
records as they're written and creates formatted hyperlinks in detail sheets that
reference those sources.
This is particularly powerful for creating navigable multi-sheet reports with
master-detail relationships, drill-through capabilities, and integration with
external CRM, ticketing, or web applications.
Key Features
------------
* **Internal navigation** - Links between worksheets (e.g., ``#Students!B5``)
* **External integration** - Deep links to web applications
* **Hybrid linking** - Store both internal and external, choose which to display
* **Template-based formatting** - Use Python format strings for link text and URLs
* **Automatic caching** - Source records cached as written, no manual tracking
* **Mode control** - Force internal or external links per column via ``source:internal``
Workflow
--------
1. Create LinkSource definitions for each linkable entity
2. Register them with LinkedExcelWriter
3. Write source sheets first (e.g., Students, Products)
4. Write detail sheets with link specifications (e.g., Enrollments, Orders)
5. Links are resolved from cache and applied automatically
Parameters
----------
file : str or Path
Output Excel file (.xlsx)
data : Iterable[RecordLike], optional
Initial data to write. If None, use write_batch() for streaming mode.
sheet_name : str, optional
Default sheet name for write_batch() calls
write_headers : bool, default True
Whether to write column headers
Examples
--------
**Basic internal linking between sheets**::
with LinkedExcelWriter(file='school_report.xlsx') as writer:
# Define linkable entity
student_link = LinkSource(
name="student",
source_sheet="Students",
key_column="student_id"
)
writer.register_link_source(student_link)
# Write source sheet
writer.write_batch(students_data, sheet_name="Students")
# Write detail sheet with internal links
writer.write_batch(
enrollments_data,
sheet_name="Enrollments",
links={"student_name": "student:internal"}
)
**External links to CRM system**::
with LinkedExcelWriter(file='sales_report.xlsx') as writer:
customer_link = LinkSource(
name="customer",
source_sheet="Customers",
key_column="customer_id",
url_template="https://crm.company.com/customers/{crm_id}",
text_template="{company_name} ({customer_id})"
)
writer.register_link_source(customer_link)
writer.write_batch(customers_data, sheet_name="Customers")
writer.write_batch(
orders_data,
sheet_name="Orders",
links={"customer": "customer"} # Uses external URL
)
**Hybrid mode with internal and external links**::
with LinkedExcelWriter(file='support_tickets.xlsx') as writer:
ticket_link = LinkSource(
name="ticket",
source_sheet="Tickets",
key_column="ticket_id",
url_template="https://support.company.com/ticket/{ticket_id}",
text_template="#{ticket_id} - {subject}"
)
writer.register_link_source(ticket_link)
writer.write_batch(tickets_data, sheet_name="Tickets")
writer.write_batch(
comments_data,
sheet_name="Comments",
links={
"ticket_link": "ticket", # External to support system
"ticket_ref": "ticket:internal" # Internal sheet navigation
}
)
**Multiple link sources in one sheet**::
with LinkedExcelWriter(file='class_roster.xlsx') as writer:
student_link = LinkSource(
name="student",
source_sheet="Students",
key_column="student_id",
text_template="{last_name}, {first_name}"
)
course_link = LinkSource(
name="course",
source_sheet="Courses",
key_column="course_id",
text_template="{course_code} - {title}"
)
writer.register_link_source(student_link)
writer.register_link_source(course_link)
writer.write_batch(students_data, sheet_name="Students")
writer.write_batch(courses_data, sheet_name="Courses")
writer.write_batch(
enrollments_data,
sheet_name="Enrollments",
links={
"student_name": "student:internal",
"course_name": "course:internal"
}
)
Notes
-----
* Source sheets MUST be written before detail sheets that reference them
* The key_column must exist in both source and detail datasets
* Missing links display missing_text if set, otherwise show raw value
* Link mode syntax: ``"source_name"`` (external) or ``"source_name:internal"``
* Templates use str.format_map() - all fields must exist in record data
* Hyperlink styling (blue, underlined) is applied automatically
See Also
--------
LinkSource : Link definition class
ExcelWriter : Base writer without linking capabilities
"""
[docs]
def __init__(
self,
data: Optional[Iterable[RecordLike]] = None,
file: Optional[Union[str, Path]] = None,
sheet_name: Optional[str] = None,
headers: Optional[List[str]] = None,
write_headers: bool = True,
formatting: Optional[Dict] = None,
):
super().__init__(data=data, file=file, sheet_name=sheet_name, headers=headers,
write_headers=write_headers, formatting=formatting)
#: Registered LinkSource instances, keyed by name.
self.link_sources: Dict[str, LinkSource] = {}
[docs]
def register_link_source(self, source: LinkSource) -> None:
"""Register a link source for use across sheets."""
if source.name in self.link_sources:
logger.warning(f"LinkSource '{source.name}' already registered — overwriting")
self.link_sources[source.name] = source
[docs]
def write_batch(
self,
data: Iterable[RecordLike],
sheet_name: Optional[str] = None,
headers: Optional[List[str]] = None,
links: Optional[Dict[str, str]] = None,
formatting: Optional[Dict] = None,
) -> None:
"""
Write a batch with optional hyperlinking.
If this is the first write to this sheet in the current session, the sheet
is cleared first. Subsequent writes to the same sheet append data.
headers: display names for this batch, overriding the writer-level default
links: dict column_name → "source_name" or "source_name:internal"
formatting: per-call formatting override; None uses writer-level formatting,
{} writes with no formatting
"""
if isinstance(formatting, ExcelFormat):
formatting = formatting.to_dict()
effective_fmt = self.formatting if formatting is None else formatting
call_col_rules = [
(k.lower(), k, v.to_dict() if isinstance(v, ColumnRule) else v)
for k, v in effective_fmt.get('columns', {}).items()
]
self._register_call_styles(effective_fmt)
target_sheet = sheet_name or self.active_sheet or 'Data'
if sheet_name:
self.active_sheet = target_sheet
worksheet = self._get_or_create_worksheet(target_sheet)
# Clear sheet if this is the first write to it in this session
if target_sheet not in self._sheets_written_this_session:
self._clear_worksheet(worksheet)
self._sheets_written_this_session.add(target_sheet)
tab_color = effective_fmt.get('tab_color')
if tab_color:
worksheet.sheet_properties.tabColor = tab_color.lstrip('#')
# Parse links dict into resolved mapping: column → (source, mode)
link_mapping = {}
if links:
for col, spec in links.items():
if ':' in spec:
source_name, mode_str = spec.split(':', 1)
mode = mode_str.lower()
if mode not in {'internal', 'external'}:
raise ValueError(f"Invalid mode '{mode_str}' in link spec '{spec}'")
else:
source_name = spec
mode = "external"
if source_name not in self.link_sources:
raise ValueError(f"Unknown LinkSource '{source_name}'")
link_mapping[col] = (self.link_sources[source_name], mode)
# Determine if this sheet is a source sheet for any registered LinkSource
# Skip external_only sources since they don't cache
source_for_this_sheet = [
src for src in self.link_sources.values()
if not src.external_only and src.source_sheet == target_sheet
]
row_count = self._write_to_worksheet(
data=data,
worksheet=worksheet,
write_headers=self.write_headers,
headers=headers,
link_mapping=link_mapping,
source_for_this_sheet=source_for_this_sheet,
target_sheet=target_sheet,
effective_fmt=effective_fmt,
col_rules=call_col_rules,
)
self._row_num += row_count
logger.info(f"Wrote {row_count} rows to sheet '{target_sheet}' with linking")
def _write_to_worksheet(
self,
data: Iterable[RecordLike],
worksheet: 'Worksheet',
columns: Optional[List[str]] = None,
write_headers: bool = True,
headers: Optional[List[str]] = None,
link_mapping: Optional[Dict[str, tuple]] = None,
source_for_this_sheet: Optional[list] = None,
target_sheet: Optional[str] = None,
effective_fmt: Optional[dict] = None,
col_rules: Optional[list] = None,
) -> int:
self._link_mapping = link_mapping or {}
self._source_for_this_sheet = source_for_this_sheet or []
self._target_sheet = target_sheet
return super()._write_to_worksheet(data, worksheet, columns, write_headers, headers,
effective_fmt=effective_fmt, col_rules=col_rules)
def _apply_cell_overrides(self, cell, record, col_name, col_idx, row_idx, style_names):
link_spec = self._link_mapping.get(col_name)
if not link_spec:
return
source, mode = link_spec
if source.external_only:
link_info = source.generate_link_from_row(
record, ref="", mode="external", column_value=cell.value
)
elif source.source_sheet == self._target_sheet:
key_col_letter = get_column_letter(self._col_index_map[source.key_column])
ref = f"#{self._target_sheet}!{key_col_letter}{row_idx}"
link_info = source.generate_link_from_row(record, ref, mode=mode)
else:
key_value = cell.value
link_info = source.get_link(key_value, mode=mode) if key_value is not None else None
if link_info:
cell.hyperlink = link_info["target"]
cell.value = link_info["display_text"]
style_names.append('hyperlink_style')
elif source.missing_text is not None:
cell.value = source.missing_text
def _post_row(self, record, row_idx: int) -> None:
for source in self._source_for_this_sheet:
key_col_letter = get_column_letter(self._col_index_map[source.key_column])
ref = f"#{self._target_sheet}!{key_col_letter}{row_idx}"
key_value = record.get(source.key_column)
if key_value is not None:
source.cache_record(key_value, record, ref)
[docs]
def check_dependencies():
if not HAS_OPENPYXL:
logger.error('Openpyxl is not available. Excel files not supported.')
check_dependencies()