ETL: DataSurge & BulkSurge
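High-performance bulk loading. DataSurge works with any database; BulkSurge uses native bulk loaders for PostgreSQL, Redshift, Oracle, MySQL/MariaDB, and SQL Server. Both classes wrap a configured Table and handle batching, progress tracking, and database-specific optimisations automatically.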
DataSurge
The problem: Processing thousands or millions of records row-by-row is painfully slow. You need batching, but implementing it correctly is complex.
The solution: DataSurge handles batching, error tracking, and optimal merge strategies automatically. It’s built for high-volume data processing.
DBTK will automatically:
Check that all key columns are included in the data source and raise an exception if any are missing.
Check that non-key columns are included in the data source. If any are missing, log warnings and mark those columns as ‘no_update’ so that a misconfigured file cannot null out existing database values.
Check that all required columns are populated.
Display a progress tracker when processing large inputs.
Keep track of how many records were read, processed, and skipped, along with elapsed time.
Record why records were skipped (which columns were missing) and the line numbers of the first 20 skipped records, to aid troubleshooting.
Log information about the run.
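The column checks and skip tracking listed above can be approximated in plain Python. This is a minimal sketch, not DBTK's implementation. The helper names preflight and filter_rows, the RunStats container, and the rule that None or an empty string counts as "not populated" are all assumptions made for illustration.

```python
import logging
from dataclasses import dataclass, field

log = logging.getLogger("surge_sketch")

MAX_SKIP_DETAILS = 20  # keep details for the first 20 skipped records only


@dataclass
class RunStats:
    # Hypothetical counters, loosely mirroring what DataSurge reports
    read: int = 0
    processed: int = 0
    skipped: int = 0
    skip_details: list = field(default_factory=list)  # (record_number, missing_columns)


def preflight(source_fields, key_columns, other_columns):
    """Raise if a key column is missing; return non-key columns to treat as no_update."""
    missing_keys = [c for c in key_columns if c not in source_fields]
    if missing_keys:
        raise ValueError(f"Key columns missing from source: {missing_keys}")
    no_update = {c for c in other_columns if c not in source_fields}
    for col in sorted(no_update):
        log.warning("Column %r not in source; marking it no_update", col)
    return no_update


def filter_rows(rows, required_columns, stats):
    """Yield rows whose required columns are populated; record why others were skipped."""
    for record_number, row in enumerate(rows, start=1):
        stats.read += 1
        # Assumption: None or "" means the column is not populated
        missing = [c for c in required_columns if row.get(c) in (None, "")]
        if missing:
            stats.skipped += 1
            if len(stats.skip_details) < MAX_SKIP_DETAILS:
                stats.skip_details.append((record_number, missing))
            continue
        stats.processed += 1
        yield row
```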
import dbtk
from dbtk.etl import DataSurge
# Define table configuration (columns_config and cursor are defined elsewhere)
recruit_table = dbtk.etl.Table('fire_nation_soldiers', columns_config, cursor)
# Create DataSurge instance for bulk operations
bulk_writer = DataSurge(recruit_table, batch_size=2000)
# Bulk insert with batching
with dbtk.readers.get_reader('massive_conscript_list.csv') as reader:
    errors = bulk_writer.insert(reader)
print(f"Inserted {recruit_table.counts['insert']} records with {errors} errors")
# Bulk merge (upsert) operations
with dbtk.readers.get_reader('soldier_updates.csv') as reader:
    errors = bulk_writer.merge(reader)
Performance impact: DataSurge can be 10-100x faster than row-by-row operations, depending on your database and network latency.
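Most of that speedup comes from sending rows in batches instead of making one round trip per row. The sketch below shows the general executemany batching technique against an in-memory SQLite database. The execute_in_batches helper is hypothetical and not part of DBTK. The upsert uses SQLite's ON CONFLICT syntax (SQLite 3.24+), not whatever MERGE statement DataSurge generates for your database.

```python
import sqlite3
from itertools import islice


def execute_in_batches(conn, sql, rows, batch_size=2000):
    """Hypothetical helper: run sql via executemany, batch_size rows per call."""
    cursor = conn.cursor()
    rows = iter(rows)
    total = 0
    # Pull the next batch_size rows; an empty list means the source is exhausted
    while batch := list(islice(rows, batch_size)):
        cursor.executemany(sql, batch)
        total += len(batch)
    conn.commit()
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fire_nation_soldiers (id INTEGER PRIMARY KEY, name TEXT)")

inserted = execute_in_batches(
    conn,
    "INSERT INTO fire_nation_soldiers (id, name) VALUES (?, ?)",
    ((i, f"recruit {i}") for i in range(10_000)),
)

# Upsert: existing ids are updated, new ids are inserted (SQLite 3.24+ syntax)
merged = execute_in_batches(
    conn,
    "INSERT INTO fire_nation_soldiers (id, name) VALUES (?, ?) "
    "ON CONFLICT(id) DO UPDATE SET name = excluded.name",
    [(1, "Zuko"), (10_000, "Iroh")],
)
```

Larger batches mean fewer round trips but more memory per call, which is why batch size is usually a tunable parameter rather than a fixed constant.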
Pass Through Mode
DataSurge and BulkSurge both support pass_through mode, which sends the data from the reader or cursor straight to the database without any processing.
Fastest possible execution, in a running-with-scissors kind of way
Source data must exactly match column order and count
No transforms or parameter reordering will be done
No columns with db_expr are allowed
DataSurge supports only INSERT operations in this mode
Great for copying from database to database
import dbtk
source_cursor = dbtk.connect('prod_db').cursor()
target_cursor = dbtk.connect('report_db').cursor()
source_cursor.execute('SELECT * FROM fire_nation_soldiers')
# Define table configuration
recruit_table = dbtk.etl.Table('fire_nation_soldiers', columns_config, target_cursor)
# create DataSurge in pass_through mode
bulk_writer = dbtk.etl.DataSurge(recruit_table, pass_through=True)
bulk_writer.insert(source_cursor)
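Conceptually, pass-through amounts to fetching rows from the source and handing them straight to the target's executemany. This is why column order and count must match exactly. The minimal sketch below uses two SQLite databases as stand-ins for prod_db and report_db. The pass_through_copy helper is hypothetical, and it interpolates the table name directly, so that name must come from trusted configuration.

```python
import sqlite3


def pass_through_copy(source_cursor, target_conn, table, batch_size=5000):
    """Hypothetical helper: insert rows exactly as fetched (no transforms, no reordering)."""
    n_cols = len(source_cursor.description)
    sql = f"INSERT INTO {table} VALUES ({', '.join('?' * n_cols)})"
    total = 0
    while rows := source_cursor.fetchmany(batch_size):
        target_conn.executemany(sql, rows)
        total += len(rows)
    target_conn.commit()
    return total


source = sqlite3.connect(":memory:")
target = sqlite3.connect(":memory:")
for db in (source, target):
    db.execute("CREATE TABLE fire_nation_soldiers (id INTEGER, name TEXT, rank TEXT)")
source.executemany(
    "INSERT INTO fire_nation_soldiers VALUES (?, ?, ?)",
    [(1, "Zuko", "Prince"), (2, "Zhao", "Admiral"), (3, "Iroh", "General")],
)

# The SELECT column order must match the target table's column order exactly
cur = source.execute("SELECT id, name, rank FROM fire_nation_soldiers ORDER BY id")
copied = pass_through_copy(cur, target, "fire_nation_soldiers")
```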
BulkSurge
BulkSurge provides maximum throughput by leveraging database-specific bulk loading mechanisms. It supports both direct streaming (zero temp files) and external tool-based loading depending on your database and requirements.
Supported Databases
PostgreSQL & Redshift
Direct: Uses COPY FROM STDIN protocol with background writer thread
Streaming with zero temp files
200K+ rec/s sustained throughput
Oracle
Direct (default): python-oracledb direct_path_load API (requires 3.4+)
External: SQL*Loader (sqlldr) with auto-generated control files
Both methods: 200K+ rec/s
MySQL & MariaDB
Direct (default): LOAD DATA LOCAL INFILE with streaming buffer
External: Checks local_infile setting, streams or dumps CSV
Requires local_infile=1 on server for direct loading
SQL Server
External only: Uses bcp (bulk copy program) utility
Requires named connection from config for credentials
Supports SQL auth and Windows integrated auth
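Streaming with zero temp files generally means giving the driver a file-like object that produces CSV bytes on demand. The sketch below uses a pull-based io.RawIOBase subclass instead of the background writer thread described for PostgreSQL. CsvRowStream is a hypothetical name, and none of this is DBTK code.

```python
import csv
import io


class CsvRowStream(io.RawIOBase):
    """Hypothetical read-only file-like object that CSV-encodes rows only when read."""

    def __init__(self, rows, encoding="utf-8"):
        super().__init__()
        self._rows = iter(rows)
        self._encoding = encoding
        self._buffer = bytearray()
        self._text = io.StringIO()
        self._writer = csv.writer(self._text, lineterminator="\n")

    def readable(self):
        return True

    def _next_chunk(self):
        row = next(self._rows)  # raises StopIteration when the source is exhausted
        self._writer.writerow(row)
        chunk = self._text.getvalue().encode(self._encoding)
        self._text.seek(0)
        self._text.truncate()
        return chunk

    def readinto(self, b):
        # Encode rows until the caller's buffer can be filled or the source runs dry
        while len(self._buffer) < len(b):
            try:
                self._buffer += self._next_chunk()
            except StopIteration:
                break
        n = min(len(b), len(self._buffer))
        b[:n] = self._buffer[:n]
        del self._buffer[:n]
        return n  # 0 signals end of stream


rows = [(1, "Zuko", None), (2, "Iroh, Uncle", "General")]
payload = io.BufferedReader(CsvRowStream(rows)).read()
```

With psycopg2, for example, such an object could be passed to cursor.copy_expert("COPY sensor_readings FROM STDIN WITH (FORMAT csv)", stream), which reads it in chunks. The csv module writes None as an empty unquoted field, which PostgreSQL's CSV format reads as NULL by default.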
Basic Usage
import dbtk
from dbtk.etl import BulkSurge
# Define table (no db_expr columns allowed - BulkSurge loads raw data)
sensor_table = dbtk.etl.Table('sensor_readings', {
    'sensor_id': {'field': 'id', 'primary_key': True},
    'timestamp': {'field': 'ts', 'fn': 'datetime'},
    'value': {'field': 'reading', 'fn': 'float'},
}, cursor=cursor)  # cursor from an existing connection
# Direct streaming (default) - zero temp files
surge = BulkSurge(sensor_table, batch_size=50000)
with dbtk.readers.get_reader('sensor_data.csv.gz') as reader:
    count = surge.load(reader) # Streams data directly
print(f"Loaded {count:,} records")
Loading Methods
BulkSurge automatically selects the optimal loading strategy, but you can override with the method parameter:
Direct method (default): streams data directly over the connected cursor.
# Streams data using native Python drivers - no temp files
surge.load(reader) # method='direct' is default
# Works for: PostgreSQL, Oracle (with python-oracledb 3.4+), MySQL (with local_infile=1)
External method: dumps data to a file (typically CSV) and calls an external tool (bcp, sqlldr).
# Uses command-line tools - requires named connection
surge.load(reader, method='external')
# Oracle: Uses SQL*Loader (sqlldr)
# MySQL: Falls back to direct if local_infile enabled, else dumps CSV
# SQL Server: Uses bcp (only method available)
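The selection rules described in this section can be summarised as a small decision function. This is a hypothetical sketch of the documented rules only. The db_type strings and return labels are invented for illustration and are not DBTK identifiers.

```python
def choose_load_strategy(db_type, method="direct", local_infile=False):
    """Hypothetical mapping from database type and requested method to a loading approach."""
    if db_type == "sqlserver":
        return "bcp"  # external only: the method argument is ignored
    if db_type in ("mysql", "mariadb"):
        # Either method streams when the server allows LOCAL INFILE; otherwise dump a CSV
        return "load_data_local_infile" if local_infile else "dump_csv"
    if db_type == "oracle":
        return "direct_path_load" if method == "direct" else "sqlldr"
    if db_type in ("postgres", "redshift"):
        return "copy_from_stdin"
    raise ValueError(f"No native bulk path for {db_type!r}; use DataSurge or dump()")
```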
Database-Specific Examples
PostgreSQL - Direct Streaming
db = dbtk.connect('postgres_prod')
table = dbtk.etl.Table('events', columns_config, cursor=db.cursor())
surge = BulkSurge(table)
with dbtk.readers.get_reader('events.csv.gz') as reader:
    surge.load(reader) # COPY FROM STDIN, zero temp files
Oracle - SQL*Loader
# Requires named connection for credentials
db = dbtk.connect('oracle_prod') # Named connection required
table = dbtk.etl.Table('schema.table_name', columns_config, cursor=db.cursor())
surge = BulkSurge(table)
with dbtk.readers.get_reader('data.csv') as reader:
    surge.load(reader, method='external', dump_path='/staging')
# Creates CSV + control file, invokes sqlldr, cleans up temp files
MySQL - Streaming with Fallback
db = dbtk.connect('mysql_prod')
table = dbtk.etl.Table('orders', columns_config, cursor=db.cursor())
surge = BulkSurge(table)
# Direct method checks server configuration automatically
with dbtk.readers.get_reader('orders.csv') as reader:
    surge.load(reader, method='direct')
# If local_infile=1: streams with LOAD DATA LOCAL INFILE
# If local_infile=0: dumps CSV and logs manual load instructions
SQL Server - bcp with Named Connection
# IMPORTANT: SQL Server requires named connection from config
db = dbtk.connect('mssql_prod') # Must use named connection
table = dbtk.etl.Table('dbo.orders', columns_config, cursor=db.cursor())
surge = BulkSurge(table)
with dbtk.readers.get_reader('orders.csv') as reader:
    surge.load(reader) # Uses bcp automatically
# Supports both SQL auth (-U/-P) and Windows integrated auth (-T)
SQL Server - Windows Integrated Auth
# Config file (dbtk.yml) - no user/password for integrated auth
connections:
  mssql_prod:
    type: sqlserver
    host: sql-server.company.com
    database: production
    # No user/password = uses Windows integrated auth
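Choosing between -U/-P and -T depends on whether the named connection carries a user and password. The sketch below builds a bcp argument list on that assumption. build_bcp_args is hypothetical. The flags it uses (-S server, -d database, -c character mode, -t field terminator, -U/-P, -T) are standard bcp options.

```python
def build_bcp_args(table, data_file, server, database, user=None, password=None):
    """Hypothetical helper: build a bcp 'in' command, SQL auth if credentials exist, else -T."""
    args = ["bcp", table, "in", data_file, "-S", server, "-d", database, "-c", "-t,"]
    if user and password:
        args += ["-U", user, "-P", password]  # SQL Server authentication
    else:
        args.append("-T")  # Windows integrated (trusted connection) authentication
    return args


# Illustrative values, shaped like the mssql_prod entry in dbtk.yml
integrated = build_bcp_args("dbo.orders", "orders.csv", "sql-server.company.com", "production")
sql_auth = build_bcp_args(
    "dbo.orders", "orders.csv", "sql-server.company.com", "production",
    user="loader", password="secret",
)
```

The resulting list can be passed to subprocess.run(args, check=True). A password given with -P may be visible in process listings on the host.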
Controlling Temp File Location
For external methods that create temp files, control the location with dump_path:
# Specify exact file path
surge.load(reader, method='external', dump_path='/staging/mydata.csv')
# Specify directory (auto-generates timestamped filename)
surge.load(reader, method='external', dump_path='/staging')
# Creates: /staging/orders_20260206_143022.csv
# Use configured directory from settings
# settings['data_dump_dir'] = '/data/staging'
surge.load(reader, method='external') # Uses configured directory
# Falls back to the system temp directory if neither dump_path nor data_dump_dir is set
surge.load(reader, method='external') # Uses tempfile.gettempdir()
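The lookup order above (explicit path, then configured data_dump_dir, then the system temp directory) can be sketched as follows. resolve_dump_path is hypothetical. Treating any path with a file extension as a file, and anything else as a directory, is an assumed heuristic and not documented DBTK behaviour.

```python
import os
import tempfile
from datetime import datetime


def resolve_dump_path(table_name, dump_path=None, settings=None, now=None):
    """Hypothetical resolver: file path > directory > data_dump_dir > system temp dir."""
    settings = settings or {}
    now = now or datetime.now()
    # Assumed heuristic: a path with an extension is treated as an exact file name
    if dump_path and os.path.splitext(dump_path)[1]:
        return dump_path
    directory = dump_path or settings.get("data_dump_dir") or tempfile.gettempdir()
    filename = f"{table_name}_{now:%Y%m%d_%H%M%S}.csv"  # e.g. orders_20260206_143022.csv
    return os.path.join(directory, filename)
```

This heuristic misreads directory names that contain a dot, such as /data/v1.2. Checking os.path.isdir first would be more robust when the directory already exists.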
Named Connections for External Tools
External tools (bcp, sqlldr) require credentials. To supply them, the table’s cursor must come from a named connection (defined in dbtk.yml) rather than a manually configured one.
Manual CSV Export
For databases not yet supported, or when you need custom loading parameters, use dump() to export transformed data:
# Export to CSV for manual loading
surge = BulkSurge(table)
with dbtk.readers.get_reader('source.csv.gz') as reader:
    csv_path = surge.dump(reader, 'staging/transformed.csv')
# Then load manually:
# SQL Server: bcp mydb.dbo.mytable in staging/transformed.csv -c -t, -S server -U user -P pass
# MySQL: LOAD DATA INFILE 'staging/transformed.csv' INTO TABLE mytable ...
# Snowflake: PUT file://staging/transformed.csv @my_stage, then COPY INTO mytable FROM @my_stage ...
Oracle Auto-generation:
When connected to Oracle, dump() automatically generates a SQL*Loader control file (.ctl) alongside the CSV and logs the sqlldr command:
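db = dbtk.connect('oracle_prod') # Oracle connection
table = dbtk.etl.Table('schema.table_name', columns_config, cursor=db.cursor())
surge = BulkSurge(table)
with dbtk.readers.get_reader('data.csv') as reader:
    surge.dump(reader, '/staging/export.csv')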
# Automatically creates:
# /staging/export.csv (data file)
# /staging/export_a1b2c3d4.ctl (control file with unique suffix)
#
# Logs show the sqlldr command to run:
# sqlldr userid=USER/PASS@DB control=/staging/export_a1b2c3d4.ctl data=/staging/export.csv
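A SQL*Loader control file for a CSV like this is short. The sketch below writes one with a random 8-character suffix, similar to the naming shown above. write_control_file is hypothetical. The APPEND load mode and the absence of date format masks are simplifying assumptions; DATE and TIMESTAMP columns usually need an explicit mask in a real control file.

```python
import os
import tempfile
import uuid


def write_control_file(csv_path, table, columns):
    """Hypothetical helper: write a minimal SQL*Loader .ctl file next to csv_path."""
    base, _ = os.path.splitext(csv_path)
    ctl_path = f"{base}_{uuid.uuid4().hex[:8]}.ctl"  # unique suffix, e.g. export_a1b2c3d4.ctl
    lines = [
        "LOAD DATA",
        f"INFILE '{csv_path}'",
        "APPEND",  # assumption: add to existing rows rather than require an empty table
        f"INTO TABLE {table}",
        "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"'",
        "TRAILING NULLCOLS",  # missing trailing fields load as NULL
        "(" + ", ".join(columns) + ")",
    ]
    with open(ctl_path, "w", encoding="utf-8") as fh:
        fh.write("\n".join(lines) + "\n")
    return ctl_path


with tempfile.TemporaryDirectory() as staging:
    ctl = write_control_file(
        os.path.join(staging, "export.csv"), "schema.table_name",
        ["sensor_id", "reading_ts", "reading"],
    )
    ctl_name = os.path.basename(ctl)
    with open(ctl, encoding="utf-8") as fh:
        control_text = fh.read()
```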
BulkSurge vs DataSurge Comparison
| Feature | DataSurge | BulkSurge |
|---|---|---|
| Speed | 90-120K rec/s | 200K+ rec/s |
| Method | executemany batching | Native bulk loading |
| Temp files | Never | Only for external tools |
| db_expr support | Yes | No (raw data only) |
| MERGE/upsert | Yes | No (INSERT only) |
| pass_through | INSERT only | If no … |
| Databases | All (universal) | PostgreSQL, Oracle, MySQL, SQL Server |
| Setup | Works everywhere | May require server config (MySQL local_infile) |
| Credentials | Uses connection | External tools need named connection |
When to Use BulkSurge
Use BulkSurge when:
Loading millions of rows
Simple INSERT operations (no upsert/merge needed)
No database functions required (db_expr not used)
Maximum throughput is critical
You have appropriate server permissions (MySQL local_infile, etc.)
Use DataSurge when:
You need UPDATE or MERGE/upsert operations
Using db_expr for database functions
Loading moderate datasets (< 5M records)
You want universal compatibility without configuration
You don’t have server-level permissions
Troubleshooting
MySQL: “LOAD DATA LOCAL INFILE forbidden”
# Check server configuration
cursor.execute("SELECT @@local_infile")
# If 0: ask your DBA to enable local_infile, or use the external method
surge.load(reader, method='external') # Dumps CSV with instructions
SQL Server: “connection_name missing”
# Use named connection, not direct connection
db = dbtk.connect('mssql_prod') # Not: dbtk.sqlserver(host=..., user=..., password=...)
Oracle: “direct_path_load not supported”
# Requires python-oracledb 3.4+
# Solution: Upgrade driver or use external method
surge.load(reader, method='external') # Uses SQL*Loader
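When checking whether the installed driver is new enough, compare versions numerically, not as strings: as strings, "3.10.0" sorts before "3.4.0". The sketch below is minimal. supports_direct_path_load is a hypothetical helper, and 3.4 is the minimum this page states for direct_path_load.

```python
import re


def supports_direct_path_load(version: str, minimum=(3, 4)) -> bool:
    """Hypothetical check: compare the major/minor parts of a version string numerically."""
    parts = [int(p) for p in re.findall(r"\d+", version)[:2]]
    return tuple(parts) >= minimum
```

In practice, the version string would come from oracledb.__version__.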
“Table has db_expr columns”
# BulkSurge doesn't support database expressions
# Solution: Use DataSurge or remove db_expr from column config
See Also
ETL: Table & Transforms - Table configuration, field mapping, transforms
ETL: Tools & Logging - IdentityManager, ValidationCollector, logging
Database Connections - Connections, cursors, SQL file execution
Examples:
examples/data_load_imdb_subset.py - DataSurge ETL pipeline loading a filtered IMDB subset with validation
examples/bulk_load_imdb_subset_pg.py - BulkSurge high-throughput variant for PostgreSQL
examples/bulk_load_titles.py - BulkSurge loading 12M+ title records via PolarsDataFrameReader