#!/usr/bin/env python
# coding:UTF-8
"""
@version: python3.x
@author:曹新健
@contact: 617349013@qq.com
@software: PyCharm
@file: dbSql.py
@time: 2018/9/22 17:47
"""
import pymysql
import logging
import sys
# 加入日志
# 获取logger实例
logger = logging.getLogger("dbSql")
# 指定输出格式
formatter = logging.Formatter('%(asctime)s\
%(levelname)-8s:%(message)s')
# 文件日志
file_handler = logging.FileHandler("dbSql.log")
file_handler.setFormatter(formatter)
# 控制台日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 为logge添加具体的日志处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
class DbManager:
# 构造函数
def __init__(self, host='127.0.0.1',port=3306, user='cxj',
passwd='123456', db='cxjtest',charset='utf8'):
self.host = host
self.port = port
self.user = user
self.passwd = passwd
self.db = db
self.charset = charset
self.conn = None
self.cur = None
# 连接数据库
def connectDatabase(self):
try:
self.conn = pymysql.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset)
except:
logger.error("connectDatabase failed")
return False
self.cur = self.conn.cursor()
return True
# 关闭数据库
def close(self):
# 如果数据打开,则关闭;否则没有操作
if self.conn and self.cur:
self.cur.close()
self.conn.close()
return True
# 执行数据库的sq语句,主要用来做插入操作
def execute(self, sql,params=None,commit=False,):
# 连接数据库
res = self.connectDatabase()
if not res:
return False
try:
if self.conn and self.cur:
# 正常逻辑,执行sql,提交操作
rowcount = self.cur.execute(sql, params)
#print(rowcount)
if commit:
self.conn.commit()
else:
pass
except:
logger.error("execute failed: " + sql)
logger.error("params: " + str(params))
self.close()
return False
return rowcount
# 查询所有数据
def fetchall(self, sql, params=None):
res = self.execute(sql, params)
if not res:
logger.info("查询失败")
return False
self.close()
results = self.cur.fetchall()
logger.info("查询成功" + str(results))
return results
# 查询一条数据
def fetchone(self, sql, params=None):
res = self.execute(sql, params)
if not res:
logger.info("查询失败")
return False
self.close()
result = self.cur.fetchone()
logger.info("查询成功" + str(result))
return result
# 增删改数据
def edit(self, sql,params=None):
res = self.execute(sql,params,True)
if not res:
logger.info("操作失败")
return False
self.conn.commit()
self.close()
logger.info("操作成功" + str(res))
return res
if __name__ == '__main__':
dbManager = DbManager()
"""
sql = "select * from bandcard WHERE money>%s;"
values = [1000]
result = dbManager.fetchall(sql, values)
"""
sql = "insert into bandcard values %s,%s,%s;"
values = [(0, 100), (0, 200), (0, 300)]
result = dbManager.edit(sql,values)
import re
import pymysql
"""
connect连接对象的方法:
close() --关闭的方法
commit() --如果支持事务则提交挂起的事务
rollback() --回滚挂起的事务
cursor() --返回连接的游标对象
游标对象的方法:
callproc(name,[params]) --用来执行存储过程,接收的参数为存储过程的名字和参数列表,返回受影响的行数
close() --关闭游标
execute(sql,[params])--执行sql语句,可以使用参数,(使用参数时,sql语句中用%s进行站位注值),返回受影响的行数
executemany(sql,params)--执行单挑sql语句,但是重复执行参数列表里的参数,返回受影响的行数
fetchone() --返回结果的下一行
fetchall() --返回结果的 所有行
fetchmany(size)--返回size条记录,如果size大于返回结果行的数量,则会返回cursor.arraysize条记录
nextset() --条至下一行
setinputsizes(size)--定义cursor
游标对象的属性:
description--结果列的描述,只读
rowcount --结果中的行数,只读
arraysize --fetchmany返回的行数,默认为1
"""
class MysqldbHelper(object):
"""操作mysql数据库,基本方法
"""
def __init__(self , host="localhost", username="root", password="", port=3306, database="python_test"):
self.host = host
self.username = username
self.password = password
self.database = database
self.port = port
self.con = None
self.cur = None
try:
self.con = pymysql.connect(host=self.host, user=self.username, passwd=self.password, port=self.port, db=self.database)
# 所有的查询,都在连接 con 的一个???cursor 上面运行的
self.cur = self.con.cursor()
except:
raise "DataBase connect error,please check the db config."
def close(self):
"""关闭数据库连接
"""
if not self.con:
self.con.close()
else:
raise "DataBase doesn't connect,close connectiong error;please check the db config."
def getVersion(self):
"""获取数据库的版本号
"""
self.cur.execute("SELECT VERSION()")
return self.getOneData()
def getOneData(self):
# 取得上个查询的结果,是单个结果
data = self.cur.fetchone()
return data
def creatTable(self, tablename, attrdict, constraint):
"""创建数据库表
args:
tablename :表名字
attrdict :属性键值对,{'book_name':'varchar(200) NOT NULL'...}
constraint :主外键约束,PRIMARY KEY(`id`)
"""
if self.isExistTable(tablename):
return
sql = ''
sql_mid = '`id` bigint(11) NOT NULL AUTO_INCREMENT,'
for attr,value in attrdict.items():
sql_mid = sql_mid + '`'+attr + '`'+' '+ value+','
sql = sql + 'CREATE TABLE IF NOT EXISTS %s ('%tablename
sql = sql + sql_mid
sql = sql + constraint
sql = sql + ') ENGINE=InnoDB DEFAULT CHARSET=utf8'
print('creatTable:'+sql)
self.executeCommit(sql)
def executeSql(self,sql=''):
"""执行sql语句,针对读操作返回结果集
args:
sql :sql语句
"""
try:
self.cur.execute(sql)
records = self.cur.fetchall()
return records
except pymysql.Error as e:
error = 'MySQL execute failed! ERROR (%s): %s' %(e.args[0],e.args[1])
print(error)
def executeCommit(self,sql=''):
"""执行数据库sql语句,针对更新,删除,事务等操作失败时回滚
"""
try:
self.cur.execute(sql)
self.con.commit()
except pymysql.Error as e:
self.con.rollback()
error = 'MySQL execute failed! ERROR (%s): %s' %(e.args[0],e.args[1])
print("error:", error)
return error
def insert(self, tablename, params):
"""插入数据库
args:
tablename :表名字
key :属性键
value :属性值
"""
key = []
value = []
for tmpkey, tmpvalue in params.items():
key.append(tmpkey)
if isinstance(tmpvalue, str):
value.append("\'" + tmpvalue + "\'")
else:
value.append(tmpvalue)
attrs_sql = '('+','.join(key)+')'
values_sql = ' values('+','.join(value)+')'
sql = 'insert into %s'%tablename
sql = sql + attrs_sql + values_sql
print('_insert:'+sql)
self.executeCommit(sql)
def select(self, tablename, cond_dict='', order='', fields='*'):
"""查询数据
args:
tablename :表名字
cond_dict :查询条件
order :排序条件
example:
print mydb.select(table)
print mydb.select(table, fields=["name"])
print mydb.select(table, fields=["name", "age"])
print mydb.select(table, fields=["age", "name"])
"""
consql = ' '
if cond_dict!='':
for k, v in cond_dict.items():
consql = consql+k + '=' + v + ' and'
consql = consql + ' 1=1 '
if fields == "*":
sql = 'select * from %s where ' % tablename
else:
if isinstance(fields, list):
fields = ",".join(fields)
sql = 'select %s from %s where ' % (fields, tablename)
else:
raise "fields input error, please input list fields."
sql = sql + consql + order
print('select:' + sql)
return self.executeSql(sql)
def insertMany(self,table, attrs, values):
"""插入多条数据
args:
tablename :表名字
attrs :属性键
values :属性值
example:
table='test_mysqldb'
key = ["id" ,"name", "age"]
value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [104 ,"liuqiao3", "28"]]
mydb.insertMany(table, key, value)
"""
values_sql = ['%s' for v in attrs]
attrs_sql = '('+','.join(attrs)+')'
values_sql = ' values('+','.join(values_sql)+')'
sql = 'insert into %s'% table
sql = sql + attrs_sql + values_sql
print('insertMany:'+sql)
try:
print(sql)
for i in range(0,len(values),20000):
self.cur.executemany(sql,values[i:i+20000])
self.con.commit()
except pymysql.Error as e:
self.con.rollback()
error = 'insertMany executemany failed! ERROR (%s): %s' %(e.args[0],e.args[1])
print(error)
def delete(self, tablename, cond_dict):
"""删除数据
args:
tablename :表名字
cond_dict :删除条件字典
example:
params = {"name" : "caixinglong", "age" : "38"}
mydb.delete(table, params)
"""
consql = ' '
if cond_dict!='':
for k, v in cond_dict.items():
if isinstance(v, str):
v = "\'" + v + "\'"
consql = consql + tablename + "." + k + '=' + v + ' and '
consql = consql + ' 1=1 '
sql = "DELETE FROM %s where%s" % (tablename, consql)
print (sql)
return self.executeCommit(sql)
def update(self, tablename, attrs_dict, cond_dict):
"""更新数据
args:
tablename :表名字
attrs_dict :更新属性键值对字典
cond_dict :更新条件字典
example:
params = {"name" : "caixinglong", "age" : "38"}
cond_dict = {"name" : "liuqiao", "age" : "18"}
mydb.update(table, params, cond_dict)
"""
attrs_list = []
consql = ' '
for tmpkey, tmpvalue in attrs_dict.items():
attrs_list.append("`" + tmpkey + "`" + "=" +"\'" + tmpvalue + "\'")
attrs_sql = ",".join(attrs_list)
print("attrs_sql:", attrs_sql)
if cond_dict!='':
for k, v in cond_dict.items():
if isinstance(v, str):
v = "\'" + v + "\'"
consql = consql + "`" + tablename +"`." + "`" + k + "`" + '=' + v + ' and '
consql = consql + ' 1=1 '
sql = "UPDATE %s SET %s where%s" % (tablename, attrs_sql, consql)
print(sql)
return self.executeCommit(sql)
def dropTable(self, tablename):
"""删除数据库表
args:
tablename :表名字
"""
sql = "DROP TABLE %s" % tablename
self.executeCommit(sql)
def deleteTable(self, tablename):
"""清空数据库表
args:
tablename :表名字
"""
sql = "DELETE FROM %s" % tablename
self.executeCommit(sql)
def isExistTable(self, tablename):
"""判断数据表是否存在
args:
tablename :表名字
Return:
存在返回True,不存在返回False
"""
sql = "select * from %s" % tablename
result = self.executeCommit(sql)
if result is None:
return True
else:
if re.search("doesn't exist", result):
return False
else:
return True
if __name__ == "__main__":
mydb = MysqldbHelper()
print(mydb.getVersion())
table='test_mysqldb'
attrs={'name':'varchar(200) DEFAULT NULL','age':'int(11) DEFAULT NULL'}
constraint='PRIMARY KEY(`id`)'
print(mydb.creatTable(table, attrs, constraint))
params = {"name" : "caixinglong", "age" : "38"}
mydb.insert('test_mysqldb', params)
print(mydb.select(table))
print(mydb.select(table, fields=["name", "age"]))
print( mydb.select(table, fields=["age", "name"]))
key = ["id" ,"name", "age"]
value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [108 ,"liuqiao3", "28"]]
mydb.insertMany(table, key, value)
mydb.delete(table, params)
cond_dict = {"name" : "liuqiao", "age" : "18"}
mydb.update(table, params, cond_dict)
# mydb.deleteTable(table)
# mydb.dropTable(table)
print(mydb.select(table+ "1"))
print( mydb.isExistTable(table+ "1"))