# dbtk/etl/bulk_surge.py
import csv
import logging
import time
import queue
import subprocess
import threading
import time
from pathlib import Path
from textwrap import dedent
from typing import Iterable, Union, Optional
import dbtk.config
from .base_surge import BaseSurge
from ..writers.csv import CSVWriter
from ..record import Record
logger = logging.getLogger(__name__)
class DequeBuffer:
    """
    File-like, thread-safe buffer for streaming text chunks to ``copy_expert``.

    A producer thread calls :meth:`write` with text chunks and finally
    :meth:`close`; a consumer calls :meth:`read` until it returns ``''``.
    Chunks are handed over through a bounded ``queue.Queue`` so the producer
    blocks instead of buffering the whole data set in memory.

    Parameters
    ----------
    max_rows : int, optional
        Maximum number of chunks held in the queue before ``write`` blocks
        (default: 50,000).
    """

    # Seconds to wait on the queue before re-checking the ``closed`` flag.
    _POLL_INTERVAL = 0.1

    def __init__(self, max_rows=50_000):
        self._queue = queue.Queue(maxsize=max_rows)
        self.closed = False

    def write(self, data):
        """
        Queue one chunk, blocking while the queue is full.

        Returns
        -------
        int
            ``len(data)``, following the file-object convention.

        Raises
        ------
        ValueError
            If the buffer has been closed. This lets a producer that is
            blocked on a full queue stop once the consumer has given up,
            instead of waiting forever.
        """
        while True:
            if self.closed:
                raise ValueError('write to closed buffer')
            try:
                self._queue.put(data, timeout=self._POLL_INTERVAL)
            except queue.Full:
                # Consumer is slow (or gone): loop to re-check ``closed``.
                continue
            return len(data)

    def read(self, size=-1):
        """
        Return the next queued chunk, or ``''`` at end of stream.

        ``size`` is accepted for file-object compatibility but ignored: one
        whole chunk is returned per call. Blocks (polling) until a chunk is
        available or the buffer is closed and drained.
        """
        # Iterative polling: a long producer stall must not exhaust the
        # recursion limit.
        while True:
            if self.closed:
                # Drain whatever is left before signalling EOF so a chunk
                # queued just before close() is never dropped.
                try:
                    return self._queue.get_nowait()
                except queue.Empty:
                    return ''  # EOF
            try:
                return self._queue.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                continue

    def close(self):
        """Mark the stream finished; readers see EOF once the queue is drained."""
        self.closed = True
