python数据库类封装

import pymysql
from log import MyLog as Log


class DbModel:
    # 数据库连接对象
    __db = None
    # 游标对象
    __cursor = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self

    def __new__(self, *args, **kwargs):
        if not hasattr(self, '_instance'):
            self._instance = super().__new__(self)
            # 主机
            host = 'host' in kwargs and kwargs['host'] or 'localhost'
            # 端口
            port = 'port' in kwargs and kwargs['port'] or '3306'
            # 用户名
            user = 'user' in kwargs and kwargs['user'] or 'root'
            # 密码
            passwd = 'passwd' in kwargs and kwargs['passwd'] or '123456'
            # 数据库
            db = 'db' in kwargs and kwargs['db'] or 'mypython'
            # 编码
            charset = 'charset' in kwargs and kwargs['charset'] or 'utf8mb4'
            # 打开数据库连接
            Log.info('连接数据库')
            self.__db = pymysql.connect(host = host, port = int(port), user = user, passwd = passwd, db = db, charset = charset)
            # 创建一个游标对象 cursor
            # self.__cursor = self.__db.cursor()
            self.__cursor = self.__db.cursor(cursor = pymysql.cursors.DictCursor)
        return self._instance

    # 返回执行execute()方法后影响的行数
    def execute(self, sql):
        Log.info(sql)
        self.__cursor.execute(sql)
        rowcount = self.__cursor.rowcount
        return rowcount

    def query(self, sql):
        Log.info(sql)
        self.__cursor.execute(sql)
        results = self.__cursor.fetchall()
        return results

    # 增->返回新增ID
    def insert(self, **kwargs):
        table = kwargs['table']
        cell = kwargs['cell']
        del kwargs['table']
        sql = 'insert into %s set ' % table
        for k, v in cell.items():
            sql += "`%s`='%s'," % (k, v)
        sql = sql.rstrip(',')
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 提交到数据库执行
            self.__db.commit()
            # 获取自增id
            res = self.__cursor.lastrowid
            return res
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 删->返回影响的行数
    def delete(self, **kwargs):
        table = kwargs['table']
        where = kwargs['where']
        sql = 'DELETE FROM %s where %s' % (table, where)
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 提交到数据库执行
            self.__db.commit()
            # 影响的行数
            rowcount = self.__cursor.rowcount
            return rowcount
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 改->返回影响的行数
    def update(self, **kwargs):
        table = kwargs['table']
        cell = kwargs['cell']
        # del kwargs['table']
        kwargs.pop('table')

        where = kwargs['where']
        kwargs.pop('where')

        sql = 'update %s set ' % table
        for k, v in cell.items():
            sql += "`%s`='%s'," % (k, v)
        sql = sql.rstrip(',')
        sql += ' where %s' % where
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 提交到数据库执行
            self.__db.commit()
            # 影响的行数
            rowcount = self.__cursor.rowcount
            return rowcount
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    def insert_many(self, **kwargs):
        table = kwargs.get('table')
        data = kwargs.get('data')
        query = ""
        duplicate = kwargs.get('duplicate')
        values = []
        for data_dict in data:
            if not query:
                columns = ', '.join('`{0}`'.format(k) for k in data_dict)
                place_holders = ', '.join('%s'.format(k) for k in data_dict)
                query = "INSERT INTO {0} ({1}) VALUES ({2})".format(table, columns, place_holders)
                if duplicate:
                    duplicateStr = ', '.join('`{0}`=VALUES(`{1}`)'.format(k, k) for k in duplicate)
                    query = query + " ON DUPLICATE KEY UPDATE {0}".format(duplicateStr)
            v = list(data_dict.values())[:]
            values.append(v)
        Log.info(query)
        try:
            # 执行SQL语句
            self.__cursor.executemany(query, values)
            # 提交到数据库执行
            self.__db.commit()
            # 影响的行数
            rowcount = self.__cursor.rowcount
            return rowcount
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 查->单条数据
    def fetchone(self, **kwargs):
        table = kwargs['table']
        # 字段
        field = 'field' in kwargs and kwargs['field'] or '*'
        # where
        where = 'where' in kwargs and 'where ' + kwargs['where'] or ''
        # order
        order = 'order' in kwargs and 'order by ' + kwargs['order'] or ''

        sql = 'select %s from %s %s %s limit 1' % (field, table, where, order)
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 使用 fetchone() 方法获取单条数据.
            data = self.__cursor.fetchone()
            return data
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 查->多条数据
    def fetchall(self, **kwargs):
        table = kwargs['table']
        # 字段
        field = 'field' in kwargs and kwargs['field'] or '*'
        # where
        where = 'where' in kwargs and 'where ' + kwargs['where'] or ''
        # order
        order = 'order' in kwargs and 'order by ' + kwargs['order'] or ''
        # limit
        limit = 'limit' in kwargs and 'limit ' + kwargs['limit'] or ''
        sql = 'select %s from %s %s %s %s' % (field, table, where, order, limit)
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 使用 fetchone() 方法获取单条数据.
            data = self.__cursor.fetchall()
            return data
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 析构函数,释放对象时使用
    def __del__(self):
        # 关闭数据库连接
        self.__db.close()
        print('关闭数据库连接')

