Python 操作 MySQL 数据库的三个模块

网友投稿 1104 2023-04-24

Python 操作 MySQL 数据库的三个模块

Python 操作 MySQL 数据库的三个模块

​python使用MySQL主要有两个模块,pymysql(MySQLdb)和SQLAchemy。

pymysql(MySQLdb)为原生模块,直接执行sql语句,其中pymysql模块支持python 2和python3,MySQLdb只支持python2,两者使用起来几乎一样。SQLAchemy为一个ORM框架,将数据对象转换成SQL,然后使用数据API执行SQL并获取执行结果另外DBUtils模块提供了一个数据库连接池,方便多线程场景中python操作数据库。

1.pymysql模块

安装:pip install pymysql

创建表格操作(注意中文格式设置)

增删改查:

注意execute执行sql语句参数的两种情况:

execute("insert into t_sales(nickName, size) values('%s','%s');" % ("zack","L") )  #此时的%s为字符窜拼接占位符,需要引号加'%s'  (有sql注入风险)execute("insert into t_sales(nickName, size) values(%s,%s);" , ("zack","L") ) #此时的%s为sql语句占位符,不需要引号%s

批量插入和自增id

获取查询数据

注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:

cursor.scroll(1,mode='relative')  # 相对当前位置移动cursor.scroll(2,mode='absolute') # 相对绝对位置移动

fetch获取数据类型

fetch获取的数据默认为元组格式,还可以获取字典类型的,如下:

2.SQLAlchmy框架

SQLAlchemy的整体架构如下,建立在第三方的DB API上,将类和对象操作转换为数据库sql,然后利用DB API执sql语句得到结果。其适用于多种数据库。另外其内部实现了数据库连接池,方便进行多线程操作。

Engine,框架的引擎Connection Pooling ,数据库连接池​​Dialect​​​,选择连接数据库的DB API种类,(pymysql,mysqldb等)``Schema/Types,架构和类型SQL Exprression Language,SQL表达式语言​​DB API​​:Python Database API Specification

2.1 执行原生sql

安装:pip install sqlalchemy

SQLAlchmy也可以不利用ORM,使用数据库连接池,类似pymysql模块执行原生sql。

#coding:utf-8from sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, String, Integerimport threadingengine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8", max_overflow = 0, #超过连接池大小外最多创建的连接,为0表示超过5个连接后,其他连接请求会阻塞 (默认为10) pool_size = 5, #连接池大小(默认为5) pool_timeout = 30, #连接线程池中,没有连接时最多等待的时间,不设置无连接时直接报错 (默认为30) pool_recycle = -1) #多久之后对线程池中的线程进行一次连接的回收(重置) (默认为-1) # def task(): # conn= engine.raw_connection() #建立原生连接,和pymysql的连接一样 # cur = conn.cursor() # cur.execute("select * from t_sales where id>%s",(2,)) # result = cur.fetchone() # cur.close() # conn.close() # print(result) # def task(): # conn = engine.contextual_connect() #建立上下文管理器连接,自动打开和关闭 # with conn: # cur = conn.execute("select * from t_sales where id>%s",(2,)) # result = cur.fetchone() # print(result) def task(): cur =engine.execute("select * from t_sales where id>%s",(2,)) #engine直接执行 result = cur.fetchone() cur.close() print(result)if __name__=="__main__": for i in range(10): t = threading.Thread(target=task) t.start()

2.2 执行ORM语句

A. 创建和删除表

#coding:utf-8import datetimefrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, String, Integer, DateTime, TextBase = declarative_base()class User(Base): __tablename__="users" id = Column(Integer,primary_key=True) name = Column(String(32),index=True, nullable=False) #创建索引,不为空 email = Column(String(32),unique=True) ctime = Column(DateTime, default = datetime.datetime.now) #传入方法名datetime.datetime.now extra = Column(Text,nullable=True) __table_args__ = { # UniqueConstraint('id', 'name', name='uix_id_name'), #设置联合唯一约束 # Index('ix_id_name', 'name', 'email'), # 创建索引 }def create_tbs(): engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5) Base.metadata.create_all(engine) #创建所有定义的表def drop_dbs(): engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5) Base.metadata.drop_all(engine) #删除所有创建的表if __name__=="__main__": create_tbs() #创建表 #drop_dbs() #删除表

B.表中定义外键关系(一对多,多对多)

思考:下面代码中的一对多关系,relationship 定义在了 customer 表中,应该定义在 PurchaseOrder 更合理?

注意:mysql 数据库中避免使用 order做为表的名字,order 为一个 mysql 关键字,做为表名字时必须用反引号order (键盘数字1旁边的符号)。