[docs]
class BulkSurge(BaseSurge):
"""
Lightning-fast bulk loading using native database tools and streaming.
BulkSurge provides high-performance data loading by leveraging database-specific
bulk loading mechanisms. It supports both direct streaming (zero temp files) and
external tool-based loading (bcp, SQL*Loader) depending on the database and method chosen.
Supported Databases
-------------------
* **PostgreSQL/Redshift**: COPY FROM STDIN (streaming, no temp files)
* **Oracle**: direct_path_load (streaming) or SQL*Loader (external tool)
* **MySQL/MariaDB**: LOAD DATA LOCAL INFILE via temp file (external only)
* **SQL Server**: bcp utility (external tool, requires named connection)
Loading Methods
---------------
* **direct** (default): Uses native Python drivers for streaming bulk loads
- Postgres: COPY protocol with background writer thread
- Oracle: direct_path_load API (requires python-oracledb 3.4+)
* **external**: Dumps CSV then loads via command-line tool or SQL command
- Oracle: SQL*Loader (sqlldr) with auto-generated control file
- MySQL/MariaDB: LOAD DATA LOCAL INFILE from temp CSV file
- SQL Server: bcp utility (only external method available)
Performance Notes
-----------------
- BulkSurge is memory-efficient: uses batching and streaming to handle large datasets
- Direct methods are typically faster and require no temp files
- External tools require credentials from config file (see connection_name)
- Tables with db_expr columns are incompatible (use DataSurge instead)
Parameters
----------
table : Table
Table instance with column definitions and cursor
batch_size : int, optional
Number of records per batch (default: 10,000)
pass_through : bool, optional
Skip transformation and validation, using source data directly (default: False).
**When to use:**
- Database-to-database copies with identical schemas
- Pre-transformed data from upstream pipelines (already validated)
- Cursor-to-cursor streaming (maximum throughput)
- Raw positional tuples pre-ordered for binding parameters
**What's skipped:** Field mapping, type coercion, default values, null value
handling, required field validation, and Table.set_values() overhead.
**Warning:** Do NOT use if records might have missing required fields, mismatched
field names, need type transformations, or data quality is uncertain. All database
constraints (primary keys, foreign keys) still apply.
Attributes
----------
total_read : int
Total rows read from source. 1-based (first row = 1). Includes
both loaded and skipped rows.
total_loaded : int
Total rows successfully loaded.
skipped : int
Total rows skipped due to missing required fields.
skip_details : dict
Skip tracking grouped by reason. Key is a frozenset of missing
required field names. Value is a dict with:
- ``count``: total rows skipped for this reason
- ``sample``: list of up to 20 1-based row numbers (for debugging)
Example::
{frozenset({'primary_name'}): {'count': 5, 'sample': [937887, 957847, ...]}}
dump_path : Path or None
Path of the last file written by dump(). Set after each dump() call.
Examples
--------
PostgreSQL streaming (zero temp files)::
with BulkSurge(table) as surge:
surge.load(reader) # Uses COPY FROM STDIN
Oracle with SQL*Loader::
surge = BulkSurge(table)
surge.load(reader, method='external') # Uses sqlldr
MySQL with custom dump location::
surge = BulkSurge(table)
surge.load(reader, method='external', dump_path='/data/staging')
SQL Server with bcp (requires config)::
db = dbtk.connect('prod_mssql') # Named connection required
surge = BulkSurge(table)
surge.load(reader, method='external') # Uses bcp
Fast database-to-database copy (matching schemas)::
# Source and destination schemas match exactly
surge = BulkSurge(dest_table, pass_through=True)
surge.load(source_cursor.fetchall())
Pre-transformed data (already validated)::
# Data already transformed and validated by upstream process
surge = BulkSurge(table, pass_through=True)
surge.load(validated_records)
See Also
--------
DataSurge : Standard bulk operations using executemany
Table : Table definition and schema management
"""
[docs]
def __init__(self, table, batch_size: int = 10_000, pass_through: bool = False):
    """
    Set up a bulk loader for ``table``.

    Parameters
    ----------
    table : Table
        Table instance with column definitions and cursor
    batch_size : int, optional
        Number of records per batch (default: 10,000)
    pass_through : bool, optional
        Forwarded unchanged to the base class (default: False)

    Raises
    ------
    RuntimeError
        If the table has ``db_expr`` columns (see ``_valid_for_bulk``).
    """
    super().__init__(table, batch_size=batch_size, pass_through=pass_through)

    # Bulk loading only ever inserts; asking the Table for its insert SQL
    # up front makes sure the query and parameter info have been built.
    self.operation = 'insert'
    self.table.get_sql('insert')

    # Refuse tables whose columns cannot be bulk loaded.
    self._valid_for_bulk()

    # Populated later by dump() / _generate_control_file().
    self.dump_path = self.control_path = None
def _valid_for_bulk(self):
    """
    Verify that the table is compatible with bulk loading.

    Raises
    ------
    RuntimeError
        If ``self.table.db_expr_cols()`` reports any columns; the message
        lists them.
    """
    expr_cols = self.table.db_expr_cols()
    if not expr_cols:
        return
    cols = ','.join(expr_cols)
    msg = f"Columns with `db_expr` are incompatible with BulkSurge. Use DataSurge instead. cols: {cols}"
    # logger.error, not logger.exception: no exception is being handled here,
    # so .exception() would append a meaningless "NoneType: None" traceback.
    logger.error(msg)
    raise RuntimeError(msg)
def _get_connection_config(self):
    """
    Look up the config entry for the cursor's named connection.

    Used by the external loaders (bcp and SQL*Loader), which need
    credentials that are only available from the config file.

    Returns
    -------
    Whatever ``ConfigManager.get_connection_config`` returns for the
    connection name; callers read it with ``.get(...)``.

    Raises
    ------
    RuntimeError
        If the connection has no ``connection_name``.
    """
    connection_name = self.cursor.connection.connection_name
    if not connection_name:
        # Message kept tool-neutral: this helper serves sqlldr as well as bcp.
        raise RuntimeError(
            'External bulk loaders (bcp, sqlldr) need credentials. '
            'Please set up a named connection in the config file.'
        )
    cm = dbtk.config.ConfigManager()
    return cm.get_connection_config(connection_name)
[docs]
def load(self, records: Iterable[Record],
         method: str = 'direct',
         dump_path: Optional[Union[str, Path]] = None) -> int:
    """
    Bulk load records using database-specific mechanisms.

    Selects the loading strategy from the connection's database type and
    the ``method`` parameter. Any pending transaction is committed first.

    Parameters
    ----------
    records : Iterable[Record]
        Iterator of Record objects to load
    method : str, optional
        Loading method to use (default: 'direct')

        - 'direct': Stream data using native drivers
          (Postgres/Redshift COPY, Oracle direct_path_load)
        - 'external': Dump a delimited file then load it
          (Oracle sqlldr, SQL Server bcp, MySQL/MariaDB LOAD DATA LOCAL INFILE)
    dump_path : str or Path, optional
        Path for the dump file (only used by external methods).
        May be a file path, a directory, or None to use the default location.

    Returns
    -------
    int
        The value returned by the selected loader (``self.total_loaded``).

    Raises
    ------
    ValueError
        If ``method`` is neither 'direct' nor 'external'
    NotImplementedError
        If the database type does not support the requested method
    RuntimeError
        If an external method requires credentials but the connection
        has no connection_name

    Notes
    -----
    **PostgreSQL/Redshift:**

    - Direct method only (COPY FROM STDIN)
    - If you need to use `psql \\copy`, use BulkSurge.dump() to generate
      a transformed CSV file

    **Oracle:**

    - Direct: direct_path_load
    - External: SQL*Loader (sqlldr) with auto-generated control file;
      requires a named connection from config

    **MySQL/MariaDB:**

    - External method only; ``method='direct'`` raises NotImplementedError
    - Dumps CSV then executes LOAD DATA LOCAL INFILE

    **SQL Server:**

    - External method only (bcp utility); because the default method is
      'direct', ``method='external'`` must be passed explicitly
    - Requires a named connection from config for credentials

    Examples
    --------
    Direct streaming (default)::

        surge = BulkSurge(table)
        surge.load(reader)

    Oracle SQL*Loader::

        surge = BulkSurge(table)
        surge.load(reader, method='external', dump_path='/staging')

    SQL Server with bcp (requires config connection)::

        db = dbtk.connect('prod_mssql')  # Must use named connection
        table = Table('dbo.orders', columns=..., cursor=db.cursor())
        surge = BulkSurge(table)
        surge.load(reader, method='external')

    See Also
    --------
    dump : Export records to a delimited file
    """
    # Commit any pending transaction to avoid locks during bulk load
    self.cursor.connection.commit()
    db_type = self.cursor.connection.database_type.lower()

    # Start the clock after the commit so only the load itself is timed.
    self.start_time = time.monotonic()

    if method == 'direct':
        if db_type in ('postgres', 'redshift'):
            result = self._load_postgres_direct(records)
        elif db_type == 'oracle':
            result = self._load_oracle_direct(records)
        else:
            raise NotImplementedError(f'Direct load not available for {db_type}')
    elif method == 'external':
        if db_type == 'oracle':
            result = self._load_oracle_sqlldr(records, dump_path=dump_path)
        elif db_type in ('sqlserver', 'mssql'):
            result = self._load_mssql_bcp(records, dump_path=dump_path)
        elif db_type in ('mysql', 'mariadb'):
            result = self._load_mysql_external(records, dump_path=dump_path)
        else:
            raise NotImplementedError(f'External load not available for {db_type}')
    else:
        raise ValueError(f'Method {method} not supported. Must be either direct or external.')

    self._log_summary()
    return result
def _load_postgres_direct(self, records: Iterable[Record]) -> int:
    """
    Build the ``COPY ... FROM STDIN`` statement and hand off to the
    driver-specific loader.

    The psycopg3 loader is used when the connection's driver module is
    named ``psycopg``; every other driver goes through the psycopg2 loader.

    Returns
    -------
    int
        Value returned by the selected loader.
    """
    # Called for its side effect only; the returned SQL is not used here.
    self.table.get_sql('insert')

    column_list = ", ".join(self._get_columns('insert'))
    copy_sql = f"COPY {self.table.name} ({column_list}) FROM STDIN WITH (FORMAT csv, NULL '\\N')"

    if self.cursor.connection.driver.__name__ == 'psycopg':
        loader = self._load_postgres_psycopg3
    else:
        loader = self._load_postgres_psycopg2
    return loader(records, copy_sql)
def _load_postgres_psycopg2(self, records: Iterable[Record], sql: str) -> int:
    """
    Stream records through ``cursor.copy_expert`` using a producer thread.

    A background thread writes CSV batches into a ``DequeBuffer`` while
    this thread lets ``copy_expert`` read from it. An exception raised in
    the producer is captured and re-raised here once the COPY has returned.

    Returns
    -------
    int
        ``self.total_loaded``
    """
    pipe = DequeBuffer(max_rows=self.batch_size * 3)
    # Mutable holder so the producer thread can report a failure back.
    outcome = {'error': None}

    def produce():
        try:
            csv_out = CSVWriter(data=None, file=pipe, write_headers=False, null_string='\\N')
            for chunk in self.batched(records):
                csv_out.write_batch(chunk)
        except Exception as err:
            outcome['error'] = err
        finally:
            # Always signal EOF so the reader side can finish.
            pipe.close()

    producer = threading.Thread(target=produce, daemon=True)
    producer.start()

    try:
        self.cursor.copy_expert(sql, pipe)
    finally:
        pipe.close()
        producer.join(timeout=30)
        if producer.is_alive():
            logger.warning("Writer thread did not finish in time")

    if outcome['error']:
        raise outcome['error']
    return self.total_loaded
def _load_postgres_psycopg3(self, records: Iterable[Record], sql: str) -> int:
    """
    Stream records through the underlying cursor's ``copy()`` context.

    Each batch is rendered to CSV in an in-memory buffer and written to
    the copy object in one call.

    Returns
    -------
    int
        ``self.total_loaded``
    """
    from io import StringIO

    with self.cursor._cursor.copy(sql) as copy_stream:
        for chunk in self.batched(records):
            staging = StringIO()
            writer = CSVWriter(data=None, file=staging, write_headers=False, null_string='\\N')
            writer.write_batch(chunk)
            copy_stream.write(staging.getvalue())
    return self.total_loaded
def _load_oracle_direct(self, records: Iterable[Record]) -> int:
    """
    Load records into Oracle through the connection's ``direct_path_load``.

    Records are consumed in batches and each batch is passed to
    ``connection.direct_path_load`` together with the schema, table name,
    column names and batch size.

    Parameters
    ----------
    records : Iterable[Record]
        Records to load; batched via ``self.batched``.

    Returns
    -------
    int
        ``self.total_loaded``

    Raises
    ------
    ValueError
        If the table name is not in ``schema.table`` form.
    NotImplementedError
        If the connection object has no ``direct_path_load`` attribute
        (the error text states that python-oracledb 3.4+ is required).

    Notes
    -----
    Errors raised by ``direct_path_load`` itself are not caught here and
    propagate to the caller unchanged.
    """
    # Called for its side effect only; the returned SQL is not used here.
    self.table.get_sql('insert')
    column_names = self._get_columns('insert')

    name_parts = self.table.name.split('.')
    if len(name_parts) != 2:
        raise ValueError("Schema is required, use [schema].[table] format")
    schema, table_name = name_parts

    # Feature-detect the driver capability instead of parsing a version.
    connection = self.cursor.connection
    if not hasattr(connection, 'direct_path_load'):
        raise NotImplementedError(
            "direct_path_load requires python-oracledb 3.4+. "
            "Try method=external to use SQL*Loader"
        )

    for chunk in self.batched(records):
        connection.direct_path_load(
            schema_name=schema,
            table_name=table_name,
            batch_size=self.batch_size,
            data=chunk,
            column_names=column_names,
        )
    return self.total_loaded
def _load_oracle_sqlldr(self, records: Iterable[Record],
                        dump_path: Optional[Union[str, Path]] = None) -> int:
    """
    Load data into Oracle using the SQL*Loader (sqlldr) external utility.

    Dumps records to CSV (which also generates the control file), then
    invokes sqlldr with credentials from the named connection. The CSV and
    control files are deleted when sqlldr exits with code 0 or 2; on
    failure they are left in place.

    Parameters
    ----------
    records : Iterable[Record]
        Records to load
    dump_path : str or Path, optional
        Path for the CSV file (file or directory); the control file is
        written next to it

    Returns
    -------
    int
        ``self.total_loaded`` (rows written to the CSV; rows rejected by
        sqlldr are not subtracted)

    Raises
    ------
    RuntimeError
        If the connection has no connection_name, if sqlldr cannot be
        started, or if it exits with a code other than 0 or 2

    Notes
    -----
    - Credentials are passed on the command line; the password is masked
      in the debug log
    - Exit code 2 is treated as "completed with warnings"

    Examples
    --------
    ::

        db = dbtk.connect('oracle_prod')  # Named connection required
        table = Table('schema.table_name', columns=..., cursor=db.cursor())
        surge = BulkSurge(table)
        surge.load(reader, method='external')  # Uses SQL*Loader
    """
    config = self._get_connection_config()
    user = config.get('user')
    password = config.get('password')
    db = config.get('database') or config.get('dsn')

    # dump() takes the target as ``filename`` (a ``file=`` keyword would be
    # forwarded to CSVWriter and collide with its own ``file`` argument).
    # It resolves the path, writes the CSV and, for Oracle, the .ctl file.
    self.dump(records, filename=dump_path, delimiter=',', quotechar='"')
    csv_path = self.dump_path
    ctl_path = self.control_path

    # NOTE(review): self.log_dir is defined outside this block; assumed to
    # be a directory (str or Path) - confirm.
    log_path = Path(self.log_dir) / f'{csv_path.stem}.log'

    cmd = ['sqlldr', f'userid={user}/{password}@{db}', f'control={ctl_path}', f'log={log_path}']
    logger.debug(f'sqlldr userid={user}/<PASSWORD>@{db} control={ctl_path} log={log_path}')

    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    except OSError as e:
        # e.g. sqlldr is not installed or not on PATH
        logger.error(f"sql*loader could not be started: {e}")
        raise RuntimeError("sql*loader failed") from e

    tail = '\n'.join(result.stdout.strip().split('\n')[-6:])
    if result.returncode == 0:
        logger.info("SQL*Loader completed successfully")
        logger.info("sql*loader success: " + tail)
    elif result.returncode == 2:
        logger.warning(f"SQL*Loader completed with warnings:\n{tail}")
    else:
        detail = result.stderr or tail
        logger.error(f"SQL*Loader failed with exit code {result.returncode}:\n{detail}")
        raise RuntimeError(f"sql*loader failed with exit code {result.returncode}")
    logger.info(f"See sql*loader log for details: {log_path}")

    csv_path.unlink(missing_ok=True)
    ctl_path.unlink(missing_ok=True)
    return self.total_loaded
def _load_mssql_bcp(self, records: Iterable[Record],
                    dump_path: Optional[Union[str, Path]] = None) -> int:
    """
    Load data into SQL Server using the bcp (bulk copy program) utility.

    Dumps records to a delimited file using ASCII Unit Separator (\\x1f)
    as the field delimiter with quoting and escaping disabled, then invokes
    bcp with credentials from the named connection. The data file is
    deleted after a successful load.

    Parameters
    ----------
    records : Iterable[Record]
        Records to load
    dump_path : str or Path, optional
        Path for the data file (directory or full path)

    Returns
    -------
    int
        ``self.total_loaded``

    Raises
    ------
    RuntimeError
        If the connection has no connection_name, if bcp cannot be
        started, or if it exits with a non-zero code

    Notes
    -----
    - bcp is called with ``-c -u``, ``-t`` set to \\x1f and ``-r`` set to
      ``\\n``
    - If user and password are in config: SQL authentication (-U, -P)
    - Otherwise: integrated authentication (-T)
    - Credentials are passed on the command line; they are appended after
      the debug log line is written

    Examples
    --------
    ::

        db = dbtk.connect('mssql_prod')  # Named connection required
        surge = BulkSurge(table)
        surge.load(reader, method='external')  # Uses bcp
    """
    config = self._get_connection_config()
    user = config.get('user')
    password = config.get('password')
    host = config.get('host')
    db = config.get('database')

    # dump() takes the target as ``filename`` (a ``file=`` keyword would be
    # forwarded to CSVWriter and collide with its own ``file`` argument).
    self.dump(
        records,
        filename=dump_path,
        write_headers=False,
        delimiter='\x1f',  # Unit Separator
        quotechar=None,  # No quoting needed
        quoting=csv.QUOTE_NONE,
        escapechar=None  # No escaping
    )
    csv_path = self.dump_path

    cmd = ['bcp', self.table.name, 'in', str(csv_path), '-S', host, '-d', db, '-c', '-u', '-t\x1f', '-r\\n']
    # Logged before credentials are appended so they never reach the log.
    logger.debug('BCP command (minus auth): ' + ' '.join(cmd))
    if user and password:
        cmd += ['-U', user, '-P', password]
    else:
        cmd += ['-T']  # integrated auth

    # Run BCP
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    except OSError as e:
        # e.g. bcp is not installed or not on PATH
        error_msg = f"bcp could not be started: {e}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e

    if result.returncode != 0:
        error_msg = f"bcp failed with exit code {result.returncode}"
        if result.stderr:
            error_msg += f"\n{result.stderr}"
        elif result.stdout:
            # Stderr empty - show last 10 lines of stdout as fallback
            lines = result.stdout.strip().split('\n')
            error_msg += "\nstdout (last 10 lines):\n" + '\n'.join(lines[-10:])
        logger.error(error_msg)
        raise RuntimeError(error_msg)

    # On success, show summary (last 5 lines)
    if result.stdout:
        lines = result.stdout.strip().split('\n')
        logger.info("bcp completed:\n" + '\n'.join(lines[-5:]))

    csv_path.unlink(missing_ok=True)
    return self.total_loaded
def _load_mysql_external(self, records: Iterable[Record],
                         dump_path: Optional[Union[str, Path]] = None) -> int:
    """
    External bulk load for MySQL/MariaDB using a CSV file and
    LOAD DATA LOCAL INFILE.

    Dumps records to a CSV file (with header row), executes
    LOAD DATA LOCAL INFILE against it and commits. The CSV file is left
    on disk afterwards.

    Parameters
    ----------
    records : Iterable[Record]
        Records to load
    dump_path : str or Path, optional
        Path for CSV dump (file or directory)

    Returns
    -------
    int
        ``self.total_loaded``

    Notes
    -----
    This is the only bulk method available for MySQL/MariaDB;
    ``load(method='direct')`` raises NotImplementedError for these
    databases.
    """
    # dump() takes the target as ``filename`` (a ``file=`` keyword would be
    # forwarded to CSVWriter and collide with its own ``file`` argument).
    self.dump(records, filename=dump_path, lineterminator='\n')
    csv_path = self.dump_path

    # Forward slashes work on Windows too and avoid escape issues. Escape
    # backslashes and single quotes so the path is a valid SQL string
    # literal (assumes the default backslash-escape sql_mode).
    csv_path_sql = (csv_path.absolute().as_posix()
                    .replace('\\', '\\\\')
                    .replace("'", "\\'"))
    cols = ", ".join(self._get_columns('insert'))
    sql = dedent(f"""\
        LOAD DATA LOCAL INFILE '{csv_path_sql}'
        INTO TABLE {self.table.name}
        FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
        ESCAPED BY '\\\\'
        LINES TERMINATED BY '\\n'
        IGNORE 1 LINES
        ({cols})
        """)
    try:
        self.cursor.execute(sql)
        self.cursor.connection.commit()  # MySQL requires explicit commit
        elapsed = time.monotonic() - self.start_time
        # ".2f": fixed-point seconds (".2" alone means 2 significant digits
        # and renders e.g. 123.4 as "1.2e+02").
        logger.info(f"Loaded {self.total_loaded:,} records from {csv_path} total time: {elapsed:.2f}")
    except Exception as e:
        logger.error(f"LOAD DATA LOCAL INFILE failed: {e}")
        logger.info(f"CSV file preserved at: {csv_path}")
        raise
    # NOTE: the CSV file is intentionally not deleted here.
    return self.total_loaded
def _generate_control_file(self, csv_path: Path, write_headers: bool = True) -> Path:
    """
    Generate a SQL*Loader control file for Oracle.

    Parameters
    ----------
    csv_path : Path
        Path to the CSV data file
    write_headers : bool, optional
        If headers were written to the data file, the control file tells
        SQL*Loader to skip the first row (``SKIP=1``).

    Returns
    -------
    Path
        Path to the generated control file

    Side Effects
    ------------
    Writes the control file and sets ``self.control_path``.

    Notes
    -----
    The control file is placed alongside the CSV as ``<stem>.ctl``.
    Every column is declared as CHAR, fields are comma-terminated and
    optionally enclosed by double quotes, and ``DIRECT=TRUE`` is requested
    with ``ROWS`` set to the batch size.
    """
    ctl_path = csv_path.with_name(f"{csv_path.stem}.ctl")
    bad_path = csv_path.with_suffix(".bad").absolute()
    discard_path = csv_path.with_suffix(".dsc").absolute()

    # Generate column list from current Table schema
    column_specs = ',\n    '.join(f"{col} CHAR" for col in self._get_columns('insert'))

    # Assembled line by line rather than via dedent(): interpolating a
    # multi-line column list into a dedented template would make the output
    # depend on how the template happens to be indented in this source file.
    ctl_lines = [
        f"OPTIONS (DIRECT=TRUE, ROWS={self.batch_size}, SKIP={int(write_headers)})",
        "LOAD DATA",
        f"INFILE '{csv_path.absolute()}'",
        f"BADFILE '{bad_path}'",
        f"DISCARDFILE '{discard_path}'",
        f"INTO TABLE {self.table.name}",
        "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"'",
        "TRAILING NULLCOLS (",
        f"    {column_specs}",
        ")",
    ]
    ctl_path.write_text('\n'.join(ctl_lines) + '\n')
    self.control_path = ctl_path
    return ctl_path