"""
封装log方法
"""

import os
import time
import platform
import logging.handlers

# 日志打印等级
LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL
}

# 创建一个日志
logger = logging.getLogger()
level = 'default'


# 创建日志文件方法
def create_file(filename):
    path = filename[0:filename.rfind('/')]
    if not os.path.isdir(path):
        os.makedirs(path)
    if not os.path.isfile(filename):
        fd = open(filename, mode = 'w', encoding = 'utf-8')
        fd.close()
    else:
        pass


# 给logger添加handler 添加内容到日志句柄中
def set_handler(levels):
    if levels == 'error':
        logger.addHandler(MyLog.err_handler)
    logger.addHandler(MyLog.handler)


# 在记录日志之后移除句柄
def remove_handler(levels):
    if levels == 'error':
        logger.removeHandler(MyLog.err_handler)
    logger.removeHandler(MyLog.handler)


def get_current_time():
    return time.strftime(MyLog.date, time.localtime(time.time()))


class MyLog:
    path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    host_os = platform.system()
    if host_os == 'Linux':
        log_file = r'/var/log/info.log'
        err_file = r'/var/log/err.log'
    else:
        log_file = path + '/Log/info.log'
        err_file = path + '/Log/err.log'
    logger.setLevel(LEVELS.get(level, logging.NOTSET))
    create_file(log_file)
    create_file(err_file)
    date = '%Y-%m-%d %H:%M:%S'

    # 创建一个handler,用于写入日志文件
    handler = logging.FileHandler(log_file, encoding = 'utf-8')
    err_handler = logging.FileHandler(err_file, encoding = 'utf-8')

    @staticmethod
    def debug(log_meg):
        set_handler('debug')
        # 文件中输出模式
        logger.debug("[" + get_current_time() + "] - [DEBUG]:" + log_meg)
        remove_handler('debug')

    @staticmethod
    def info(log_meg):
        set_handler('info')
        logger.info("[" + get_current_time() + "] - [INFO]:" + log_meg)
        remove_handler('info')

    @staticmethod
    def warning(log_meg):
        set_handler('warning')
        logger.warning("[" + get_current_time() + "] - [WARNING]:" + log_meg)
        remove_handler('warning')

    @staticmethod
    def error(log_meg):
        set_handler('error')
        logger.error("[" + get_current_time() + "] - [ERROR]:" + log_meg)
        remove_handler('error')

    @staticmethod
    def critical(log_meg):
        set_handler('critical')
        logger.error("[" + get_current_time() + "] - [CRITICAL]:" + log_meg)
        remove_handler('critical')

    # # 设置控制台输出格式
    # formatter = logging.Formatter(
    #     '[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
    # # 再创建一个handler,用于输出到控制台
    # console = logging.StreamHandler()
    # console.setFormatter(formatter)
    # logger.addHandler(console)
    # console.setLevel(logging.NOTSET)


if __name__ == "__main__":
    MyLog.debug("This is debug message")
    MyLog.info("This is info message")
    MyLog.warning("This is warning message")
    MyLog.error("This is error")
    MyLog.critical("This is critical message")
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容