Source code for dbtk.record

# dbtk/record.py
"""
Record classes for database result sets.
"""
import logging
from typing import List, Any, Iterator, Tuple, Union
from .utils import to_string, normalize_field_name, FixedColumn

# Module-level logger; set_fields() uses it to warn about renamed fields.
logger = logging.getLogger(__name__)

# Unique object used as the "no default supplied" marker for Record.pop(),
# so that None can still be passed as a real default value.
_MISSING = object()  # sentinel for pop() default argument

class Record(list):
    """
    Flexible, lightweight row type that strikes a balance between the memory
    efficiency of a list and the functionality of dicts/objects.

    Record extends list to provide a rich interface for accessing query result
    rows. It supports attribute access, dictionary-style key access, integer
    indexing, and slicing - all on the same object. This makes it a very
    flexible and memory efficient return type for both cursors and readers.

    Access Patterns
    ---------------
    * **Dictionary-style**: ``row['column_name']`` - Safe with .get() method
    * **Attribute access**: ``row.column_name`` - Clean, readable syntax
    * **Integer index**: ``row[3]`` - Positional access
    * **Slicing**: ``row[1:4]`` - Get multiple columns at once
    * **Iteration**: ``for value in row`` - Iterate over values
    * **Containment**: ``'column_name' in row`` - Check if column exists

    Key Methods
    -----------
    * **get(key, default=None)** - Safe dictionary-style access with default
    * **keys()** - Get list of column names
    * **values()** - Get tuple of column values
    * **items()** - Iterate over (column, value) pairs
    * **copy()** - Create a shallow copy of the record
    * **update(dict)** - Update multiple columns from a dictionary
    * **coalesce(dict)** - Update only missing values from a dictionary
    * **pprint()** - Pretty-print the record

    Column Names: Original vs Normalized
    -------------------------------------
    Every Record stores two parallel lists of names for each column:

    * ``_fields`` — the **original** names exactly as returned by the database
      (e.g. ``'First Name'``, ``'User ID'``, ``'#term_code'``).
    * ``_fields_normalized`` — **Python-safe** versions used for attribute
      access (e.g. ``'first_name'``, ``'user_id'``, ``'term_code'``).

    Both lists are set once by :meth:`set_fields` when the cursor executes its
    first query. Normalization converts field names so that they are suitable
    for attribute access. It lowercases, replaces non-alphanumeric characters
    with underscores, collapses runs, strips trailing underscores, and prefixes
    digit-leading names with ``n``.

    **Which to use:**

    * Use **original names** (``row['First Name']``, ``row.keys()``,
      ``row.to_dict()``) when round-tripping data back to the database or to a
      CSV, where column names must match the schema exactly.
    * Use **normalized names** (``row.first_name``, ``row['first_name']``,
      ``row.keys(normalized=True)``, ``row.to_dict(normalized=True)``) in
      application code where Pythonic attribute access is preferred and when
      case and white-space insensitive matching is beneficial.

    Both forms work interchangeably for item get/set and ``in`` checks, so
    ``row['First Name']`` and ``row['first_name']`` return the same value.

    Note
    ----
    Record is dynamically subclassed when a cursor executes a query. Each
    unique set of column names gets its own Record subclass with those names
    set as class attributes. This enables attribute access while maintaining
    the list base class for compatibility.

    Example
    -------
    ::

        cursor = db.cursor()
        cursor.execute("SELECT id, name, email, created FROM users WHERE id = :id", {'id': 42})
        user = cursor.fetchone()

        # All these access patterns work on the same object:
        print(user['name'])      # Dictionary-style: 'Aang'
        print(user.name)         # Attribute access: 'Aang'
        print(user[1])           # Index access: 'Aang'
        print(user[1:3])         # Slicing: ['Aang', 'aang@avatar.com']

        # Safe access with default
        print(user.get('phone', 'N/A'))  # 'N/A' if no phone column

        # Dictionary methods
        for col, val in user.items():
            print(f"{col}: {val}")

        # List compatibility
        user_id, name, email, created = user   # Unpack like a tuple
        print(' | '.join(map(str, user)))      # Join like a list

        # Update columns
        user['email'] = 'newemail@avatar.com'
        user.name = 'Avatar Aang'

    See Also
    --------
    Cursor : Database cursor that returns Record objects
    """

    __slots__ = ("_added", "_deleted_fields")

    _fields: List[str] = []  # Original field names (e.g., 'Start Year')
    _fields_normalized: List[str] = []  # Normalized for access (e.g., 'start_year')
    _field_len: int = 0  # cached for fast hot path
    _mutable_schema: bool = True  # Set to False in subclasses to forbid field add/delete

    # list of method and attribute names that normalized field names must not collide with
    _RESERVED: frozenset = frozenset({
        # methods defined on Record
        'append', 'clear', 'coalesce', 'copy', 'extend', 'get', 'items',
        'keys', 'pop', 'pprint', 'remove', 'to_dict', 'update', 'values',
        # classmethods
        'set_fields', '_get_reserved',
        # inherited list methods not overridden
        'count', 'index', 'insert', 'reverse', 'sort',
        # attributes / slots
        '_mutable_schema', '_fields', '_fields_normalized', '_field_len',
        '_added', '_deleted_fields',
        # _RESERVED itself normalizes to _reserved
        '_reserved',
    })

    def __init__(self, *args, **kwargs):
        """
        Build a record from positional and/or keyword values.

        Args:
            *args: Values in field order. Extra values beyond the number of
                fields are dropped; missing trailing values default to None.
            **kwargs: Values keyed by normalized or original field name. They
                override positional values. Keys matching no field are ignored.
        """
        # Slots must be written through object.__setattr__ because
        # Record.__setattr__ routes ordinary names through __setitem__.
        object.__setattr__(self, "_deleted_fields", set())
        object.__setattr__(self, "_added", None)  # created lazily on first new field

        # Fast path: exactly one positional value per field.
        n = self._field_len
        if len(args) == n and not kwargs:
            super().__init__(args)
            return

        values = [None] * n

        # Positional: fill in order, truncate if too many.
        supplied = args[:n]
        values[:len(supplied)] = supplied

        # Keyword args: normalized names take precedence over original names.
        for k, v in kwargs.items():
            if k in self._fields_normalized:
                values[self._fields_normalized.index(k)] = v
            elif k in self._fields:
                values[self._fields.index(k)] = v

        super().__init__(values)

    # ------------------------------------------------------------------ #
    # Core access methods
    # ------------------------------------------------------------------ #
    def __getitem__(self, key: Union[int, str, slice]) -> Any:
        """
        Return a value by column name, or by list index/slice.

        String keys are resolved in this order: runtime-added fields, original
        field names, normalized field names.

        Raises:
            KeyError: If a string key is unknown or names a deleted column.
        """
        if not isinstance(key, str):
            return super().__getitem__(key)

        # 1. Runtime-added fields
        if self._added and key in self._added:
            return self._added[key]

        # 2. Deleted fields (check by original name)
        if key in self._deleted_fields:
            raise KeyError(f"Column '{key}' has been deleted")

        # 3. Original field names
        try:
            idx = self._fields.index(key)
        except ValueError:
            pass
        else:
            return super().__getitem__(idx)

        # 4. Normalized field names
        try:
            idx = self._fields_normalized.index(key)
        except ValueError:
            raise KeyError(f"Column '{key}' not found") from None
        # The deleted set stores original names, so translate before checking.
        if self._fields[idx] in self._deleted_fields:
            raise KeyError(f"Column '{key}' has been deleted")
        return super().__getitem__(idx)

    def __setitem__(self, key: Union[int, str], value: Any) -> None:
        """
        Set a value by position or by column name.

        Assigning to a deleted column revives it. Assigning to an unknown name
        adds a runtime field when the schema is mutable.

        Raises:
            TypeError: If ``key`` is neither int nor str, or if a new field
                would be added while ``_mutable_schema`` is False.
            IndexError: If an integer ``key`` is outside the underlying list.
        """
        if isinstance(key, int):
            # Positional set goes straight to the underlying list; an
            # out-of-range index raises IndexError (lists do not pad).
            super().__setitem__(key, value)
            return

        if not isinstance(key, str):
            raise TypeError("key must be int or str")

        # 1. Runtime-added field? Update it
        if self._added and key in self._added:
            self._added[key] = value
            return

        # 2. Try original field name
        if key in self._fields:
            # Revive the field if it had been deleted.
            self._deleted_fields.discard(key)
            super().__setitem__(self._fields.index(key), value)
            return

        # 3. Try normalized field name
        if key in self._fields_normalized:
            idx = self._fields_normalized.index(key)
            self._deleted_fields.discard(self._fields[idx])
            super().__setitem__(idx, value)
            return

        # 4. New field — add to runtime dict
        if not self.__class__._mutable_schema:
            raise TypeError(
                f"Cannot add field '{key}': schema is fixed (_mutable_schema=False)"
            )
        if self._added is None:
            object.__setattr__(self, "_added", {})
        self._added[key] = value

    def __delitem__(self, key: Union[int, str]) -> None:
        """Delete a column by name; positional deletion is not supported."""
        if not isinstance(key, str):
            raise TypeError("Record supports deletion only by column name (string)")
        # Delegate to pop() — it has all the logic
        self.pop(key)

    def __getattr__(self, name: str) -> Any:
        """
        Resolve ``row.name`` for runtime-added and normalized field names.

        Called only when normal attribute lookup fails.
        """
        # Read the slots without re-entering __getattr__: on an instance whose
        # slots are not set yet (created via __new__, as copy() does) a plain
        # ``self._added`` would call this method again and recurse forever.
        try:
            added = object.__getattribute__(self, "_added")
            deleted = object.__getattribute__(self, "_deleted_fields")
        except AttributeError:
            raise AttributeError(
                f"'Record' object has no attribute '{name}'"
            ) from None

        if added and name in added:
            return added[name]

        # Check normalized field names for attribute access
        if name in self._fields_normalized:
            idx = self._fields_normalized.index(name)
            if self._fields[idx] not in deleted:
                return super().__getitem__(idx)

        raise AttributeError(f"'Record' object has no attribute '{name}'")

    def __setattr__(self, name: str, value: Any) -> None:
        """Set ``row.name = value``; unknown names become runtime fields."""
        if name in Record.__slots__:
            # Internal state lives in slots, never in the column data.
            object.__setattr__(self, name, value)
            return
        self[name] = value

    def __delattr__(self, name: str) -> None:
        """Delete a column via ``del row.name``."""
        try:
            del self[name]
        except KeyError:
            raise AttributeError(name)

    def __contains__(self, key: object) -> bool:
        """Return True if ``key`` names a live (non-deleted) column."""
        if not isinstance(key, str):
            return False

        # Check original fields
        if key in self._fields:
            return key not in self._deleted_fields

        # Check normalized fields
        if key in self._fields_normalized:
            idx = self._fields_normalized.index(key)
            return self._fields[idx] not in self._deleted_fields

        # Check runtime-added fields; bool() keeps the result a real bool
        # when _added is None or empty.
        return bool(self._added) and key in self._added

    @classmethod
    def _get_reserved(cls) -> frozenset:
        """Return all reserved names, including those from parent classes."""
        # cls itself is part of its MRO, so its own _RESERVED is included.
        return frozenset().union(
            *(base._RESERVED for base in cls.__mro__ if hasattr(base, '_RESERVED'))
        )

    @classmethod
    def set_fields(cls, fields: List[str]) -> None:
        """
        Set the field names for this Record class.

        Stores original field names and generates normalized versions for
        attribute access. Duplicate original names and colliding normalized
        names are made unique by appending _2, _3, etc.; each rename is logged
        as a warning.

        Args:
            fields: Original field names (e.g., ['Start Year', 'End Date'])

        Examples:
            >>> rec.set_fields(['Start Year', 'End Date'])
            >>> rec._fields
            ['Start Year', 'End Date']
            >>> rec._fields_normalized
            ['start_year', 'end_date']
        """
        # Deduplicate original field names before normalization.
        # When a name appears more than once, rename later occurrences by
        # appending _2, _3, … — skipping any candidate already taken by an
        # earlier renamed field OR that already exists in the original list.
        given = set(fields)
        deduped = []
        seen_originals = set()
        for name in fields:
            if name not in seen_originals:
                seen_originals.add(name)
                deduped.append(name)
                continue
            counter = 1
            candidate = name
            while candidate in seen_originals or candidate in given:
                counter += 1
                candidate = f"{name}_{counter}"
            logger.warning(
                "Duplicate field '%s' renamed to '%s' in _fields.", name, candidate
            )
            seen_originals.add(candidate)
            deduped.append(candidate)

        cls._fields = deduped
        cls._field_len = len(deduped)

        # Normalize and handle collisions
        normalized = []
        seen = set()
        reserved = cls._get_reserved()
        for original in deduped:
            original_norm = normalize_field_name(original)  # name for attribute access
            norm = original_norm
            counter = 1
            while norm in seen or norm in reserved:
                counter += 1
                norm = f"{original_norm}_{counter}"
            if norm != original_norm:
                logger.warning(
                    "Attribute access for '%s' was renamed to '%s' due to collision "
                    "with a reserved Record attribute or another field name.",
                    original, norm,
                )
            seen.add(norm)
            normalized.append(norm)

        cls._fields_normalized = normalized

    # ------------------------------------------------------------------ #
    # Dict-like interface
    # ------------------------------------------------------------------ #
    def keys(self, normalized: bool = False) -> List[str]:
        """
        Get list of field names.

        Deleted fields are omitted; runtime-added fields come last.

        Args:
            normalized: If True, return normalized field names.
                        If False (default), return original field names.

        Returns:
            List of field names
        """
        names = self._fields_normalized if normalized else self._fields
        deleted = self._deleted_fields
        # The deleted set holds original names, so filter on _fields either way.
        base = [
            name for name, original in zip(names, self._fields)
            if original not in deleted
        ]
        if self._added:
            base.extend(self._added)
        return base

    def values(self) -> Tuple[Any, ...]:
        """Get a tuple of field values, in the same order as :meth:`keys`."""
        return tuple(self[k] for k in self.keys())

    def items(self, normalized: bool = False) -> Iterator[Tuple[str, Any]]:
        """
        Get (field_name, value) pairs.

        Args:
            normalized: If True, use normalized field names.
                        If False (default), use original field names.

        Yields:
            Tuples of (field_name, value)
        """
        names = self._fields_normalized if normalized else self._fields
        for name, field in zip(names, self._fields):
            if field not in self._deleted_fields:
                yield name, self[field]

        if self._added:
            yield from self._added.items()

    def get(self, key: str, default: Any = None) -> Any:
        """Return ``self[key]``, or ``default`` if the lookup raises KeyError."""
        try:
            return self[key]
        except KeyError:
            return default

    def pop(self, key: str, default: object = _MISSING) -> Any:
        """
        Remove a column by name and return its value.

        Schema fields are only marked as deleted (the underlying list keeps
        its slot); runtime-added fields are removed outright.

        Args:
            key: Original, normalized, or runtime-added field name.
            default: Returned when the column is unknown or already deleted.

        Raises:
            TypeError: If ``key`` is not a str or the schema is fixed.
            KeyError: If the column is missing and no default was given.
        """
        if not isinstance(key, str):
            raise TypeError("pop() key must be str")

        if not self.__class__._mutable_schema:
            raise TypeError(
                f"Cannot delete field '{key}': schema is fixed (_mutable_schema=False)"
            )

        # 1. Runtime-added field?
        if self._added and key in self._added:
            return self._added.pop(key)

        # 2./3. Original field names take precedence over normalized ones.
        if key in self._fields:
            idx = self._fields.index(key)
        elif key in self._fields_normalized:
            idx = self._fields_normalized.index(key)
        else:
            idx = None

        if idx is not None:
            original_name = self._fields[idx]
            if original_name not in self._deleted_fields:
                value = super().__getitem__(idx)
                self._deleted_fields.add(original_name)
                return value

        # 4. Unknown or already deleted: honour the default like dict.pop().
        if default is not _MISSING:
            return default
        raise KeyError(key)

    def update(self, other=None, **kwargs) -> None:
        """
        Update fields from another dict, Record, or keyword arguments.

        Overwrites existing field values unconditionally. To preserve existing
        non-empty values, use :meth:`coalesce` instead.

        Accepts any mapping with an ``items()`` method, an iterable of
        ``(key, value)`` pairs, or keyword arguments. Unknown keys are added as
        runtime fields.

        Args:
            other: Optional dict, Record, or iterable of (key, value) pairs.
            **kwargs: Additional key-value pairs to set.

        Examples:
            >>> Record.set_fields(['id', 'name', 'email'])
            >>> record = Record(1, 'Scott', 'old@example.com')
            >>> record.update({'email': 'new@example.com'}, name='Scott Bailey')
            >>> record
            # [1, 'Scott Bailey', 'new@example.com']
        """
        if other is not None:
            pairs = other.items() if hasattr(other, "items") else other
            for k, v in pairs:
                self[k] = v
        for k, v in kwargs.items():
            self[k] = v

    def coalesce(self, other=None, **kwargs) -> 'Record':
        """
        Fill in missing or empty fields from another dict, Record, or keyword
        arguments.

        Copies values from `other` (or `**kwargs`) only into existing fields
        that are currently `None` or an empty string (`''`). Existing
        non-empty values are preserved, and keys that are not fields of this
        record are skipped.

        Args:
            other: Optional dict, Record, or iterable of (key, value) pairs.
                   Processed before ``**kwargs``.
            **kwargs: Additional key-value pairs to coalesce. These only fill
                      fields still empty after `other` has been applied.

        Returns:
            self: The updated Record (for chaining).

        Examples:
            >>> Record.set_fields(['id', 'name', 'email', 'phone', 'notes'])
            >>> record = Record(None, "Scott", "scott@example.com", "", None)
            >>> resolved = {'id': 123, 'name': 'Scott Bailey', 'notes': 'VIP'}
            >>> record.coalesce(resolved, phone="555-1234")
            >>> record
            # [123, "Scott", "scott@example.com", "555-1234", "VIP"]
        """
        if other is not None:
            pairs = other.items() if hasattr(other, "items") else other
            for k, v in pairs:
                if k in self and self[k] in (None, ''):
                    self[k] = v
        for k, v in kwargs.items():
            if k in self and self[k] in (None, ''):
                self[k] = v
        return self

    def to_dict(self, normalized: bool = False) -> dict:
        """
        Convert Record to dictionary.

        Args:
            normalized: If True, use normalized field names as keys.
                        If False (default), use original field names.

        Returns:
            Dictionary representation of the record

        Examples:
            >>> Record.set_fields(['Start Year', 'End Year'])
            >>> rec = Record(2020, 2025)
            >>> rec.to_dict()
            {'Start Year': 2020, 'End Year': 2025}
            >>> rec.to_dict(normalized=True)
            {'start_year': 2020, 'end_year': 2025}
        """
        return dict(self.items(normalized=normalized))

    def copy(self) -> 'Record':
        """
        Return a shallow copy of the Record.

        - Copies the underlying list values
        - Copies the deleted fields set
        - Copies any runtime-added fields (_added dict)
        - Uses the same Record subclass, so the class-level field metadata
          (_fields, _fields_normalized) is shared and attribute access works

        Returns:
            Record: A new Record instance with the same data and state
        """
        # Create a new instance of the same class (preserves subclass attrs)
        new = self.__class__.__new__(self.__class__)
        # Initialize slots directly so the new instance is fully set up
        # before any attribute access or item assignment touches it.
        object.__setattr__(new, '_deleted_fields', self._deleted_fields.copy())
        object.__setattr__(
            new, '_added', self._added.copy() if self._added is not None else None
        )
        # Copy the raw list contents; list.__iter__ bypasses Record.__iter__,
        # which would skip deleted slots and append runtime fields.
        super(Record, new).__init__(super().__iter__())
        return new

    # ------------------------------------------------------------------ #
    # Utilities
    # ------------------------------------------------------------------ #
    def __len__(self) -> int:
        """Number of live columns (schema fields not deleted + added fields)."""
        return len(self.keys())

    def __iter__(self) -> Iterator[Any]:
        """Iterate over the values of live columns."""
        return iter(self.values())

    def __str__(self) -> str:
        items = ", ".join(f"{k!r}: {v!r}" for k, v in self.items())
        return f"{self.__class__.__name__}({items})"

    def __repr__(self) -> str:
        values = ", ".join(repr(v) for v in super().__iter__())  # original order
        return f"{self.__class__.__name__}({values})"

    def __dir__(self) -> List[str]:
        """List attributes plus column names (original and normalized)."""
        names = set(super().__dir__())
        names.update(self.keys())
        # Attribute access resolves normalized names, so expose those too.
        names.update(self.keys(normalized=True))
        return sorted(names)

    def reverse(self) -> None:
        """Unsupported: reordering values would break field-index mappings."""
        raise TypeError("Record.reverse() is not supported — it would break field-index mappings")

    def sort(self, *args, **kwargs) -> None:
        """Unsupported: reordering values would break field-index mappings."""
        raise TypeError("Record.sort() is not supported — it would break field-index mappings")

    def insert(self, index: int, value: object) -> None:
        """Unsupported: shifting values would break field-index mappings."""
        raise TypeError("Record.insert() is not supported — it would break field-index mappings")

    def pprint(self, normalized: bool = False) -> None:
        """
        Pretty-print the record with aligned columns.

        Args:
            normalized: If True, use normalized field names.
                        If False (default), use original field names.
        """
        keys_to_use = self.keys(normalized=normalized)

        if not keys_to_use:
            print("<Empty Record>")
            return

        width = max(len(k) for k in keys_to_use)
        template = f"{{:<{width}}} : {{}}"

        for key in keys_to_use:
            print(template.format(key, to_string(self[key])))