[docs]
def dump(self, records: Iterable[Record],
         filename: Optional[Union[str, Path]] = None,
         write_headers: bool = True,
         delimiter: str = ",",
         encoding: str = 'utf-8',
         **csv_args) -> int:
    """
    Export records to a delimited file.

    Resolves the output path, writes all records via CSVWriter, and sets
    ``self.dump_path`` to the resolved path for callers that need it.
    For Oracle connections, also generates a SQL*Loader control file
    alongside the data file.

    Parameters
    ----------
    records : Iterable[Record]
        Records to export
    filename : Union[str, Path], optional
        Target file path (directory or full path), resolved by
        ``_resolve_file_path``.
    write_headers : bool, optional
        Include column headers (default: True)
    delimiter : str, optional
        Field delimiter character (default: ','). Extension is inferred:
        '\\t' → .tsv, anything else → .csv
    encoding : str, optional
        File encoding (default: 'utf-8')
    **csv_args : optional
        Additional keyword arguments passed to CSVWriter (e.g.
        quoting, quotechar, escapechar). A ``file`` keyword is accepted as
        an alias for ``filename`` and is never forwarded to CSVWriter.

    Returns
    -------
    int
        ``self.total_loaded``

    Side Effects
    ------------
    Sets ``self.dump_path`` to the resolved output Path.

    Notes
    -----
    **Oracle Auto-generation:**
    When connected to Oracle, generates a SQL*Loader control file (.ctl)
    alongside the data file via ``_generate_control_file`` and logs the
    sqlldr command to run.

    Examples
    --------
    Export with auto-generated Oracle control file::

        db = dbtk.connect('oracle_prod')
        surge = BulkSurge(table)
        surge.dump(reader, '/staging/export.csv')
        # Creates: /staging/export.csv + /staging/export.ctl
        # Logs sqlldr command, e.g.:
        # sqlldr userid=USER/PASS@DB control=... data=...

    Custom delimiter with no quoting (e.g. for bcp)::

        surge.dump(reader, '/staging/data.csv', delimiter='\\x1f',
                   quoting=csv.QUOTE_NONE, escapechar=None)
    """
    # Callers may pass the target as ``file=``. It must be removed from
    # csv_args, otherwise CSVWriter would receive ``file`` twice (TypeError).
    file_alias = csv_args.pop('file', None)
    if filename is None:
        filename = file_alias

    ext = '.tsv' if delimiter == '\t' else '.csv'
    headers = self._get_columns('insert')
    logger.debug(f'Dump column headers: {headers}')
    dump_path = self._resolve_file_path(filename, extension=ext)
    self.dump_path = dump_path

    with open(dump_path, "w", encoding=encoding, newline='') as fp:
        writer = CSVWriter(data=None,
                           file=fp,
                           write_headers=write_headers,
                           headers=headers,
                           delimiter=delimiter, **csv_args)
        for batch in self.batched(records):
            writer.write_batch(batch)
    logger.info(f"Dumped {self.total_loaded:,} records to {dump_path}")

    # Oracle: auto-generate control file and provide sqlldr command
    db_type = self.cursor.connection.database_type.lower()
    if 'oracle' in db_type:
        ctl_path = self._generate_control_file(dump_path, write_headers=write_headers)
        logger.info(f"Generated SQL*Loader control file: {ctl_path}")
        # Show sqlldr command with placeholders
        hint = (
            "To load with SQL*Loader:\n"
            f"  sqlldr userid=USER/PASS@DB control={ctl_path} data={dump_path}"
        )
        if self.cursor.connection.connection_name:
            # Only a named connection can be loaded automatically.
            hint += "\n  (or use BulkSurge.load(method='external') for automatic loading)"
        logger.info(hint)
    return self.total_loaded