import sys try: import psycopg2 except ImportError: psycopg2 = None try: import sqlite3 except ImportError: sqlite3 = None class SQLUtils: def __init__(self, db_type, **kwargs): self.db_type = db_type.lower() self.conn = None self.cursor = None self.kwargs = kwargs self.connect() def connect(self): if self.db_type == 'pgsql' or self.db_type == 'postgresql': if not psycopg2: raise ImportError('psycopg2 is not installed') self.conn = psycopg2.connect(**self.kwargs) elif self.db_type == 'sqlite' or self.db_type == 'sqlite3': if not sqlite3: raise ImportError('sqlite3 is not installed') self.conn = sqlite3.connect(self.kwargs.get('database', ':memory:')) else: raise ValueError(f'Unsupported db_type: {self.db_type}') self.cursor = self.conn.cursor() def execute_query(self, sql, params=None): if params is None: params = () self.cursor.execute(sql, params) self.conn.commit() def execute_update(self, sql, params=None): try: self.cursor.execute(sql,params) self.conn.commit() except Exception as e: self.conn.rollback() raise e def begin_transaction(self) -> None: """开始事务""" if self.db_type in ['sqlite', 'sqlite3']: self.execute_query('BEGIN TRANSACTION') else: self.conn.autocommit = False def commit_transaction(self) -> None: """提交事务""" self.conn.commit() if self.db_type not in ['sqlite', 'sqlite3']: self.conn.autocommit = True def rollback_transaction(self) -> None: """回滚事务""" self.conn.rollback() if self.db_type not in ['sqlite', 'sqlite3']: self.conn.autocommit = True def fetchone(self): return self.cursor.fetchone() def fetchall(self): return self.cursor.fetchall() def close(self): if self.cursor: self.cursor.close() if self.conn: self.conn.close()