# dbtk/readers/excel.py
"""Excel workbook reader supporting XLS and XLSX formats."""
import datetime as dt
import logging
from pathlib import Path
from typing import List, Any, Iterator, Optional, Union
from .base import Reader
logger = logging.getLogger(__name__)

# Probe the optional Excel backends once at import time and record the
# outcome in module-level flags that the rest of the module consults.
try:
    import openpyxl
except ImportError:
    HAS_OPENPYXL = False
    logger.warning('openpyxl not available. xlsx files not supported.')
else:
    HAS_OPENPYXL = True

try:
    import xlrd
except ImportError:
    HAS_XLRD = False
    # The xlrd warning is only logged when openpyxl is missing as well.
    if not HAS_OPENPYXL:
        logger.warning('xlrd not available. xls files not supported.')
else:
    HAS_XLRD = True
class ExcelReader(Reader):
    """Iterate over the rows of an openpyxl worksheet (.xlsx workbooks).

    Row 1 of the sheet is used as the header row unless explicit ``headers``
    are supplied, in which case data is read starting at row 1.
    """

    def __init__(self,
                 worksheet,
                 headers: Optional[List[str]] = None,
                 add_row_num: bool = True,
                 skip_rows: int = 0,
                 n_rows: Optional[int] = None,
                 null_values=None):
        """Initialize ExcelReader for reading Excel .xlsx files.

        Args:
            worksheet: openpyxl.Worksheet object to read from.
            headers: Optional list of header names to use instead of reading from row 1.
            add_row_num: If True, adds a _row_num field to each record (default: True).
            skip_rows: Number of data rows to skip after headers (default: 0).
            n_rows: Maximum number of rows to read, or None for all (default: None).
            null_values: Values to convert to None (e.g., '\\N', 'NULL', 'NA').

        Raises:
            TypeError: If worksheet is not an openpyxl.Worksheet.
        """
        # The type is checked by class name rather than isinstance();
        # openpyxl is an optional import in this module.
        if worksheet.__class__.__name__ != 'Worksheet':
            raise TypeError('worksheet must be of type openpyxl.Worksheet or use XLSReader')
        super().__init__(add_row_num=add_row_num,
                         skip_rows=skip_rows, n_rows=n_rows,
                         null_values=null_values)
        self.ws = worksheet
        self._trackable = self.ws
        # NOTE(review): max_row + 1 is larger than the number of data rows;
        # confirm how the base class consumes _total_records.
        self._total_records = self.ws.max_row + 1
        self._headers_read = False
        self._raw_headers = headers  # Use provided headers if given
        # openpyxl rows are 1-based. Test ``is not None`` (not truthiness) so
        # the start row agrees with _read_headers(), which also accepts an
        # explicitly supplied empty header list.
        self._start_row = 1 if headers is not None else 2

    def _read_headers(self) -> List[str]:
        """Return the supplied headers, or read them from row 1 of the worksheet.

        Returns:
            List of header values.

        Raises:
            StopIteration: If the worksheet reports no rows and no headers
                were provided.
        """
        if self._raw_headers is not None:
            return self._raw_headers
        if not self._headers_read:
            # openpyxl is 1-based, so row 1 is the first row
            if self.ws.max_row < 1:
                raise StopIteration("Empty worksheet")
            self._raw_headers = [cell.value for cell in self.ws[1]]
            self._headers_read = True
        return self._raw_headers

    def _generate_rows(self) -> Iterator[List[Any]]:
        """Yield data rows from the worksheet starting at ``_start_row``.

        Yields:
            List of cell values for each data row.
        """
        # One iter_rows() pass replaces per-row ``ws[row_num]`` lookups, each
        # of which recomputes the sheet's column bounds. values_only=True
        # yields the cell values directly instead of Cell objects.
        for row_values in self.ws.iter_rows(min_row=self._start_row,
                                            values_only=True):
            yield list(row_values)

    def _cleanup(self):
        """Perform cleanup (no-op for openpyxl worksheet as no file pointer is managed)."""
        pass
class XLSReader(Reader):
    """Iterate over the rows of an xlrd sheet (.xls workbooks).

    Row 0 of the sheet is used as the header row unless explicit ``headers``
    are supplied, in which case data is read starting at row 0.
    """

    # xlrd cell type codes (values of xlrd.XL_CELL_EMPTY / _NUMBER / _DATE).
    # Kept as literals so the class body does not need xlrd to be importable.
    _CTYPE_EMPTY = 0
    _CTYPE_NUMBER = 2
    _CTYPE_DATE = 3

    def __init__(self,
                 worksheet,
                 headers: Optional[List[str]] = None,
                 add_row_num: bool = True,
                 skip_rows: int = 0,
                 n_rows: Optional[int] = None,
                 null_values=None):
        """Initialize XLSReader for reading Excel .xls files.

        Args:
            worksheet: xlrd.Sheet object to read from.
            headers: Optional list of header names to use instead of reading from row 0.
            add_row_num: If True, adds a _row_num field to each record (default: True).
            skip_rows: Number of data rows to skip after headers (default: 0).
            n_rows: Maximum number of rows to read, or None for all (default: None).
            null_values: Values to convert to None (e.g., '\\N', 'NULL', 'NA').

        Raises:
            TypeError: If worksheet is not an xlrd.Sheet.
        """
        # The type is checked by class name rather than isinstance();
        # xlrd is an optional import in this module.
        if worksheet.__class__.__name__ != 'Sheet':
            raise TypeError('worksheet must be of type xlrd.Sheet or use ExcelReader')
        super().__init__(add_row_num=add_row_num,
                         skip_rows=skip_rows, n_rows=n_rows,
                         headers=headers, null_values=null_values)
        self.ws = worksheet
        self._trackable = self.ws
        # The workbook's date mode is needed later to decode date cells.
        self.datemode = worksheet.book.datemode
        self._headers_read = False
        self._start_row = 0 if headers is not None else 1  # xlrd 0-based indexing

    def _read_headers(self) -> List[str]:
        """Return the supplied headers, or read them from row 0 of the worksheet.

        Returns:
            List of header values.

        Raises:
            StopIteration: If the worksheet has no rows and no headers were provided.
        """
        # _raw_headers is not assigned in this class; presumably
        # Reader.__init__ stores the ``headers`` argument there - verify
        # against the base class.
        if self._raw_headers is not None:
            return self._raw_headers
        if not self._headers_read:
            if self.ws.nrows < 1:
                raise StopIteration("Empty worksheet")
            # xlrd is 0-based, so row 0 is the first row
            self._raw_headers = [cell.value for cell in self.ws.row(0)]
            self._headers_read = True
        return self._raw_headers

    def _generate_rows(self) -> Iterator[List[Any]]:
        """Yield data rows from the worksheet starting at ``_start_row``.

        Yields:
            List of converted cell values for each data row.
        """
        for row_num in range(self._start_row, self.ws.nrows):  # 0-based
            yield [self._convert_cell_value(cell) for cell in self.ws.row(row_num)]

    def _convert_cell_value(self, cell) -> Any:
        """Convert an xlrd cell to an appropriate Python value.

        Args:
            cell: xlrd cell object to convert (or None).

        Returns:
            None for a missing or empty cell; a datetime (or a time when the
            date part is all zeros) for date cells, or None when the date
            cannot be decoded; an int for whole numbers, otherwise the
            original numeric value; for every other cell type the value
            converted to str.
        """
        if cell is None or cell.ctype == self._CTYPE_EMPTY:
            return None

        if cell.ctype == self._CTYPE_DATE:
            try:
                date_parts = xlrd.xldate_as_tuple(cell.value, self.datemode)
                # A zero year/month/day means the cell holds a time of day only.
                if any(date_parts[:3]):
                    return dt.datetime(*date_parts)
                return dt.time(*date_parts[3:])
            except (ValueError, TypeError):
                return None

        if cell.ctype == self._CTYPE_NUMBER:
            number = cell.value
            try:
                whole = int(number)
            except (ValueError, OverflowError):
                # NaN / infinity have no integer form; hand them back as-is
                # instead of letting the conversion error escape.
                return number
            return whole if whole == number else number

        try:
            return str(cell.value)
        except (ValueError, TypeError):
            return cell.value

    def _cleanup(self):
        """Perform cleanup (no-op for xlrd worksheet as no file pointer is managed)."""
        pass
def open_workbook(filename: Union[str, Path]):
    """Open an Excel workbook using openpyxl for .xlsx or xlrd otherwise.

    Args:
        filename: Path to the Excel file (.xlsx or .xls), as a str or Path.
            The ``.xlsx`` suffix is matched case-insensitively.

    Returns:
        Workbook object: the result of ``openpyxl.load_workbook`` when the
        name ends in ``.xlsx`` and openpyxl is available, otherwise the
        result of ``xlrd.open_workbook``.

    Raises:
        ImportError: If the file cannot be routed to openpyxl and xlrd is
            not available.
    """
    # str() lets Path arguments through (Path has no .endswith), and lower()
    # makes names such as 'REPORT.XLSX' reach openpyxl as well.
    is_xlsx = str(filename).lower().endswith('.xlsx')
    if HAS_OPENPYXL and is_xlsx:
        # data_only=True is passed through to openpyxl.load_workbook.
        return openpyxl.load_workbook(filename, data_only=True)
    if HAS_XLRD:
        return xlrd.open_workbook(filename)
    raise ImportError('Neither openpyxl nor xlrd available for Excel file support')
def get_sheet_by_index(wb, index: int):
    """Return the worksheet at position ``index`` of a workbook.

    The workbook flavour is recognised by its class name: ``Workbook``
    (openpyxl) or ``Book`` (xlrd).

    Args:
        wb: Workbook object (openpyxl.Workbook or xlrd.Book).
        index: Index of the sheet to retrieve (0-based).

    Returns:
        Worksheet object (openpyxl.Worksheet or xlrd.Sheet).

    Raises:
        TypeError: If workbook type is not supported.
    """
    wb_kind = wb.__class__.__name__
    if wb_kind not in ('Workbook', 'Book'):
        raise TypeError(f"Unknown workbook type: {wb.__class__.__name__}")
    if wb_kind == 'Book':
        # xlrd exposes a lookup method
        return wb.sheet_by_index(index)
    # openpyxl exposes its sheets as a list
    return wb.worksheets[index]
def get_sheet_by_name(wb, sheet_name: str):
    """Return the worksheet called ``sheet_name`` from a workbook.

    The workbook flavour is recognised by its class name: ``Workbook``
    (openpyxl) or ``Book`` (xlrd).

    Args:
        wb: Workbook object (openpyxl.Workbook or xlrd.Book).
        sheet_name: Name of the sheet to retrieve.

    Returns:
        Worksheet object (openpyxl.Worksheet or xlrd.Sheet).

    Raises:
        TypeError: If workbook type is not supported.
    """
    wb_kind = wb.__class__.__name__
    if wb_kind not in ('Workbook', 'Book'):
        raise TypeError(f"Unknown workbook type: {wb.__class__.__name__}")
    if wb_kind == 'Book':
        # xlrd exposes a lookup method
        return wb.sheet_by_name(sheet_name)
    # openpyxl workbooks are subscriptable by sheet name
    return wb[sheet_name]
def check_dependencies():
    """Log an error when no Excel backend (openpyxl or xlrd) could be imported."""
    if not (HAS_OPENPYXL or HAS_XLRD):
        logger.error('Neither openpyxl nor xlrd available. Excel files not supported.')


# Run the check once when the module is imported.
check_dependencies()