# ETL: DataSurge & BulkSurge High-performance bulk loading for any database. Both classes wrap a configured `Table` and handle batching, progress tracking, and database-specific optimisations automatically. ## DataSurge
**The problem:** Processing thousands or millions of records row-by-row is painfully slow. You need batching, but implementing it correctly is complex. **The solution:** DataSurge handles batching, error tracking, and optimal merge strategies automatically. It's built for high-volume data processing. DBTK will automatically: - Check that all key columns are included in the data source and throw an exception if any are missing. - Check that non-key columns are included in the data source. If missing, log warnings and set the columns as 'no_update' to prevent database values from being nulled out by a misconfigured file. - Check that all required columns are populated. - Display progress tracker when processing large inputs. - Keep track of how many records were read, processed, skipped, along with elapsed time. - Keep track of metadata about why records where skipped (which missing columns) and line number for the first 20 skipped records to aid in troubleshooting. - Log information about the run. ```python from dbtk.etl import DataSurge # Define table configuration recruit_table = dbtk.etl.Table('fire_nation_soldiers', columns_config, cursor) # Create DataSurge instance for bulk operations bulk_writer = DataSurge(recruit_table, batch_size=2000) # Bulk insert with batching with dbtk.readers.get_reader('massive_conscript_list.csv') as reader: errors = bulk_writer.insert(reader) print(f"Inserted {recruit_table.counts['insert']} records with {errors} errors") # Bulk merge (upsert) operations with dbtk.readers.get_reader('soldier_updates.csv') as reader: errors = bulk_writer.merge(reader) ``` **Performance impact:** DataSurge can be 10-100x faster than row-by-row operations, depending on your database and network latency. ### Pass Through Mode DataSurge and BulkSurge both support `pass_through` mode. Pass through mode uses the data directly from the reader or cursor. - Fastest possible execution, in a running-with-scissors kind of way - Source data must exactly match column order and count - No transforms or parameter reordering will be done - No columns with `db_expr` are allowed - DataSurge only supports INSERT operations - Great for copying from database to database ```python import dbtk source_cursor = dbtk.connect('prod_db').cursor() target_cursor = dbtk.connect('report_db').cursor() source_cursor.execute('SELECT * FROM fire_nation_soldiers') # Define table configuration recruit_table = dbtk.etl.Table('fire_nation_soldiers', columns_config, target_cursor) # create DataSurge in pass_through mode bulk_writer = dbtk.etl.DataSurge(recruit_table, pass_through=True) bulk_writer.insert(source_cursor) ``` ## BulkSurge
BulkSurge provides maximum throughput by leveraging database-specific bulk loading mechanisms. It supports both **direct streaming** (zero temp files) and **external tool-based** loading depending on your database and requirements. ### Supported Databases **PostgreSQL & Redshift** - **Direct:** Uses COPY FROM STDIN protocol with background writer thread - Streaming with zero temp files - 200K+ rec/s sustained throughput **Oracle** - **Direct (default):** python-oracledb direct_path_load API (requires 3.4+) - **External:** SQL\*Loader (sqlldr) with auto-generated control files - Both methods: 200K+ rec/s **MySQL & MariaDB** - **Direct (default):** LOAD DATA LOCAL INFILE with streaming buffer - **External:** Checks local_infile setting, streams or dumps CSV - Requires local_infile=1 on server for direct loading **SQL Server** - **External only:** Uses bcp (bulk copy program) utility - Requires named connection from config for credentials - Supports SQL auth and Windows integrated auth ### Basic Usage ```python from dbtk.etl import BulkSurge # Define table (no db_expr columns allowed - BulkSurge loads raw data) sensor_table = dbtk.etl.Table('sensor_readings', { 'sensor_id': {'field': 'id', 'primary_key': True}, 'timestamp': {'field': 'ts', 'fn': 'datetime'}, 'value': {'field': 'reading', 'fn': 'float'}, }, cursor=cursor) # Direct streaming (default) - zero temp files surge = BulkSurge(sensor_table, batch_size=50000) with dbtk.readers.get_reader('sensor_data.csv.gz') as reader: count = surge.load(reader) # Streams data directly print(f"Loaded {count:,} records") ``` ### Loading Methods BulkSurge automatically selects the optimal loading strategy, but you can override with the `method` parameter: **Direct Method (default)** Streams data directly over the connected cursor. ```python # Streams data using native Python drivers - no temp files surge.load(reader) # method='direct' is default # Works for: PostgreSQL, Oracle (with python-oracledb 3.4+), MySQL (with local_infile=1) ``` **External Method** Dumps data to a file (typically CSV) and calls an external tool (`bcp`, `sqlldr`) ```python # Uses command-line tools - requires named connection surge.load(reader, method='external') # Oracle: Uses SQL*Loader (sqlldr) # MySQL: Falls back to direct if local_infile enabled, else dumps CSV # SQL Server: Uses bcp (only method available) ``` ### Database-Specific Examples **PostgreSQL - Direct Streaming** ```python db = dbtk.connect('postgres_prod') table = dbtk.etl.Table('events', columns_config, cursor=db.cursor()) surge = BulkSurge(table) with dbtk.readers.get_reader('events.csv.gz') as reader: surge.load(reader) # COPY FROM STDIN, zero temp files ``` **Oracle - SQL\*Loader** ```python # Requires named connection for credentials db = dbtk.connect('oracle_prod') # Named connection required table = dbtk.etl.Table('schema.table_name', columns_config, cursor=db.cursor()) surge = BulkSurge(table) with dbtk.readers.get_reader('data.csv') as reader: surge.load(reader, method='external', dump_path='/staging') # Creates CSV + control file, invokes sqlldr, cleans up temp files ``` **MySQL - Streaming with Fallback** ```python db = dbtk.connect('mysql_prod') table = dbtk.etl.Table('orders', columns_config, cursor=db.cursor()) surge = BulkSurge(table) # Direct method checks server configuration automatically surge.load(reader, method='direct') # If local_infile=1: streams with LOAD DATA LOCAL INFILE # If local_infile=0: dumps CSV and logs manual load instructions ``` **SQL Server - bcp with Named Connection** ```python # IMPORTANT: SQL Server requires named connection from config db = dbtk.connect('mssql_prod') # Must use named connection table = dbtk.etl.Table('dbo.orders', columns_config, cursor=db.cursor()) surge = BulkSurge(table) with dbtk.readers.get_reader('orders.csv') as reader: surge.load(reader) # Uses bcp automatically # Supports both SQL auth (-U/-P) and Windows integrated auth (-T) ``` **SQL Server - Windows Integrated Auth** ```yaml # Config file (dbtk.yml) - no user/password for integrated auth connections: mssql_prod: type: sqlserver host: sql-server.company.com database: production # No user/password = uses Windows integrated auth ``` ### Controlling Temp File Location For external methods that create temp files, control the location with `dump_path`: ```python # Specify exact file path surge.load(reader, method='external', dump_path='/staging/mydata.csv') # Specify directory (auto-generates timestamped filename) surge.load(reader, method='external', dump_path='/staging') # Creates: /staging/orders_20260206_143022.csv # Use configured directory from settings # settings['data_dump_dir'] = '/data/staging' surge.load(reader, method='external') # Uses configured directory # Fallback to temp directory if not specified surge.load(reader, method='external') # Uses tempfile.gettempdir() ``` ### Named Connections for External Tools External tools (bcp, sqlldr) require credentials. In order to get those credentials, the table's cursor object must be from a named connection (from dbtk.yml settings) instead of manually configured. ### Manual CSV Export For databases not yet supported, or when you need custom loading parameters, use `dump()` to export transformed data: ```python # Export to CSV for manual loading surge = BulkSurge(table) with dbtk.readers.get_reader('source.csv.gz') as reader: csv_path = surge.dump(reader, 'staging/transformed.csv') # Then load manually: # SQL Server: bcp mydb.dbo.mytable in staging/transformed.csv -c -t, -S server -U user -P pass # MySQL: LOAD DATA INFILE 'staging/transformed.csv' INTO TABLE mytable ... # Snowflake: COPY INTO mytable FROM 'staging/transformed.csv' ... ``` **Oracle Auto-generation:** When connected to Oracle, `dump()` automatically generates a SQL\*Loader control file (.ctl) alongside the CSV and logs the `sqlldr` command: ```python db = dbtk.connect('oracle_prod') # Oracle connection surge = BulkSurge(table) surge.dump(reader, '/staging/export.csv') # Automatically creates: # /staging/export.csv (data file) # /staging/export_a1b2c3d4.ctl (control file with unique suffix) # # Logs show the sqlldr command to run: # sqlldr userid=USER/PASS@DB control=/staging/export_a1b2c3d4.ctl data=/staging/export.csv ``` ### BulkSurge vs DataSurge Comparison | Feature | DataSurge | BulkSurge | |---------------------|----------------------|------------------------------------------------| | **Speed** | 90-120K rec/s | 200K+ rec/s | | **Method** | executemany batching | Native bulk loading | | **Temp files** | Never | Only for external tools | | **db_expr support** | Yes | No (raw data only) | | **MERGE/upsert** | Yes | No (INSERT only) | | **pass_through** | INSERT only | If no `db_expr` columns | | **Databases** | All (universal) | PostgreSQL, Oracle, MySQL, SQL Server | | **Setup** | Works everywhere | May require server config (MySQL local_infile) | | **Credentials** | Uses connection | External tools need named connection | ### When to Use BulkSurge **Use BulkSurge when:** - Loading millions of rows - Simple INSERT operations (no upsert/merge needed) - No database functions required (`db_expr` not used) - Maximum throughput is critical - You have appropriate server permissions (MySQL local_infile, etc.) **Use DataSurge when:** - Need UPDATE, MERGE/upsert operations - Using `db_expr` for database functions - Loading moderate datasets (< 5M records) - Want universal compatibility without configuration - Don't have server-level permissions ### Troubleshooting **MySQL: "LOAD DATA LOCAL INFILE forbidden"** ```python # Check server configuration cursor.execute("SELECT @@local_infile") # If 0: contact DBA to enable or use external method surge.load(reader, method='external') # Dumps CSV with instructions ``` **SQL Server: "connection_name missing"** ```python # Use named connection, not direct connection db = dbtk.connect('mssql_prod') # Not: dbtk.sqlserver(host=..., user=..., password=...) ``` **Oracle: "direct_path_load not supported"** ```python # Requires python-oracledb 3.4+ # Solution: Upgrade driver or use external method surge.load(reader, method='external') # Uses SQL*Loader ``` **"Table has db_expr columns"** ```python # BulkSurge doesn't support database expressions # Solution: Use DataSurge or remove db_expr from column config ``` ## See Also - [ETL: Table & Transforms](07-table.md) - Table configuration, field mapping, transforms - [ETL: Tools & Logging](09-etl-tools.md) - IdentityManager, ValidationCollector, logging - [Database Connections](03-database-connections.md) - Connections, cursors, SQL file execution > **Examples:** > - [`examples/data_load_imdb_subset.py`](../examples/data_load_imdb_subset.py) — `DataSurge` ETL pipeline loading a filtered IMDB subset with validation > - [`examples/bulk_load_imdb_subset_pg.py`](../examples/bulk_load_imdb_subset_pg.py) — `BulkSurge` high-throughput variant for PostgreSQL > - [`examples/bulk_load_titles.py`](../examples/bulk_load_titles.py) — `BulkSurge` loading 12M+ title records via Polars `DataFrameReader`