class FixedWidthRecord(Record):
    """
    A Record subclass optimized for fixed-width data parsing and reconstruction.

    Instances represent a single row from a fixed-width file, with values
    accessible by name, attribute, or index. The class retains the original
    List[FixedColumn] definitions so that `to_line()` can reconstruct the
    source line by splicing each formatted value into its character position.

    Designed for use with `FixedReader` or `EDIReader`, where each record type
    gets its own dynamic subclass with the appropriate column definitions.

    Class attributes (set automatically by `set_fields`):
        _columns : List[FixedColumn]
            Full column definitions in definition order, including name,
            position, type, alignment, pad character, and comment.
        _line_len : int
            Total line width in characters (max end_pos across all columns).

    Example usage::

        # In reader factory
        RecordClass.set_fields(columns)   # columns is List[FixedColumn]
        record = RecordClass(*row_values)
        original_line = record.to_line()  # reconstructs formatted string

        # Parse a raw fixed-width string directly
        record = RecordClass.from_line(raw_line)  # corollary to to_line()
    """

    _columns: List[FixedColumn] = []
    _line_len: int = 0
    _mutable_schema: bool = False  # fields cannot be added or deleted
    _RESERVED: frozenset = frozenset({'_columns', '_line_len', 'from_line', 'to_line', 'visualize'})

    @classmethod
    def set_fields(cls, fields: List[FixedColumn]):
        """
        Set field names and column definitions from a list of FixedColumn objects.

        Stores a copy of the column list on the class (preserving position,
        type, alignment, pad character, and comment for introspection) and
        pre-computes _line_len as the rightmost end_pos across all columns
        (0 when there are no columns).

        Args:
            fields: FixedColumn definitions for this record type.
                    Definition order determines value order on instances;
                    to_line() places each value by start_idx so
                    out-of-position-order definitions work correctly.
        """
        columns = list(fields)
        super().set_fields([col.name for col in columns])
        cls._columns = columns
        cls._line_len = max((col.end_pos for col in columns), default=0)

    @classmethod
    def from_line(cls, line: str, auto_trim: bool = True) -> 'FixedWidthRecord':
        """
        Parse a fixed-width string and return a new instance of this record type.

        The corollary to ``to_line()``: slice each field from its declared
        position, apply type conversion based on ``column_type``, and construct
        the record.

        Conversion by ``column_type``:

        * ``'text'`` — stripped when ``auto_trim`` is True, raw otherwise
        * ``'date'`` / ``'datetime'`` / ``'timestamp'`` — passed to the matching
          ``parse_*`` helper
        * ``'int'`` / ``'float'`` — converted after stripping; blank becomes None
        * anything else — kept as the raw slice

        If a conversion raises ValueError or TypeError, the text of the slice
        is stored instead (stripped when ``auto_trim`` is True).

        Args:
            line: A fixed-width string. Should be at least ``_line_len``
                  characters; shorter strings are handled gracefully (missing
                  positions yield an empty slice).
            auto_trim: If True (default), strip leading/trailing whitespace from
                       ``text`` fields before storing. Set to False to preserve
                       the raw padded value exactly as it appears in the source
                       line.

        Returns:
            A new instance of this :class:`FixedWidthRecord` subclass with values
            populated from the parsed string.

        Raises:
            TypeError: If ``line`` is not a string.

        Example::

            MyRecord = fixed_record_factory([
                ('record_type', 1),
                FixedColumn('amount', 2, 11, 'int'),
            ])
            record = MyRecord.from_line('6        42')
            assert record.record_type == '6'
            assert record.amount == 42

            # Round-trip
            assert record.to_line() == MyRecord.from_line(record.to_line()).to_line()
        """
        if not isinstance(line, str):
            raise TypeError(f"line must be a str, not {type(line).__name__}")

        from .etl.transforms.datetime import parse_date, parse_datetime, parse_timestamp

        row_data = []
        for col in cls._columns:
            raw = line[col.start_idx:col.end_pos]
            kind = col.column_type
            try:
                if kind == 'text':
                    val = raw.strip() if auto_trim else raw
                elif kind == 'date':
                    val = parse_date(raw)
                elif kind == 'datetime':
                    val = parse_datetime(raw)
                elif kind == 'timestamp':
                    val = parse_timestamp(raw)
                elif kind in ('int', 'float'):
                    digits = raw.strip()
                    convert = int if kind == 'int' else float
                    val = convert(digits) if digits else None
                else:
                    val = raw
            except (ValueError, TypeError):
                # Best effort: keep the text rather than failing the whole row.
                val = raw.strip() if auto_trim else raw
            row_data.append(val)

        return cls(*row_data)

    def to_line(self, truncate_overflow: bool = False):
        """
        Reconstruct the fixed-width line from this record's values.

        Builds a space-filled buffer of _line_len characters and splices each
        field value into its position using start_idx. Column order in the
        definition does not matter; gaps between columns remain as spaces.
        Columns are paired with ``items()`` in order, so any values beyond the
        defined columns are ignored.

        Values shorter than the column are padded. Alignment comes from
        ``col.align`` when set, otherwise right for int/float and left for
        everything else. The pad character comes from ``col.pad_char`` when
        set, otherwise ``'0'`` for int/float and a space for everything else.

        Args:
            truncate_overflow: If False (default), raise ValueError when a value
                               exceeds its column width. If True, silently
                               truncate.

        Returns:
            A string of _line_len characters in the fixed-width format for this
            record type.

        Raises:
            ValueError: If truncate_overflow=False and any value exceeds its width.

        Example:
            record.to_line()
            # -> '1234567890ABC      0000012345'
        """
        cls = self.__class__
        line = [' '] * cls._line_len

        for col, (name, value) in zip(cls._columns, self.items()):
            str_val = to_string(value)
            width = col.width
            numeric = col.column_type in ('int', 'float')

            if len(str_val) > width:
                if not truncate_overflow:
                    raise ValueError(f'Value too large for {name} limit: {col.width}')
                str_val = str_val[:width]
            elif len(str_val) < width:
                if col.align:
                    align = col.align[0]  # 'left'→'l', 'right'→'r', 'center'→'c'
                else:
                    align = 'r' if numeric else 'l'

                if col.pad_char is not None:
                    pad = col.pad_char
                else:
                    pad = '0' if numeric else ' '

                if align == 'r':
                    str_val = str_val.rjust(width, pad)
                elif align == 'c':
                    str_val = str_val.center(width, pad)
                else:
                    str_val = str_val.ljust(width, pad)

            # Slice assignment spreads the string one character per cell.
            line[col.start_idx:col.start_idx + width] = str_val

        return ''.join(line)

    def pprint(self, normalized: bool = False, add_comments: bool = False) -> None:
        """
        Pretty-print the record with aligned columns.

        Args:
            normalized: If True, use normalized field names.
            add_comments: If True, append each column's comment (from the
                          FixedColumn definition) after the value. Columns
                          without a comment are left blank in that position.
                          Has no effect when there are no _columns defined.
        """
        cls = self.__class__

        if not add_comments or not cls._columns:
            super().pprint(normalized=normalized)
            return

        keys = self.keys(normalized=normalized)
        if not keys:
            print("<Empty Record>")
            return

        # Pair columns with the same name list keys() draws from, so comments
        # are still found when the displayed name differs from col.name
        # (normalized names, or duplicates renamed by set_fields).
        names = cls._fields_normalized if normalized else cls._fields
        col_map = dict(zip(names, cls._columns))

        rows = [(key, to_string(self[key])) for key in keys]
        key_width = max(len(key) for key, _ in rows)
        val_width = max(len(text) for _, text in rows)
        template = f"{{:<{key_width}}} : {{:<{val_width}}}  {{}}"
        no_comment_template = f"{{:<{key_width}}} : {{}}"

        comments_present = any(
            col_map[key].comment for key in keys if key in col_map
        )

        for key, text in rows:
            col = col_map.get(key)
            comment = col.comment if col else None
            if comments_present:
                print(template.format(key, text, f'# {comment}' if comment else ''))
            else:
                print(no_comment_template.format(key, text))

    def visualize(self) -> str:
        """
        Return a diagnostic string showing column boundaries over the record value.

        The result has four newline-separated lines:

        1. Tens ruler — the tens digit at every 10th position, blank elsewhere,
           with ``0`` in the first position.
        2. Ones ruler — the last digit of each 1-based position.
        3. Boundary line — ``├`` at each column start, ``┤`` at each column end
           (unless that cell already holds a start marker), ``─`` elsewhere.
        4. The ``to_line()`` output.

        Returns a string; call ``print(record.visualize())`` to display.
        """
        cls = self.__class__
        positions = range(1, cls._line_len + 1)

        ruler_10s = ''.join(
            str(i // 10 % 10) if i % 10 == 0 else ' ' for i in positions
        )
        ruler_10s = '0' + ruler_10s[1:]
        ruler_1s = ''.join(str(i % 10) for i in positions)

        boundary_line = ['─'] * cls._line_len
        for col in cls._columns:
            boundary_line[col.start_idx] = '├'
            last = col.end_pos - 1
            # Do not overwrite a start marker (e.g. single-character columns).
            if boundary_line[last] == '─':
                boundary_line[last] = '┤'

        return f'{ruler_10s}\n{ruler_1s}\n{"".join(boundary_line)}\n{self.to_line()}'
def fixed_record_factory(columns, name='FixedRecord'):
    """
    Class factory: build a :class:`FixedWidthRecord` subclass from a compact
    column spec.

    Like :func:`collections.namedtuple`, this returns a *class*, not an
    instance.

    Parameters
    ----------
    columns : list of FixedColumn or (name, width) tuple
        May be mixed. A tuple gives ``(name, width)`` and is placed at the
        current auto-position, which starts at 1 and then advances by the
        tuple's width. A :class:`FixedColumn` is used as-is and moves the
        auto-position to ``col.end_pos + 1``.
    name : str, optional
        Class name of the returned type. Defaults to ``'FixedRecord'``.

    Returns
    -------
    type
        A :class:`FixedWidthRecord` subclass with ``set_fields()`` already
        called.

    Examples
    --------
    ::

        # Tuple-only: positions calculated automatically
        AchDetail = fixed_record_factory([
            ('record_type', 1),
            ('priority_code', 2),
            ('routing_number', 9),
            ('account_number', 17),
            ('amount', 10),
        ], name='AchDetail')

        line = AchDetail('6', '22', '123456789', '00012345678', 100)
        print(line.to_line())

        # Mixed: use FixedColumn where you need explicit control
        AchHeader = fixed_record_factory([
            FixedColumn('record_type', 1, 1),
            ('priority_code', 2),
            FixedColumn('routing_number', 4, 12, column_type='int', align='right'),
            ('filler', 39),
        ])
    """
    next_pos = 1
    resolved = []
    for spec in columns:
        if isinstance(spec, FixedColumn):
            # Explicit definition: keep it and resume numbering after it.
            resolved.append(spec)
            next_pos = spec.end_pos + 1
            continue
        field_name = spec[0]
        field_width = spec[1]
        resolved.append(FixedColumn(field_name, next_pos, width=field_width))
        next_pos += field_width

    record_cls = type(name, (FixedWidthRecord,), {})
    record_cls.set_fields(resolved)
    return record_cls