File Readers
The problem: Each file format has its own quirks and APIs. You end up writing different code for CSV vs Excel vs JSON, making your ETL pipelines fragile and hard to maintain.
The solution: DBTK provides a single, consistent interface for reading all common file formats. Whether you’re reading CSV, Excel, JSON, XML, or fixed-width files, the API is identical. Even better — get_reader() automatically detects the format from the file extension.
All “Large” readers will automatically display progress trackers.
Quick Start
from dbtk import readers
# CSV files — use utf-8-sig instead of utf-8 to avoid BOM issues (corrupted column names)
with readers.CSVReader(open('northern_water_tribe_census.csv', encoding='utf-8-sig')) as reader:
for waterbender in reader:
print(f"Waterbender: {waterbender.name}, Village: {waterbender.village}")
# Excel spreadsheets
with readers.get_reader('fire_nation_army.xlsx', sheet_index=1) as reader:
for soldier in reader:
print(f"Rank: {soldier.military_rank}, Firebending Level: {soldier.flame_intensity}")
# Fixed-width text files — see section below for column definition details
columns = [
readers.FixedColumn('earthbender_name', 1, 25),
readers.FixedColumn('rock_throwing_distance', 26, 35, 'float'),
readers.FixedColumn('training_complete_date', 36, 46, 'date')
]
with readers.FixedReader(open('earth_kingdom_records.txt'), columns) as reader:
for earthbender in reader:
print(f"{earthbender.earthbender_name}: {earthbender.rock_throwing_distance} meters")
# JSON files (array of objects)
with readers.JSONReader(open('eastern_air_temple.json')) as reader:
for monk in reader:
print(f"Air Nomad: {monk.monk_name}, Sky Bison: {monk.sky_bison_companion}")
# NDJSON files (one JSON object per line — common for streaming/logs)
with readers.NDJSONReader(open('api_events.ndjson')) as reader:
for event in reader:
print(f"Event: {event.type}, User: {event.user_id}")
# XML files — text child elements are auto-discovered; see section below for XMLColumn usage
with readers.XMLReader(open('avatar_chronicles.xml'), record_xpath='//avatar') as reader:
for avatar in reader:
print(f"Avatar {avatar.name}: {avatar.origin}")
Automatic Format Detection
Let DBTK figure out what you’re reading:
# Automatically detects format from extension
with readers.get_reader('data.xlsx') as reader:
for record in reader:
process(record)
Compressed Files — Automatic Decompression
DBTK transparently handles compressed files (.gz, .bz2, .xz, .zip) with zero configuration. Just pass the compressed filename — decompression happens automatically:
# GZIP compressed CSV — automatically decompressed
with readers.get_reader('census_data.csv.gz') as reader:
for record in reader:
process(record)
# BZ2 compressed JSON
with readers.get_reader('api_response.json.bz2') as reader:
for record in reader:
process(record)
# XZ compressed TSV
with readers.get_reader('large_dataset.tsv.xz') as reader:
for record in reader:
process(record)
ZIP Archives — Smart Member Selection
For ZIP files, DBTK automatically selects the right file to read:
# Single file in ZIP — automatically selected
with readers.get_reader('data.csv.zip') as reader:
for record in reader:
process(record)
# Archive name matches member name — automatically selected
# name.subset.zip containing name.subset.tsv
with readers.get_reader('name.subset.zip') as reader:
for record in reader:
process(record)
# Multiple files — specify which one to read
with readers.get_reader('archive.zip', zip_member='data.csv') as reader:
for record in reader:
process(record)
# Works with TSV delimiter too
with readers.get_reader('names.zip', delimiter='\t') as reader:
for record in reader:
process(record)
Performance Characteristics
Progress tracking — GZIP and ZIP files show accurate progress bars without decompressing entire file
Memory efficient — Streaming decompression, constant memory usage regardless of file size
Real-world speed — 500k+ records/sec reading compressed IMDB dataset (14.7M rows) with full transforms
# Configure buffer size if needed (default is 1MB)
from dbtk.defaults import settings
settings['compressed_file_buffer_size'] = 2 * 1024 * 1024 # 2MB buffer
Fixed-Width Files
Fixed-width files have no delimiters — every field occupies a specific character range within each line. You must define all fields explicitly using FixedColumn.
Defining Columns
FixedColumn(name, start_pos, end_pos, column_type='text', align=None, pad_char=None, comment=None)
Positions are 1-indexed (the first character is position 1, not 0) and the end position is inclusive. While programmers typically think in zero indexed arrays and strings, most interface file specifications use 1-indexed positions. FixedColumn can also be initialized with start position and width as many specification are given in this format.
columns = [
readers.FixedColumn('claim_id', 1, 12), # text (default) — strips whitespace
readers.FixedColumn('amount', 13, 22, 'float'), # parsed to float; None if blank
readers.FixedColumn('claim_date', 23, 32, 'date'), # parsed to Python date object
readers.FixedColumn('status', 33, width=2), # 2-char status code, text
]
with readers.FixedReader(open('claims.txt'), columns) as reader:
for claim in reader:
print(f"{claim.claim_id}: ${claim.amount} on {claim.claim_date}")
Column types:
Type |
Behavior |
|---|---|
|
Strips leading/trailing whitespace, returns string |
|
Converts to integer; returns |
|
Converts to float; returns |
|
Parses to |
|
Parses to |
|
Parses to |
align and pad_char — output formatting for to_line()
These parameters only affect how FixedWidthRecord.to_line() reconstructs a line; they are ignored during reading. When not set, defaults are inferred from column_type:
|
Default alignment |
Default pad_char |
|---|---|---|
|
left |
|
|
right |
|
Accepted alignment values: 'left'/'l'/'<', 'right'/'r'/'>', 'center'/'c'.
# Numeric field — zero-padded, right-aligned by default
readers.FixedColumn('amount', 1, 10, 'int')
# value 42 → '0000000042'
# Override to space-padded right-aligned (common for routing numbers)
readers.FixedColumn('routing_number', 1, 10, align='right', pad_char=' ')
# value '061000104' → ' 061000104'
Note:
alignandpad_charare independent. Explicitly settingalign='left'on anintcolumn does not automatically change the pad character — it will still default to'0'and produce left-aligned zero-padded output ('42 'becomes'4200000000')! When overriding alignment on a numeric column, setpad_char=' 'explicitly too.
Verifying Column Layout
When working from a file specification, use visualize() to confirm your positions match the actual data:
with readers.FixedReader(open('claims.txt'), columns) as reader:
print(reader.visualize())
# Output:
# 1 2 3 4 5 6 7 8 9
# 1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234
# ├├┤├────────┤├────────┤├────┤├──┤├├─┤├┤├├─────────────────────┤├─────────────────────┤├──────┤
# 101 02100002101234567892603011200A094101TEST BANK TEST COMPANY
This prints a character ruler with column boundary markers (|) over several sample lines from the file, making it easy to spot off-by-one errors before processing millions of rows.
Reconstructing Lines — to_line()
Records returned by FixedReader are FixedWidthRecord instances that know their column layout. Call to_line() to reconstruct the original fixed-width line — useful for writing modified records back to a file without disturbing untouched columns.
with readers.FixedReader(open('claims.txt'), columns) as reader:
with open('updated_claims.txt', 'w') as out:
for claim in reader:
if claim.status == 'P':
claim['status'] = 'A'
out.write(claim.to_line() + '\n')
to_line() builds its output by position, not by column order: it creates a space-filled buffer of the total line length and splices each formatted value into its exact byte range. Gaps between defined columns remain as spaces, and columns defined out of position order are placed correctly.
# truncate_overflow=True silently trims values that exceed their column width
# truncate_overflow=False (default) raises ValueError, naming the offending field
claim.to_line(truncate_overflow=True)
See Record Objects — FixedWidthRecord for full details.
EDI / Multi-Record-Type Fixed-Width Files
EDI (Electronic Data Interchange) formats interleave different record types in the same file — each line starts with a type code that determines its layout. Use EDIReader with a dict mapping type codes to column lists.
DBTK ships with pre-defined layouts for NACHA ACH files:
from dbtk.readers.fixed_width import EDIReader
from dbtk.formats.edi import ACH_COLUMNS
with EDIReader(open('payroll.ach'), ACH_COLUMNS) as reader:
for record in reader:
if record.record_type_code == '6': # Entry Detail
print(f"{record.individual_name}: ${int(record.amount) / 100:.2f}")
ACH_COLUMNS covers all standard NACHA record types:
Key |
Record type |
|---|---|
|
File Header |
|
Batch Header |
|
Entry Detail |
|
Addenda |
|
Batch Control |
|
File Control |
For custom multi-record formats, supply your own dict:
custom_layouts = {
'H': [FixedColumn('record_type', 1, 1), FixedColumn('file_date', 2, 9)],
'D': [FixedColumn('record_type', 1, 1), FixedColumn('account_id', 2, 11), ...],
'T': [FixedColumn('record_type', 1, 1), FixedColumn('record_count', 2, 9, 'int')],
}
with EDIReader(open('data.txt'), custom_layouts) as reader:
for record in reader:
process(record)
The type-code key can be any length — EDIReader slices the beginning of each line to match the longest key in your dict.
XML Files
XMLReader needs a record_xpath to locate the repeating record elements.
Auto-Discovered Columns
Text content of direct child elements is discovered automatically — no XMLColumn definitions needed. The reader samples the first few records to find all element names and creates columns for them.
<!-- avatar_chronicles.xml -->
<avatars>
<avatar reincarnation_cycle="148">
<name>Korra</name>
<origin>Southern Water Tribe</origin>
<mastered_elements>4</mastered_elements>
</avatar>
</avatars>
# name, origin, and mastered_elements are discovered automatically
with readers.XMLReader(open('avatar_chronicles.xml'), record_xpath='//avatar') as reader:
for avatar in reader:
print(f"{avatar.name} from {avatar.origin}")
Defining XMLColumn for Attributes and Nested Data
XMLColumn is only needed when the data you want is not the text of a direct child element:
xml_columns = [
# Attribute value on the record element itself
readers.XMLColumn('cycle', xpath='@reincarnation_cycle'),
# Nested sub-element (not a direct child)
readers.XMLColumn('air_move', xpath='.//airbending/signature_move/text()'),
# Explicit text node when you need a specific child (not just element matching)
readers.XMLColumn('full_name', xpath='./name/text()'),
]
with readers.XMLReader(open('avatar_chronicles.xml'),
record_xpath='//avatar',
columns=xml_columns) as reader:
for avatar in reader:
print(f"Cycle {avatar.cycle}: {avatar.full_name} — {avatar.air_move}")
When you need XMLColumn:
Situation |
XPath pattern |
|---|---|
Attribute on the record element |
|
Attribute on a child element |
|
Nested sub-element text |
|
Element in a specific namespace |
|
When you don’t need XMLColumn: any element whose text content is a direct child of the record node is auto-discovered. You only need to add explicit definitions for the fields listed above.
Common Reader Parameters
All readers support these parameters for controlling input processing:
# Skip first 10 data rows, read only 100 rows
reader = readers.CSVReader(
open('data.csv', encoding='utf-8-sig'),
skip_rows=10, # Skip N rows after headers (useful for bad data)
n_rows=100, # Only read first N rows (useful for testing/sampling)
add_row_num=True, # Add '_row_num' field to each record (default True)
)
# Row numbers track position in source file
with readers.get_reader('data.csv', skip_rows=5) as reader:
for record in reader:
print(f"Row {record._row_num}: {record.name}") # _row_num starts at 6 (after skip)
Encoding
All text-based readers default to utf-8-sig, which handles UTF-8 with or without a BOM. If your source files have inconsistent encoding — for example, Excel’s “CSV UTF-8” and “CSV” exports produce different encodings — use encoding='detect' to auto-detect via charset-normalizer:
# Auto-detect encoding — useful when source encoding is inconsistent
with readers.get_reader('export.csv', encoding='detect') as reader:
for record in reader:
print(record.name)
# Works transparently with compressed files too
with readers.get_reader('export.csv.gz', encoding='detect') as reader:
for record in reader:
print(record.name)
encoding='detect' requires charset-normalizer (pip install charset-normalizer, included in dbtk[recommended]). If not installed, a warning is logged and encoding falls back to utf-8-sig.
Filtering Records
Use add_filter() to selectively process records. Multiple filters accumulate in a pipeline — all must return True for a record to be included.
# Filter by column value
with readers.get_reader('soldiers.csv') as reader:
reader.add_filter(lambda r: r.rank == 'Captain')
reader.add_filter(lambda r: r.age >= 25) # Both conditions must be True
for record in reader:
process(record)
# Filter using ValidationCollector (seen in first pass)
from dbtk.etl import ValidationCollector
# First pass: collect valid IDs
valid_titles = ValidationCollector()
with readers.get_reader('titles.csv') as reader:
for record in reader:
valid_titles(record.tconst)
# Second pass: only process records with valid title references
with readers.get_reader('title_principals.csv') as reader:
reader.add_filter(lambda r: r.tconst in valid_titles)
for record in reader:
process(record) # Only records with valid tconst
# Complex filtering logic
with readers.get_reader('orders.csv') as reader:
reader.add_filter(lambda r: r.status == 'active' and r.total > 100)
reader.add_filter(lambda r: r.country in {'US', 'CA', 'MX'})
for record in reader:
process(record)
Key behaviors:
Filters applied after
skip_rowsand null value conversionFilters applied before
n_rowslimitMultiple
add_filter()calls create an AND pipeline (all must pass)Operates on final Record objects with normalized field names
DataFrame Reader
For maximum throughput, use polars to read files and DataFrameReader to stream rows into DBTK pipelines. This works with both polars and Pandas and can use any file format that either library supports. Tip: use the Lazy API and streaming to prevent loading massive files into memory.
import polars as pl
from dbtk.etl import DataSurge
from dbtk.readers import DataFrameReader
# polars rips through CSV files at incredible speed
df = pl.scan_csv('massive_file.csv.gz').collect(engine='streaming') # Handles compression natively
with DataFrameReader(df) as reader:
surge = DataSurge(table)
surge.insert(reader)
Dual Field Name Access
DBTK automatically handles messy field names by providing dual access — original names are preserved while normalized versions are auto-generated for convenient attribute access:
# Original headers from file: ["ID #", "Student Name", "Residency Code", "GPA Score", "Has Holds?"]
with readers.CSVReader(open('data.csv')) as reader:
for record in reader:
# Access by original field names (preserved exactly as they appear)
print(record['ID #'], record['Student Name'])
# Access by normalized field names (lowercased, underscored)
print(record.id, record.student_name)
# Both access the same data
assert record['ID #'] == record.id
assert record['Student Name'] == record.student_name
# Normalization rules:
# - Lowercase conversion
# - Non-alphanumeric characters → underscore
# - Leading underscores preserved (_row_num stays _row_num)
# - Trailing underscores removed
# Examples:
# "ID #" → id
# "Student Name" → student_name
# "GPA Score" → gpa_score
# "Has Holds?" → has_holds
# "_row_num" → _row_num (preserved)
This is particularly useful when processing files from multiple vendors — use normalized attribute access in your code while original names are preserved for exports.