# 创建一个postgreSQL数据库连接类
import psycopg2
from psycopg2 import pool
from psycopg2.extras import DictCursor
from psycopg2.extras import RealDictCursor
class Connector:
def __init__(self,database_name):
db_config = {
'user': 'postgres',
'password': 'postgres',
'host': 'localhost',
'port': '5432',
'database': database_name
}
self.db_config = db_config
self.pool = None
def connect(self):
self.pool = psycopg2.pool.SimpleConnectionPool(1, 20, **self.db_config)
def get_connection(self):
return self.pool.getconn()
def close_connection(self, conn):
self.pool.putconn(conn)
def close_all_connection(self):
self.pool.closeall()
def execute_query(self, query, params=None, cursor_factory=None):
conn = self.get_connection()
cursor = conn.cursor(cursor_factory=cursor_factory)
cursor.execute(query, params)
conn.commit()
result = cursor.fetchall()
cursor.close()
self.close_connection(conn)
return result
def execute_query_one(self, query, params=None, cursor_factory=None):
conn = self.get_connection()
cursor = conn.cursor(cursor_factory=cursor_factory)
cursor.execute(query, params)
conn.commit()
result = cursor.fetchone()
cursor.close()
self.close_connection(conn)
return result
def execute_update(self, query, params=None):
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute(query, params)
conn.commit()
cursor.close()
self.close_connection(conn)
def execute_insert(self, query, params=None):
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute(query, params)
conn.commit()
cursor.close()
self.close_connection(conn)
def execute_delete(self, query, params=None):
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute(query, params)
conn.commit()
cursor.close()
self.close_connection(conn)
def execute_insert_returning(self, query, params=None):
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute(query, params)
conn.commit()
result = cursor.fetchone()
cursor.close()
self.close_connection(conn)
return result
def execute_insert_many(self, query, params=None):
conn = self.get_connection()
cursor = conn.cursor()
cursor.executemany(query, params)
conn.commit()
cursor.close()
self.close_connection(conn)
def execute_query_dict(self, query, params=None):
return self.execute_query(query, params, cursor_factory=DictCursor)