jiateng_ws/utils/sql_utils.py

78 lines
2.2 KiB
Python
Raw Normal View History

2025-06-07 10:45:09 +08:00
import sys
try:
import psycopg2
except ImportError:
psycopg2 = None
try:
import sqlite3
except ImportError:
sqlite3 = None
class SQLUtils:
def __init__(self, db_type, **kwargs):
self.db_type = db_type.lower()
self.conn = None
self.cursor = None
self.kwargs = kwargs
self.connect()
def connect(self):
if self.db_type == 'pgsql' or self.db_type == 'postgresql':
if not psycopg2:
raise ImportError('psycopg2 is not installed')
self.conn = psycopg2.connect(**self.kwargs)
elif self.db_type == 'sqlite' or self.db_type == 'sqlite3':
if not sqlite3:
raise ImportError('sqlite3 is not installed')
self.conn = sqlite3.connect(self.kwargs.get('database', ':memory:'))
else:
raise ValueError(f'Unsupported db_type: {self.db_type}')
self.cursor = self.conn.cursor()
def execute_query(self, sql, params=None):
if params is None:
params = ()
self.cursor.execute(sql, params)
self.conn.commit()
def execute_update(self, sql, params=None):
try:
self.cursor.execute(sql,params)
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise e
def begin_transaction(self) -> None:
"""开始事务"""
if self.db_type in ['sqlite', 'sqlite3']:
self.execute_query('BEGIN TRANSACTION')
else:
self.conn.autocommit = False
def commit_transaction(self) -> None:
"""提交事务"""
self.conn.commit()
if self.db_type not in ['sqlite', 'sqlite3']:
self.conn.autocommit = True
def rollback_transaction(self) -> None:
"""回滚事务"""
self.conn.rollback()
if self.db_type not in ['sqlite', 'sqlite3']:
self.conn.autocommit = True
def fetchone(self):
return self.cursor.fetchone()
def fetchall(self):
return self.cursor.fetchall()
def close(self):
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()