1.连接的基本语法如下:
先下载pymySQL
pip install PyMySQL
pyimport pymysql
#创建连接
conn = pymysql.Connect(host="127.0.0.1", port=3306, user="root", passwd="123456",db="test")
# 创建游标
cursor = conn.cursor()
#SQL语句增
sql1 = "insert into user(name) values ('大猪')"
#SQL语句查
sql2 = "select * from user"
#SQL语句改
sql3 = "update user set name = '大大猪' where id = 3"
#SQL语句删
sql4 = "delete from user where id = 3"
# 读取数据
number = cursor.execute(sql4)
print(number)
# 提交事务
conn.commit()
# 关闭游标
cursor.close()
# 关闭连接
conn.close()
2.创建连接模块
# -*- coding: UTF-8 -*-
import pymysql, traceback
class Mysql(object):
# 初始化
def __init__(self):
#配置mysql数据库信息
config = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456',
'db': 'api',
'charset': 'utf8'
}
try:
self.con = pymysql.connect(**config)
# 所有的查询,都在连接 con 的一个模块 cursor 上面运行的
self.cursor = self.con.cursor(cursor=pymysql.cursors.DictCursor)
except:
# pass
print('数据库连接错误,请检查数据库连接配置。')
# 关闭数据库连接和对象
def __del__(self):
# 关闭cursor对象
self.cursor.close()
self.con.close()
# 查询记录数
def get_count(self, sql='', payload=()):
count = 0
try:
count = self.cursor.execute(sql,payload)
except pymysql.Error:
print('查询记录数失败:' + traceback.format_exc())
return count
# 查询单行记录
def get_one(self, sql='', payload=()):
res = None
try:
self.cursor.execute(sql,payload)
res = self.cursor.fetchone()
except pymysql.Error:
print("查询单行记录失败:" + traceback.format_exc())
return res
# 查询列表记录
def get_all(self, sql='', payload=()):
res = None
try:
self.cursor.execute(sql,payload)
res = self.cursor.fetchall()
except pymysql.Error:
print("查询全部记录失败:" + traceback.format_exc())
return res
# 新增数据
def db_insert(self, sql='', payload=()):
count = 0
try:
count = self.cursor.execute(sql,payload)
self.con.commit()
except pymysql.Error:
print("新增/修改/删除记录失败:" + traceback.format_exc())
self.con.rollback()
return count
# 新增数据返回自增主键值
def db_insert_id(self, sql='', payload=()):
id = None
try:
self.cursor.execute(sql, payload)
self.con.commit()
id = self.cursor.lastrowid
except pymysql.Error:
print("新增记录失败:" + traceback.format_exc())
self.con.rollback()
return id
# 更新记录
def db_update(self, sql='', payload=()):
return self.db_insert(sql, payload)
# 删除记录
def db_delete(self, sql='', payload=()):
return self.db_insert(sql,payload)
3.注意事项
上方的工具类有一个问题,就是在web使用此工具类的时候,因为长时间保持着连接却没有操作,数据库会关闭连接,比如mysql默认在8小时无操作后关闭链接,这时候工具类会因为连接关闭而无法操作数据库,进而抛出异常。
解决的办法就是通过dbutils数据库连接池来解决。
先下载数据库连接池,注意这里使用最新的2.0版本,不同版本语法可能有些许差别:
pip install DBUtils==2.0
改进后的工具类如下:
# -*- coding: UTF-8 -*-
import pymysql, traceback
from dbutils.pooled_db import PooledDB
class Mysql(object):
# 初始化
def __init__(self):
#配置mysql数据库信息
config = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456',
'db': 'api',
'charset': 'utf8'
}
self.poolDB = PooledDB(
# 指定数据库连接驱动
creator=pymysql,
# 连接池允许的最大连接数,0和None表示没有限制
maxconnections=3,
# 初始化时,连接池至少创建的空闲连接,0表示不创建
mincached=2,
# 连接池中空闲的最多连接数,0和None表示没有限制
maxcached=6,
# 连接池中最多共享的连接数量,0和None表示全部共享(其实没什么卵用)
maxshared=3,
# 连接池中如果没有可用共享连接后,是否阻塞等待,True表示等等,
# False表示不等待然后报错
blocking=True,
# 开始会话前执行的命令列表
setsession=[],
# ping Mysql服务器检查服务是否可用
ping=0,
**config
)
# 创建连接
def create_conn(self):
# 从数据库连接池中取出一条连接
conn = self.poolDB.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
return conn, cursor
# 关闭连接
def close_conn(self, conn, cursor):
if conn:
conn.close()
if cursor:
cursor.close()
# 查询记录数
def get_count(self, sql='', payload=()):
conn, cursor = self.create_conn()
count = 0
try:
count = cursor.execute(sql,payload)
except pymysql.Error:
print('查询记录数失败:' + traceback.format_exc())
finally:
self.close_conn(conn, cursor)
return count
# 查询单行记录
def get_one(self, sql='', payload=()):
conn, cursor = self.create_conn()
res = None
try:
cursor.execute(sql,payload)
res = cursor.fetchone()
except pymysql.Error:
print("查询单行记录失败:" + traceback.format_exc())
finally:
self.close_conn(conn, cursor)
return res
# 查询列表记录
def get_all(self, sql='', payload=()):
conn, cursor = self.create_conn()
res = None
try:
cursor.execute(sql,payload)
res = cursor.fetchall()
except pymysql.Error:
print("查询全部记录失败:" + traceback.format_exc())
finally:
self.close_conn(conn, cursor)
return res
# 新增数据
def db_insert(self, sql='', payload=()):
conn, cursor = self.create_conn()
count = 0
try:
count = cursor.execute(sql,payload)
conn.commit()
except pymysql.Error:
print("新增/修改/删除记录失败:" + traceback.format_exc())
conn.rollback()
finally:
self.close_conn(conn, cursor)
return count
# 新增数据返回自增主键值
def db_insert_id(self, sql='', payload=()):
conn, cursor = self.create_conn()
id = None
try:
cursor.execute(sql, payload)
conn.commit()
id = cursor.lastrowid
except pymysql.Error:
print("新增记录失败:" + traceback.format_exc())
conn.rollback()
finally:
self.close_conn(conn, cursor)
return id
# 更新记录
def db_update(self, sql='', payload=()):
return self.db_insert(sql, payload)
# 删除记录
def db_delete(self, sql='', payload=()):
return self.db_insert(sql,payload)
评论