import pymysql
from log import MyLog as Log
class DbModel:
    """Singleton wrapper around one PyMySQL connection and one cursor.

    The first ``DbModel(...)`` call opens the connection and stores it, with
    its cursor, on the class. Every later call returns the same instance and
    ignores its arguments.

    Write helpers (``insert``, ``delete``, ``update``, ``insert_many``) commit
    on success. On any exception they log the error, roll back and return
    ``None`` instead of raising. ``fetchone`` / ``fetchall`` follow the same
    log-rollback-``None`` convention. ``execute`` and ``query`` neither commit
    nor catch exceptions.

    Column values given through ``cell`` are sent as driver parameters, so
    pass them raw (unescaped). ``table``, ``field``, ``where``, ``order`` and
    ``limit`` are still spliced into the SQL text verbatim and must never
    come from untrusted input.
    """

    # Database connection shared by the singleton (stored on the class).
    __db = None
    # Cursor created from the connection above (stored on the class).
    __cursor = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, connecting on first use.

        Keyword Args:
            host: Server host; falsy values fall back to ``'localhost'``.
            port: Server port (anything ``int()`` accepts); default ``'3306'``.
            user: Login name; default ``'root'``.
            passwd: Password; default ``'123456'`` when omitted or ``None``.
                An empty string is honoured as an empty password.
            db: Schema name; default ``'mypython'``.
            charset: Connection charset; default ``'utf8mb4'``.

        Raises:
            Whatever ``pymysql.connect`` raises. In that case no instance is
            cached, so a later call can retry.
        """
        if not hasattr(cls, '_instance'):
            host = kwargs.get('host') or 'localhost'
            port = kwargs.get('port') or '3306'
            user = kwargs.get('user') or 'root'
            # NOTE(review): hard-coded fallback credentials; consider making
            # the password mandatory.
            passwd = kwargs.get('passwd')
            if passwd is None:
                passwd = '123456'
            db = kwargs.get('db') or 'mypython'
            charset = kwargs.get('charset') or 'utf8mb4'
            Log.info('连接数据库')
            connection = pymysql.connect(host=host, port=int(port), user=user,
                                         passwd=passwd, db=db, charset=charset)
            cls.__db = connection
            cls.__cursor = connection.cursor(cursor=pymysql.cursors.DictCursor)
            # Cache the instance only after the connection succeeded, so a
            # failed connect does not leave a permanently unusable singleton.
            cls._instance = super().__new__(cls)
        return cls._instance

    def __enter__(self):
        """Return this instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the ``with`` block without suppressing exceptions.

        Returning a truthy value here would swallow every exception raised
        inside the block, so ``False`` is returned explicitly. The shared
        connection is left open.
        """
        return False

    @staticmethod
    def _literal(fragment):
        """Double each ``%`` so driver parameter formatting keeps *fragment* as is."""
        return str(fragment).replace('%', '%%')

    @classmethod
    def _assignments(cls, cell):
        """Build the ``col=%s`` list (columns back-quoted) for the keys of *cell*."""
        return ','.join('`%s`=%%s' % cls._literal(column) for column in cell)

    @staticmethod
    def _clause(keyword, value):
        """Return ``'<keyword> <value>'``, or ``''`` when *value* is None or empty."""
        if value is None or value == '':
            return ''
        return '%s %s' % (keyword, value)

    def _write(self, sql, params=None, want_last_id=False):
        """Execute and commit *sql*; on failure log, roll back and return None."""
        try:
            self.__cursor.execute(sql, params)
            self.__db.commit()
            if want_last_id:
                return self.__cursor.lastrowid
            return self.__cursor.rowcount
        except Exception as e:
            Log.error(str(e))
            self.__db.rollback()
            return None

    def _read(self, sql, many):
        """Run a select and fetch one row or all rows; on failure return None."""
        try:
            self.__cursor.execute(sql)
            if many:
                return self.__cursor.fetchall()
            return self.__cursor.fetchone()
        except Exception as e:
            Log.error(str(e))
            self.__db.rollback()
            return None

    def execute(self, sql):
        """Run *sql* as given and return the cursor's row count.

        No commit is issued and exceptions propagate to the caller.
        """
        Log.info(sql)
        self.__cursor.execute(sql)
        return self.__cursor.rowcount

    def query(self, sql):
        """Run *sql* as given and return everything ``fetchall()`` yields.

        Exceptions propagate to the caller.
        """
        Log.info(sql)
        self.__cursor.execute(sql)
        return self.__cursor.fetchall()

    def insert(self, **kwargs):
        """Insert one row and return the cursor's ``lastrowid``.

        Keyword Args:
            table: Target table name (spliced into the SQL text).
            cell: Mapping of column name to raw value; values are passed to
                the driver as parameters rather than quoted by hand.

        Returns:
            ``lastrowid`` on success, ``None`` if the statement failed.

        Raises:
            KeyError: If ``table`` or ``cell`` is missing.
        """
        table = kwargs['table']
        cell = kwargs['cell']
        sql = 'insert into %s set %s' % (self._literal(table), self._assignments(cell))
        params = list(cell.values())
        Log.info('%s -- params: %r' % (sql, params))
        return self._write(sql, params, want_last_id=True)

    def delete(self, **kwargs):
        """Delete the rows matching ``where`` and return the affected row count.

        Keyword Args:
            table: Target table name (spliced into the SQL text).
            where: Raw SQL condition (spliced into the SQL text).

        Returns:
            The cursor's ``rowcount`` on success, ``None`` on failure.

        Raises:
            KeyError: If ``table`` or ``where`` is missing.
        """
        table = kwargs['table']
        where = kwargs['where']
        sql = 'DELETE FROM %s where %s' % (table, where)
        Log.info(sql)
        return self._write(sql)

    def update(self, **kwargs):
        """Update the rows matching ``where`` and return the affected row count.

        Keyword Args:
            table: Target table name (spliced into the SQL text).
            cell: Mapping of column name to raw new value (sent as parameters).
            where: Raw SQL condition (spliced into the SQL text).

        Returns:
            The cursor's ``rowcount`` on success, ``None`` on failure.

        Raises:
            KeyError: If ``table``, ``cell`` or ``where`` is missing.
        """
        table = kwargs['table']
        cell = kwargs['cell']
        where = kwargs['where']
        # The statement is executed with parameters, so any literal '%' in the
        # caller-supplied fragments (e.g. a LIKE pattern) must be doubled.
        sql = 'update %s set %s where %s' % (
            self._literal(table), self._assignments(cell), self._literal(where))
        params = list(cell.values())
        Log.info('%s -- params: %r' % (sql, params))
        return self._write(sql, params)

    def insert_many(self, **kwargs):
        """Insert several rows with one ``executemany`` call.

        Keyword Args:
            table: Target table name.
            data: Iterable of mappings; the keys of the first mapping define
                the column list, and every row is read in that column order.
            duplicate: Optional iterable of column names to refresh through
                ``ON DUPLICATE KEY UPDATE col=VALUES(col)``.

        Returns:
            The cursor's ``rowcount`` on success, ``0`` when ``data`` is empty
            or ``None`` (nothing is sent), ``None`` on failure.
        """
        table = kwargs.get('table')
        duplicate = kwargs.get('duplicate')
        data = kwargs.get('data')
        rows = list(data) if data is not None else []
        if not rows:
            return 0
        columns = list(rows[0])
        column_sql = ', '.join('`{0}`'.format(name) for name in columns)
        place_holders = ', '.join(['%s'] * len(columns))
        query = "INSERT INTO {0} ({1}) VALUES ({2})".format(table, column_sql, place_holders)
        if duplicate:
            duplicate_sql = ', '.join('`{0}`=VALUES(`{0}`)'.format(name) for name in duplicate)
            query += " ON DUPLICATE KEY UPDATE {0}".format(duplicate_sql)
        Log.info(query)
        try:
            # Read each row by column name so differing key order between
            # rows cannot put a value under the wrong column.
            values = [[row[name] for name in columns] for row in rows]
            self.__cursor.executemany(query, values)
            self.__db.commit()
            return self.__cursor.rowcount
        except Exception as e:
            Log.error(str(e))
            self.__db.rollback()
            return None

    def fetchone(self, **kwargs):
        """Select a single row (``limit 1``).

        Keyword Args:
            table: Table name.
            field: Column list; defaults to ``'*'``.
            where: Optional raw condition; omitted when empty or ``None``.
            order: Optional raw ``order by`` expression; omitted when empty.

        Returns:
            Whatever the cursor's ``fetchone()`` returns, or ``None`` on failure.

        Raises:
            KeyError: If ``table`` is missing.
        """
        table = kwargs['table']
        field = kwargs.get('field') or '*'
        where = self._clause('where', kwargs.get('where'))
        order = self._clause('order by', kwargs.get('order'))
        sql = 'select %s from %s %s %s limit 1' % (field, table, where, order)
        Log.info(sql)
        return self._read(sql, many=False)

    def fetchall(self, **kwargs):
        """Select every matching row.

        Keyword Args:
            table: Table name.
            field: Column list; defaults to ``'*'``.
            where: Optional raw condition; omitted when empty or ``None``.
            order: Optional raw ``order by`` expression; omitted when empty.
            limit: Optional limit, as a string such as ``'10'`` / ``'0,10'``
                or as an integer.

        Returns:
            Whatever the cursor's ``fetchall()`` returns, or ``None`` on failure.

        Raises:
            KeyError: If ``table`` is missing.
        """
        table = kwargs['table']
        field = kwargs.get('field') or '*'
        where = self._clause('where', kwargs.get('where'))
        order = self._clause('order by', kwargs.get('order'))
        limit = self._clause('limit', kwargs.get('limit'))
        sql = 'select %s from %s %s %s %s' % (field, table, where, order, limit)
        Log.info(sql)
        return self._read(sql, many=True)

    def __del__(self):
        """Close the shared connection when the instance is finalized."""
        # Nothing to close if no connection was ever opened.
        if self.__db is None:
            return
        self.__db.close()
        print('关闭数据库连接')
"""
Wrappers around the standard logging module.
"""
import os
import time
import platform
import logging.handlers
# Severity names understood by this module, mapped to logging's level constants.
LEVELS = dict(
    debug=logging.DEBUG,
    info=logging.INFO,
    warning=logging.WARNING,
    error=logging.ERROR,
    critical=logging.CRITICAL,
)
# The root logger; every helper in this module logs through it.
logger = logging.getLogger()
# Configured severity name; 'default' is not one of the LEVELS keys.
level = 'default'
def create_file(filename):
    """Make sure *filename* exists, creating parent directories as needed.

    An existing file is left untouched (never truncated). A bare file name
    with no directory part is created in the current working directory.

    Args:
        filename: Path of the log file; any separator understood by
            ``os.path`` on the running platform is accepted.

    Raises:
        OSError: If a directory or the file cannot be created.
    """
    directory = os.path.dirname(filename)
    # dirname() is '' for a bare file name; there is nothing to create then.
    if directory:
        os.makedirs(directory, exist_ok=True)
    if not os.path.isfile(filename):
        with open(filename, mode='w', encoding='utf-8'):
            pass
def set_handler(levels):
    """Attach the file handlers for one log call to the shared logger.

    Args:
        levels: Severity name of the upcoming record. Only ``'error'`` also
            attaches ``MyLog.err_handler``; ``MyLog.handler`` is attached for
            every value.
    """
    targets = [MyLog.handler]
    if levels == 'error':
        # Error records go to the error file as well, attached first.
        targets.insert(0, MyLog.err_handler)
    for target in targets:
        logger.addHandler(target)
def remove_handler(levels):
    """Detach the file handlers again once a record has been written.

    Args:
        levels: Severity name that was logged. Only ``'error'`` also detaches
            ``MyLog.err_handler``; ``MyLog.handler`` is detached for every value.
    """
    targets = [MyLog.handler]
    if levels == 'error':
        targets.insert(0, MyLog.err_handler)
    for target in targets:
        logger.removeHandler(target)
def get_current_time():
    """Return the current local time formatted with ``MyLog.date``."""
    now = time.localtime(time.time())
    stamp = time.strftime(MyLog.date, now)
    return stamp
class MyLog:
    """File-backed logging facade with one static method per severity.

    Evaluating the class body configures the shared logger's level, makes
    sure both log files exist and opens a ``FileHandler`` for each. Each
    logging method attaches the handlers, writes one line of the form
    ``[<timestamp>] - [<LEVEL>]:<message>`` and detaches the handlers again.
    """

    # Parent of the directory that contains this file.
    path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    host_os = platform.system()
    if host_os == 'Linux':
        log_file = r'/var/log/info.log'
        err_file = r'/var/log/err.log'
    else:
        log_file = path + '/Log/info.log'
        err_file = path + '/Log/err.log'
    # An unknown level name falls back to NOTSET.
    logger.setLevel(LEVELS.get(level, logging.NOTSET))
    create_file(log_file)
    create_file(err_file)
    # strftime pattern used for the timestamp prefix of every record.
    date = '%Y-%m-%d %H:%M:%S'
    # Handler that writes to the general log file.
    handler = logging.FileHandler(log_file, encoding='utf-8')
    # Handler that writes to the error log file.
    err_handler = logging.FileHandler(err_file, encoding='utf-8')

    @staticmethod
    def _emit(levels, write, tag, log_meg):
        """Attach handlers, write one formatted line with *write*, then detach.

        The handlers are detached in ``finally`` so a failing log call cannot
        leave them attached to the shared logger.
        """
        set_handler(levels)
        try:
            # str() lets callers pass non-string objects such as exceptions.
            write("[" + get_current_time() + "] - [" + tag + "]:" + str(log_meg))
        finally:
            remove_handler(levels)

    @staticmethod
    def debug(log_meg):
        """Log *log_meg* at DEBUG level."""
        MyLog._emit('debug', logger.debug, 'DEBUG', log_meg)

    @staticmethod
    def info(log_meg):
        """Log *log_meg* at INFO level."""
        MyLog._emit('info', logger.info, 'INFO', log_meg)

    @staticmethod
    def warning(log_meg):
        """Log *log_meg* at WARNING level."""
        MyLog._emit('warning', logger.warning, 'WARNING', log_meg)

    @staticmethod
    def error(log_meg):
        """Log *log_meg* at ERROR level."""
        MyLog._emit('error', logger.error, 'ERROR', log_meg)

    @staticmethod
    def critical(log_meg):
        """Log *log_meg* at CRITICAL level."""
        # Uses logger.critical so the record carries the CRITICAL level, not ERROR.
        MyLog._emit('critical', logger.critical, 'CRITICAL', log_meg)
# # 设置控制台输出格式
# formatter = logging.Formatter(
# '[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
# # 再创建一个handler,用于输出到控制台
# console = logging.StreamHandler()
# console.setFormatter(formatter)
# logger.addHandler(console)
# console.setLevel(logging.NOTSET)
if __name__ == "__main__":
    # Manual smoke test: push one message through each severity wrapper.
    demo_calls = (
        (MyLog.debug, "This is debug message"),
        (MyLog.info, "This is info message"),
        (MyLog.warning, "This is warning message"),
        (MyLog.error, "This is error"),
        (MyLog.critical, "This is critical message"),
    )
    for emit, text in demo_calls:
        emit(text)