# dbtk/cli.py
import argparse
import importlib.util
import sys
from .database import _get_all_drivers
from .record import fixed_record_factory
from . import config
try:
from importlib.metadata import metadata, distributions, requires
except ImportError:
# backport to older versions
from importlib_metadata import metadata, distributions, requires
def _name_cleanup(name):
"""Cleanup module names for search and display"""
return name.lower().replace('-', '_')
def _get_optional_deps(extra_name='recommended'):
""" Get optional dependencies for dbtk """
reqs = requires('dbtk') or []
deps = []
# Parse requirements like: 'psycopg2-binary>=2.8; extra == "recommended"'
for req in reqs:
req = req.replace("'", '"') # quoting changed between versions
if f'extra == "{extra_name}"' in req:
# Extract just the package name and version spec
pkg = req.split(';')[0].strip()
deps.append(pkg)
return deps
def _is_installed(pkg: str) -> bool:
"""find_spec fails on tomli → this never does."""
pkg = _name_cleanup(pkg)
return (
importlib.util.find_spec(pkg) is not None
or pkg in sys.modules
or pkg in {_name_cleanup(d.name) for d in distributions()}
)
[docs]
def checkup():
""" Check which optional dependencies are installed."""
deps = []
for dep in _get_optional_deps('recommended'):
if ';' in dep:
dep = dep.split(';')[0].strip()
if dep and not dep.startswith('#'):
deps.append(dep.split('>=')[0].split('==')[0].split('<')[0].strip())
installed = {_name_cleanup(d.name): d.version for d in distributions()}
package_rec = fixed_record_factory([('Package', 22), ('Status', 8), ('Version', 15)])
driver_rec = fixed_record_factory([('DB Driver', 22), ('Priority*', 10),
('Status', 9), ('Version', 8), ('Notes', 41)])
row = package_rec(*package_rec._fields)
print(row.to_line(True))
print("-" * row._line_len)
for dep in deps:
clean = dep.replace('-', '_')
status = "✓" if _is_installed(clean) else "✗"
version = installed.get(clean.lower(), '-')
row = package_rec(dep, status, version)
print(row.to_line(True))
print("")
row = driver_rec(*driver_rec._fields)
print(row.to_line(True))
print("-" * 80)
all_drivers = _get_all_drivers()
by_type = {}
for name, info in all_drivers.items():
db_type = info['database_type']
by_type.setdefault(db_type, []).append((info['priority'], name, info))
if _is_installed('pyodbc'):
import pyodbc
odbc_drivers = pyodbc.drivers()
else:
odbc_drivers = []
for db_type in sorted(by_type):
drivers = sorted(by_type[db_type], key=lambda x: x[0])
print(f"{db_type}") # ← bold header
for pri, name, info in drivers:
try:
# see if driver has 'module' attribute to use instead of name
module_name = info.get('module', name)
spec = importlib.util.find_spec(module_name)
version = installed.get(_name_cleanup(module_name), '-- ')
status = "✓" if spec else "✗"
except ModuleNotFoundError:
version = '--'
status = "✗"
odbc_driver_name = info.get("odbc_driver_name")
if odbc_driver_name:
odbc_status = "✓" if odbc_driver_name in odbc_drivers else "✗"
note = f'({odbc_status} {odbc_driver_name})'
else:
if 'note' in info:
note = f'({info.get("note")})'
else:
note = ''
row = driver_rec(f' {name}', pri, status, version, note)
print(row.to_line(True))
print("\n* Lower priority = preferred")
print("\nConfig Health")
print("-" * 40)
for status, msg in config.diagnose_config():
print(f"{status} {msg}")
[docs]
def main():
parser = argparse.ArgumentParser(prog='dbtk', description='DBTK command-line utilities')
subparsers = parser.add_subparsers(dest='command')
subparsers.required = True # required= kwarg not supported in Python 3.6
# checkup
subparsers.add_parser('checkup', help='Check for dependencies and configuration issues')
# config-setup
subparsers.add_parser('config-setup', help='Interactive configuration setup wizard')
# generate-key
subparsers.add_parser('generate-key', help='Generate encryption key')
# store-key
key_parser = subparsers.add_parser('store-key',
help='Store encryption key in system keyring (generate if not provided)')
key_parser.add_argument('key', nargs='?', default=None,
help='Encryption key to store. If omitted, a new key is generated and stored.')
key_parser.add_argument('--force', action='store_true',
help='Overwrite existing encryption key in system keyring')
# encrypt-config
encrypt_parser = subparsers.add_parser('encrypt-config', help='Encrypt passwords in config file')
encrypt_parser.add_argument('config_file', nargs='?', help='Config file path')
# encrypt-password
pwd_parser = subparsers.add_parser('encrypt-password', help='Encrypt a password')
pwd_parser.add_argument('password', nargs='?', help='Password to encrypt')
# migrate-config
migrate_parser = subparsers.add_parser('migrate-config', help='Migrate config to new key')
migrate_parser.add_argument('old_file', help='Old config file')
migrate_parser.add_argument('new_file', help='New config file')
migrate_parser.add_argument('--new-key', help='New encryption key')
args = parser.parse_args()
if args.command == 'checkup':
return checkup()
elif args.command == 'config-setup':
return config.setup_config()
elif args.command == 'generate-key':
return config.generate_encryption_key()
elif args.command == 'store-key':
return config.store_key(args.key, force=args.force)
elif args.command == 'encrypt-config':
return config.encrypt_config_file(args.config_file)
elif args.command == 'encrypt-password':
return config.encrypt_password(args.password)
elif args.command == 'migrate-config':
return config.migrate_config(args.old_file, args.new_file, args.new_key)
if __name__ == '__main__':
main()