#coding:utf-8from sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Floatfrom sqlalchemy.orm import relationshipimport datetimeengine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") #数据库有密码时,//root:12345678@127.0.0.1:3306/Base = declarative_base()class Customer(Base): __tablename__="customer" #数据库中保存的表名字 id = Column(Integer,primary_key=True) name = Column(String(64),index=True,nullable=False) phone = Column(String(16),nullable=False) address = Column(String(256),nullable=False) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) #关键关系,关联表的__tablename__="purchase_order" # 和建立表结构无关,方便外键关系查询,backref反向查询时使用order_obj.customer purchase_order = relationship("PurchaseOrder",backref="customer") class PurchaseOrder(Base): __tablename__ = "purchase_order" #mysql数据库中表的名字避免使用order,order为一个关键字,使用时必须用反引号`order` (键盘数字1旁边的符号) id=Column(Integer,primary_key=True) cost = Column(Float,nullable=True) ctime = Column(DateTime,default =datetime.datetime.now) desc = Column(String(528)) #多对多关系时,secondary为中间表 product = relationship("Product",secondary="order_to_product",backref="purchase_order") class Product(Base): __tablename__ = "product" id = Column(Integer,primary_key=True) name = Column(String(256)) price = Column(Float,nullable=False) class OrdertoProduct(Base): __tablename__ = "order_to_product" id = Column(Integer,primary_key=True) product_id = Column(Integer,ForeignKey("product.id")) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) if __name__ == "__main__": Base.metadata.create_all(engine) #Base.metadata.drop_all(engine)

C.增删改查操作

增删改查

查询语句

# 条件ret = session.query(Users).filter_by(name='alex').all()ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()from sqlalchemy import and_, or_ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all()# 通配符ret = session.query(Users).filter(Users.name.like('e%')).all()ret = session.query(Users).filter(~Users.name.like('e%')).all()# 限制ret = session.query(Users)[1:2]# 排序ret = session.query(Users).order_by(Users.name.desc()).all()ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# 分组from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.extra).all()ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all()ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()# 连表ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()ret = session.query(Person).join(Favor).all()ret = session.query(Person).join(Favor, isouter=True).all()# 组合q1 = session.query(Users.name).filter(Users.id > 2)q2 = session.query(Favor.caption).filter(Favor.nid < 2)ret = q1.union(q2).all()q1 = session.query(Users.name).filter(Users.id > 2)q2 = session.query(Favor.caption).filter(Favor.nid < 2)ret = q1.union_all(q2).all()

补充

D.多线程操作

E. 通过relationship操纵一对多和多对多关系

一对多

多对多

3.数据库连接池

对于ORM框架,其内部维护了链接池,可以直接通过多线程操控数据库。对于pymysql模块,通过多线程操控数据库容易出错,得加锁串行执行。进行并发时,可以利用DBUtils模块来维护数据库连接池。

3.1 多线程操控pymysql

不采用DBUtils连接池时, pymysql多线程代码如下:

每个线程创建链接

import pymysqlimport threadind#**************************无连接池******************************* #每个线程都要创立一次连接,线程并发操作间可能有问题?def func(): conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8") cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() conn.close() if __name__=="__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start()

一个连接串行执行

#**************************无连接池*******************************#创建一个连接,加锁串行执行from threading import Lockimport pymysqlimport threadingconn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8") lock = Lock() def func(): with lock: cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() #conn.close()不能在线程中关闭连接,否则其他线程不可用了 if __name__=="__main__": threads = [] for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) threads.append(t) t.start() for t in threads: t.join() conn.close()

3.2 DBUtils连接池

DBUtils连接池有两种连接模式:PersistentDB和PooledDB

模式一(DBUtils.PersistentDB):

为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。

PersistentDB使用代码如下:

#coding:utf-8from DBUtils.PersistentDB import PersistentDBimport pymysqlimport threadingpool = PersistentDB( creator = pymysql, # 使用链接数据库的模块 maxusage = None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping = 0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable = False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接) threadlocal = None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置 host="127.0.0.1", port = 3306, user = "root", password="", database="learningsql", charset = "utf8")def func(): conn = pool.connection() cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() conn.close() if __name__ == "__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start()

模式二(DBUtils.PooledDB):

创建一批连接到连接池,供所有线程共享使用。

(由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。)

PooledDB使用代码如下:

from DBUtils.PooledDB import PooledDBimport pymysqlimport threadingimport timepool = PooledDB( creator = pymysql, # 使用链接数据库的模块 maxconnections = 6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached = 2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached = 5, # 链接池中最多闲置的链接,0和None不限制 maxshared = 3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking = True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage = None, # 一个链接最多被重复使用的次数,None表示无限制 setsession = [], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping = 0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host="127.0.0.1", port = 3306, user="root", password="", database = "learningsql", charset = "utf8")def func(): conn = pool.connection() cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) time.sleep(5) #为了查看mysql端的线程数量 cursor.close() conn.close() if __name__=="__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start()

上述代码中加入了sleep(5)使线程连接数据库时间延长,方便查看mysql数据库连接线程情况,下图分别为代码执行中和执行后的线程连接情况,可以发现,代码执行时,同时有6个线程连接上了数据库(有一个为mysql命令客户端),代码执行后,只有一个线程连接数据库,但仍有5个线程等待连接。

(show status like "Threads%" 查看线程连接情况)

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:简析XDP的重定向机制
下一篇:MySQL基础架构:SQL查询语句执行过程
相关文章