ETL: DataSurge & BulkSurge

High-performance bulk loading for any database. Both classes wrap a configured Table and handle batching, progress tracking, and database-specific optimisations automatically.

DataSurge

The problem: Processing thousands or millions of records row-by-row is painfully slow. You need batching, but implementing it correctly is complex.

The solution: DataSurge handles batching, error tracking, and optimal merge strategies automatically. It’s built for high-volume data processing.

DBTK will automatically:

  • Check that all key columns are included in the data source and raise an exception if any are missing.

  • Check that non-key columns are included in the data source. If any are missing, log a warning and mark those columns as ‘no_update’ so a misconfigured file cannot null out existing database values.

  • Check that all required columns are populated.

  • Display a progress tracker when processing large inputs.

  • Track how many records were read, processed, and skipped, along with elapsed time.

  • Record why records were skipped (which required columns were missing) and the line numbers of the first 20 skipped records, to aid troubleshooting.

  • Log information about the run.
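The checks listed above can be sketched in plain Python. This is a minimal illustration, not DBTK's code. check_source_columns, filter_required and the statistics dictionary are hypothetical names. Records are also numbered by their position in the input rather than by file line.

```python
import logging

logger = logging.getLogger("surge_sketch")

MAX_SKIP_DETAILS = 20  # keep details only for the first 20 skipped records


def check_source_columns(source_fields, key_columns, non_key_columns):
    """Validate a source header against the target table's columns.

    Raises ValueError if any key column is missing. Returns the set of
    non-key columns absent from the source, which should become no_update.
    """
    source = set(source_fields)
    missing_keys = [col for col in key_columns if col not in source]
    if missing_keys:
        raise ValueError(f"Source is missing key columns: {missing_keys}")
    no_update = {col for col in non_key_columns if col not in source}
    for col in sorted(no_update):
        logger.warning("Column %r not in source; marking it no_update", col)
    return no_update


def filter_required(records, required_columns):
    """Drop records with empty required columns, keeping skip statistics."""
    kept = []
    stats = {"read": 0, "skipped": 0, "skip_details": []}
    for record_no, rec in enumerate(records, start=1):
        stats["read"] += 1
        missing = [col for col in required_columns if rec.get(col) in (None, "")]
        if missing:
            stats["skipped"] += 1
            if len(stats["skip_details"]) < MAX_SKIP_DETAILS:
                stats["skip_details"].append({"record": record_no, "missing": missing})
            continue
        kept.append(rec)
    return kept, stats


no_update = check_source_columns(["id", "name"], key_columns=["id"], non_key_columns=["name", "rank"])
kept, stats = filter_required([{"id": 1, "name": "Zuko"}, {"id": None, "name": "Azula"}], ["id"])
print(no_update, stats["read"], stats["skipped"])  # {'rank'} 2 1
```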

import dbtk
from dbtk.etl import DataSurge

# Define table configuration
recruit_table = dbtk.etl.Table('fire_nation_soldiers', columns_config, cursor)

# Create DataSurge instance for bulk operations
bulk_writer = DataSurge(recruit_table, batch_size=2000)

# Bulk insert with batching
with dbtk.readers.get_reader('massive_conscript_list.csv') as reader:
    errors = bulk_writer.insert(reader)
    print(f"Inserted {recruit_table.counts['insert']} records with {errors} errors")

# Bulk merge (upsert) operations
with dbtk.readers.get_reader('soldier_updates.csv') as reader:
    errors = bulk_writer.merge(reader)

Performance impact: DataSurge can be 10-100x faster than row-by-row operations, depending on your database and network latency.
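Most of the speedup comes from sending many rows per round trip instead of one. The following is a minimal sketch of batching with the DB-API executemany. It uses the standard-library sqlite3 module as a stand-in database. insert_in_batches and batched are hypothetical helpers, not DBTK functions. SQLite runs in-process, so this example shows the mechanics rather than the network savings.

```python
import sqlite3
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items (itertools.batched needs Python 3.12+)."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


def insert_in_batches(conn, rows, batch_size=2000):
    """Insert rows with one executemany call per batch instead of one execute per row."""
    sql = "INSERT INTO soldiers (id, name) VALUES (?, ?)"
    cur = conn.cursor()
    total = 0
    for chunk in batched(rows, batch_size):
        cur.executemany(sql, chunk)
        total += len(chunk)
    conn.commit()
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE soldiers (id INTEGER PRIMARY KEY, name TEXT)")
rows = ((i, f"recruit_{i}") for i in range(10_000))  # generator: rows are never all in memory
loaded = insert_in_batches(conn, rows, batch_size=2000)
print(loaded)  # 10000
```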

Pass Through Mode

DataSurge and BulkSurge both support pass_through mode. In pass-through mode, rows from the reader or cursor are used exactly as they arrive.

  • Fastest possible execution, in a running-with-scissors kind of way

  • Source data must match the table's column order and count exactly

  • No transforms or parameter reordering will be done

  • No columns with db_expr are allowed

  • DataSurge supports only INSERT operations in this mode

  • Great for copying from database to database

import dbtk

source_cursor = dbtk.connect('prod_db').cursor()
target_cursor = dbtk.connect('report_db').cursor()

# Pass-through does no reordering: the query must return columns in the
# same order and number as the target table's column configuration
source_cursor.execute('SELECT * FROM fire_nation_soldiers')

# Define table configuration
recruit_table = dbtk.etl.Table('fire_nation_soldiers', columns_config, target_cursor)
# Create DataSurge in pass_through mode
bulk_writer = dbtk.etl.DataSurge(recruit_table, pass_through=True)
bulk_writer.insert(source_cursor)
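Pass-through does not reorder columns, so it is worth confirming the source column order before copying. The following is a minimal stdlib sketch. Two in-memory sqlite3 databases stand in for the source and target connections. copy_pass_through is a hypothetical helper that compares cursor.description with the expected column list. Selecting an explicit column list rather than SELECT * keeps the order predictable.

```python
import sqlite3


def copy_pass_through(source_cur, target_conn, table, target_columns):
    """Copy rows unchanged; refuse if the source column order differs from the target's."""
    source_columns = [desc[0] for desc in source_cur.description]
    if source_columns != list(target_columns):
        raise ValueError(f"Column mismatch: {source_columns} != {list(target_columns)}")
    placeholders = ", ".join("?" for _ in target_columns)
    # table and column names are trusted configuration here, not user input
    sql = f"INSERT INTO {table} ({', '.join(target_columns)}) VALUES ({placeholders})"
    target_conn.executemany(sql, source_cur)  # each source row tuple is passed through as-is
    target_conn.commit()


src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE fire_nation_soldiers (id INTEGER, name TEXT, rank TEXT)")
src.executemany("INSERT INTO fire_nation_soldiers VALUES (?, ?, ?)",
                [(1, "Zuko", "Prince"), (2, "Zhao", "Admiral")])

dst = sqlite3.connect(":memory:")
dst.execute("CREATE TABLE fire_nation_soldiers (id INTEGER, name TEXT, rank TEXT)")

cur = src.execute("SELECT id, name, rank FROM fire_nation_soldiers")  # explicit column list
copy_pass_through(cur, dst, "fire_nation_soldiers", ["id", "name", "rank"])
```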

BulkSurge

BulkSurge provides maximum throughput by leveraging database-specific bulk loading mechanisms. It supports both direct streaming (zero temp files) and external tool-based loading depending on your database and requirements.

Supported Databases

PostgreSQL & Redshift

  • Direct: Uses COPY FROM STDIN protocol with background writer thread

  • Streaming with zero temp files

  • 200K+ rec/s sustained throughput

Oracle

  • Direct (default): python-oracledb direct_path_load API (requires 3.4+)

  • External: SQL*Loader (sqlldr) with auto-generated control files

  • Both methods: 200K+ rec/s

MySQL & MariaDB

  • Direct (default): LOAD DATA LOCAL INFILE with streaming buffer

  • External: Checks the local_infile setting, then streams if it is enabled or dumps to CSV if not

  • Requires local_infile=1 on server for direct loading

SQL Server

  • External only: Uses bcp (bulk copy program) utility

  • Requires named connection from config for credentials

  • Supports SQL auth and Windows integrated auth

Basic Usage

import dbtk
from dbtk.etl import BulkSurge

# Define table (no db_expr columns allowed - BulkSurge loads raw data)
sensor_table = dbtk.etl.Table('sensor_readings', {
    'sensor_id': {'field': 'id', 'primary_key': True},
    'timestamp': {'field': 'ts', 'fn': 'datetime'},
    'value': {'field': 'reading', 'fn': 'float'},
}, cursor=cursor)

# Direct streaming (default) - zero temp files
surge = BulkSurge(sensor_table, batch_size=50000)
with dbtk.readers.get_reader('sensor_data.csv.gz') as reader:
    count = surge.load(reader)  # Streams data directly
    print(f"Loaded {count:,} records")

Loading Methods

BulkSurge automatically selects the optimal loading strategy, but you can override with the method parameter:

Direct Method (default)

Streams data directly over the connected cursor.

# Streams data using native Python drivers - no temp files
surge.load(reader)  # method='direct' is default

# Works for: PostgreSQL, Oracle (with python-oracledb 3.4+), MySQL (with local_infile=1)
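For PostgreSQL, the direct method is described as COPY FROM STDIN fed by a background writer thread. The following is a stdlib-only sketch of that producer/consumer shape, not DBTK's code. stream_rows_as_csv is a hypothetical helper. A thread serialises rows as CSV into an OS pipe while the main thread reads the other end. With psycopg2, for example, that read end could be passed to cursor.copy_expert("COPY events FROM STDIN WITH (FORMAT csv)", stream). No temp file is involved.

```python
import csv
import os
import threading


def stream_rows_as_csv(rows):
    """Return (readable binary stream, writer thread) fed by a background CSV writer."""
    read_fd, write_fd = os.pipe()

    def writer():
        # Closing the write end when the block exits signals EOF to the reader
        with open(write_fd, "w", newline="", encoding="utf-8") as out:
            csv.writer(out).writerows(rows)

    thread = threading.Thread(target=writer, daemon=True)
    thread.start()
    return open(read_fd, "rb"), thread


stream, thread = stream_rows_as_csv([(1, "boiler"), (2, "furnace")])
# A COPY-capable driver would consume the stream here instead of read()
payload = stream.read()
stream.close()
thread.join()
print(payload)  # b'1,boiler\r\n2,furnace\r\n'
```

The pipe's small buffer makes the writer wait whenever the consumer falls behind, so memory use stays flat regardless of input size.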

External Method

Dumps data to a file (typically CSV) and calls an external tool (bcp, sqlldr).

# Uses command-line tools - requires named connection
surge.load(reader, method='external')

# Oracle: Uses SQL*Loader (sqlldr)
# MySQL: Falls back to direct if local_infile enabled, else dumps CSV
# SQL Server: Uses bcp (only method available)

Database-Specific Examples

PostgreSQL - Direct Streaming

db = dbtk.connect('postgres_prod')
table = dbtk.etl.Table('events', columns_config, cursor=db.cursor())
surge = BulkSurge(table)

with dbtk.readers.get_reader('events.csv.gz') as reader:
    surge.load(reader)  # COPY FROM STDIN, zero temp files

Oracle - SQL*Loader

# Requires named connection for credentials
db = dbtk.connect('oracle_prod')  # Named connection required
table = dbtk.etl.Table('schema.table_name', columns_config, cursor=db.cursor())
surge = BulkSurge(table)

with dbtk.readers.get_reader('data.csv') as reader:
    surge.load(reader, method='external', dump_path='/staging')
    # Creates CSV + control file, invokes sqlldr, cleans up temp files

MySQL - Streaming with Fallback

db = dbtk.connect('mysql_prod')
table = dbtk.etl.Table('orders', columns_config, cursor=db.cursor())
surge = BulkSurge(table)

# Direct method checks server configuration automatically
with dbtk.readers.get_reader('orders.csv') as reader:
    surge.load(reader, method='direct')
    # If local_infile=1: streams with LOAD DATA LOCAL INFILE
    # If local_infile=0: dumps CSV and logs manual load instructions

SQL Server - bcp with Named Connection

# IMPORTANT: SQL Server requires named connection from config
db = dbtk.connect('mssql_prod')  # Must use named connection
table = dbtk.etl.Table('dbo.orders', columns_config, cursor=db.cursor())
surge = BulkSurge(table)

with dbtk.readers.get_reader('orders.csv') as reader:
    surge.load(reader)  # Uses bcp automatically
    # Supports both SQL auth (-U/-P) and Windows integrated auth (-T)

SQL Server - Windows Integrated Auth

# Config file (dbtk.yml) - no user/password for integrated auth
connections:
  mssql_prod:
    type: sqlserver
    host: sql-server.company.com
    database: production
    # No user/password = uses Windows integrated auth
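The two authentication modes correspond to bcp's own switches: -U/-P for SQL authentication and -T for a trusted (Windows integrated) connection. The following is a hypothetical sketch of building that command line from a connection entry shaped like the YAML above. build_bcp_args is illustrative, not DBTK's implementation. It assumes the config keys host, database, user and password.

```python
def build_bcp_args(table, data_file, conn_cfg):
    """Build a bcp argument list for a character-mode, comma-delimited load."""
    args = [
        "bcp", table, "in", data_file,
        "-S", conn_cfg["host"],      # server
        "-d", conn_cfg["database"],  # database
        "-c",                        # character data
        "-t,",                       # comma field terminator
    ]
    if conn_cfg.get("user") and conn_cfg.get("password"):
        args += ["-U", conn_cfg["user"], "-P", conn_cfg["password"]]  # SQL authentication
    else:
        args.append("-T")  # trusted connection (Windows integrated auth)
    return args


cfg = {"host": "sql-server.company.com", "database": "production"}  # no user/password
args = build_bcp_args("dbo.orders", "orders.csv", cfg)
print(" ".join(args))
# bcp dbo.orders in orders.csv -S sql-server.company.com -d production -c -t, -T
# To run it: subprocess.run(args, check=True)
```

Integrated auth also avoids putting a password on the command line, where other local users could see it in the process list.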

Controlling Temp File Location

For external methods that create temp files, control the location with dump_path:

# Specify exact file path
surge.load(reader, method='external', dump_path='/staging/mydata.csv')

# Specify directory (auto-generates timestamped filename)
surge.load(reader, method='external', dump_path='/staging')
# Creates: /staging/orders_20260206_143022.csv

# Omit dump_path to use the configured directory from settings
# settings['data_dump_dir'] = '/data/staging'
# If data_dump_dir is not set either, falls back to tempfile.gettempdir()
surge.load(reader, method='external')
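The precedence described here (explicit file, explicit directory, configured data_dump_dir, system temp directory) can be sketched as a small function. resolve_dump_path is hypothetical, not DBTK's code. It assumes that a dump_path with a file suffix is a file and anything else is a directory. It also assumes the timestamped name is table name plus YYYYMMDD_HHMMSS, matching the example above.

```python
import tempfile
from datetime import datetime
from pathlib import Path


def resolve_dump_path(table_name, dump_path=None, settings=None, now=None):
    """Choose the CSV path for an external load following the precedence above."""
    now = now or datetime.now()
    settings = settings or {}
    if dump_path is None:
        # No explicit path: configured directory first, then the system temp dir
        directory = Path(settings.get("data_dump_dir") or tempfile.gettempdir())
    else:
        candidate = Path(dump_path)
        if candidate.suffix:  # assumption: a suffix means an explicit file path
            return candidate
        directory = candidate
    return directory / f"{table_name}_{now:%Y%m%d_%H%M%S}.csv"


stamp = datetime(2026, 2, 6, 14, 30, 22)
print(resolve_dump_path("orders", "/staging", now=stamp))
# /staging/orders_20260206_143022.csv on POSIX systems
```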

Named Connections for External Tools

External tools (bcp, sqlldr) require credentials. DBTK obtains them from the connection's entry in dbtk.yml, so the table's cursor must come from a named connection rather than one configured manually.

Manual CSV Export

For databases not yet supported, or when you need custom loading parameters, use dump() to export transformed data:

# Export to CSV for manual loading
surge = BulkSurge(table)
with dbtk.readers.get_reader('source.csv.gz') as reader:
    csv_path = surge.dump(reader, 'staging/transformed.csv')

# Then load manually:
# SQL Server: bcp mydb.dbo.mytable in staging/transformed.csv -c -t, -S server -U user -P pass
# MySQL: LOAD DATA LOCAL INFILE 'staging/transformed.csv' INTO TABLE mytable ...
# Snowflake: PUT the file to a stage (e.g. the table stage @%mytable), then COPY INTO mytable FROM @%mytable ...
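Conceptually, dump() writes already-transformed rows to a CSV that a loader can consume. The following stdlib sketch shows the same idea. dump_transformed and its transform hook are hypothetical. The sketch writes no header row, so tools such as bcp do not have to be told to skip one. Missing values are written as empty fields.

```python
import csv
import tempfile
from pathlib import Path


def dump_transformed(records, out_path, columns, transform=None):
    """Write records to CSV in `columns` order, applying an optional per-record transform."""
    out = Path(out_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with out.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        for rec in records:
            if transform is not None:
                rec = transform(rec)
            writer.writerow([rec.get(col) for col in columns])  # None becomes an empty field
            count += 1
    return out, count


rows = [{"id": "7", "reading": "21.5"}, {"id": "8", "reading": None}]
target = Path(tempfile.mkdtemp()) / "staging" / "transformed.csv"
path, n = dump_transformed(rows, target, ["id", "reading"],
                           transform=lambda r: {**r, "id": int(r["id"])})
print(n)  # 2
```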

Oracle Auto-generation

When connected to Oracle, dump() automatically generates a SQL*Loader control file (.ctl) alongside the CSV and logs the sqlldr command:

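db = dbtk.connect('oracle_prod')  # Oracle connection
table = dbtk.etl.Table('schema.table_name', columns_config, cursor=db.cursor())
surge = BulkSurge(table)
with dbtk.readers.get_reader('data.csv') as reader:
    surge.dump(reader, '/staging/export.csv')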

# Automatically creates:
#   /staging/export.csv           (data file)
#   /staging/export_a1b2c3d4.ctl  (control file with unique suffix)
#
# Logs show the sqlldr command to run:
#   sqlldr userid=USER/PASS@DB control=/staging/export_a1b2c3d4.ctl data=/staging/export.csv
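The generated control file follows SQL*Loader's standard layout. The following is a hypothetical sketch that writes one next to the CSV, using an 8-character random suffix like the export_a1b2c3d4.ctl example. write_control_file is not DBTK's function. A real control file may need extra clauses, for example date format masks for DATE columns.

```python
import tempfile
import uuid
from pathlib import Path


def write_control_file(csv_path, table, columns):
    """Write a SQL*Loader control file beside csv_path and return its path."""
    csv_path = Path(csv_path)
    ctl_path = csv_path.with_name(f"{csv_path.stem}_{uuid.uuid4().hex[:8]}.ctl")
    ctl_text = "\n".join([
        "LOAD DATA",
        f"INFILE '{csv_path}'",
        "APPEND",
        f"INTO TABLE {table}",
        "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"'",
        "TRAILING NULLCOLS",  # missing trailing fields load as NULL
        "(" + ", ".join(columns) + ")",
        "",
    ])
    ctl_path.write_text(ctl_text, encoding="utf-8")
    return ctl_path


staging = Path(tempfile.mkdtemp())
data_file = staging / "export.csv"
data_file.write_text("1,Zuko\n", encoding="utf-8")
ctl = write_control_file(data_file, "schema.table_name", ["id", "name"])
print(ctl.read_text(encoding="utf-8"))
```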

BulkSurge vs DataSurge Comparison

Feature          DataSurge             BulkSurge
---------------  --------------------  ------------------------------------------------------
Speed            90-120K rec/s         200K+ rec/s
Method           executemany batching  Native bulk loading
Temp files       Never                 Only for external tools
db_expr support  Yes                   No (raw data only)
MERGE/upsert     Yes                   No (INSERT only)
pass_through     INSERT only           If no db_expr columns
Databases        All (universal)       PostgreSQL/Redshift, Oracle, MySQL/MariaDB, SQL Server
Setup            Works everywhere      May require server config (MySQL local_infile)
Credentials      Uses connection       External tools need named connection

When to Use BulkSurge

Use BulkSurge when:

  • You are loading millions of rows

  • You only need simple INSERT operations (no upsert/merge)

  • No database functions are required (db_expr is not used)

  • Maximum throughput is critical

  • You have the necessary server permissions (MySQL local_infile, etc.)

Use DataSurge when:

  • You need UPDATE or MERGE/upsert operations

  • You use db_expr for database functions

  • You are loading moderate datasets (fewer than 5M records)

  • You want universal compatibility without extra configuration

  • You don’t have server-level permissions
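The two lists amount to a simple decision rule. The following hypothetical helper encodes them and is mostly useful as a checklist. choose_loader is not part of any DBTK API. The 5M threshold comes from the guidance above. The lowercase database names are illustrative labels.

```python
BULK_DATABASES = {"postgresql", "redshift", "oracle", "mysql", "mariadb", "sqlserver"}


def choose_loader(database, row_count, needs_update_or_merge=False,
                  uses_db_expr=False, has_bulk_permissions=True):
    """Return 'BulkSurge' or 'DataSurge' following the rules of thumb above."""
    if database not in BULK_DATABASES:
        return "DataSurge"  # no native bulk path for this database
    if needs_update_or_merge or uses_db_expr or not has_bulk_permissions:
        return "DataSurge"  # BulkSurge is INSERT-only, raw data, and may need permissions
    # Below roughly 5M rows, DataSurge is usually fast enough without extra setup
    return "BulkSurge" if row_count >= 5_000_000 else "DataSurge"


print(choose_loader("postgresql", 20_000_000))                               # BulkSurge
print(choose_loader("postgresql", 20_000_000, needs_update_or_merge=True))   # DataSurge
```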

Troubleshooting

MySQL: “LOAD DATA LOCAL INFILE forbidden”

# Check server configuration
cursor.execute("SELECT @@local_infile")
print(cursor.fetchone())  # 1 = enabled, 0 = disabled
# If 0: ask your DBA to enable it, or use the external method
surge.load(reader, method='external')  # Dumps CSV with instructions

SQL Server: “connection_name missing”

# Use a named connection from dbtk.yml, not one built from explicit parameters
db = dbtk.connect('mssql_prod')  # Not: dbtk.sqlserver(host=..., user=..., password=...)

Oracle: “direct_path_load not supported”

# Requires python-oracledb 3.4+
# Solution: Upgrade the driver or use the external method
surge.load(reader, method='external')  # Uses SQL*Loader
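To check which python-oracledb is installed before choosing a method, standard-library package metadata is enough. This is a sketch. The 3.4 threshold is the requirement stated above. oracledb is the package's distribution name on PyPI. supports_direct_path_load and installed_oracledb_version are hypothetical helpers.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def supports_direct_path_load(version_text):
    """True if a version string such as '3.4.0' is at least 3.4."""
    match = re.match(r"(\d+)\.(\d+)", version_text)
    if not match:
        return False
    return (int(match.group(1)), int(match.group(2))) >= (3, 4)


def installed_oracledb_version():
    """Return the installed python-oracledb version, or None if it is not installed."""
    try:
        return version("oracledb")
    except PackageNotFoundError:
        return None


v = installed_oracledb_version()
print(v, v is not None and supports_direct_path_load(v))
```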

“Table has db_expr columns”

# BulkSurge doesn't support database expressions
# Solution: Use DataSurge or remove db_expr from column config
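Before switching a table to BulkSurge, you can list which columns would block it. This sketch assumes a column uses a database expression when its config dict has a non-empty db_expr key. That key layout is an assumption, and the CURRENT_TIMESTAMP value is illustrative only. db_expr_columns is a hypothetical helper.

```python
def db_expr_columns(columns_config):
    """Return names of columns whose config sets db_expr (assumed key layout)."""
    return [name for name, cfg in columns_config.items()
            if isinstance(cfg, dict) and cfg.get("db_expr")]


columns_config = {
    "sensor_id": {"field": "id", "primary_key": True},
    "value": {"field": "reading", "fn": "float"},
    "loaded_at": {"db_expr": "CURRENT_TIMESTAMP"},  # hypothetical db_expr column
}
blocking = db_expr_columns(columns_config)
print(blocking)  # ['loaded_at']
```

An empty list suggests the table is a BulkSurge candidate. Otherwise, use DataSurge or drop those columns from the config.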

See Also

Examples: