创建连接

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

# Connection URL. user/password/host/port/db are placeholders: "port" must be
# replaced by a numeric port before this URL can be parsed.
MYSQL_URI = "mysql+pymysql://user:password@host:port/db"
engine = create_engine(
    MYSQL_URI,
    # echo=True,  # log every emitted SQL statement; useful for debugging
    # Seconds to wait for a free connection from the pool before giving up.
    # NOTE(review): one hour is unusually long (SQLAlchemy's default is 30).
    pool_timeout=3600,
    # Replace connections older than this many seconds, so connections the
    # MySQL server has already closed (wait_timeout) are not reused.
    pool_recycle=3600,
    # Number of connections kept open in the pool (max_overflow, default 10,
    # allows additional temporary connections beyond this).
    pool_size=200,
    # Ping each connection on checkout and transparently replace dead ones.
    pool_pre_ping=True,
)
# Factory producing Session objects bound to the engine above.
session_maker = sessionmaker(bind=engine)
# Registry that hands out one Session per thread (scoped_session's default
# scope); call db_session.remove() when a unit of work ends.
db_session = scoped_session(session_maker)
创建模型

from sqlalchemy import Column, Integer, String, Text, TIMESTAMP, PrimaryKeyConstraint, BigInteger, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func

Base = declarative_base()


class XinHongTable(Base):
    """ORM model for the ``t_spider_xinhong_note_search`` table."""

    __tablename__ = "t_spider_xinhong_note_search"

    # String primary key. A VARCHAR column cannot be AUTO_INCREMENT in MySQL,
    # and SQLAlchemy rejects autoincrement=True on non-integer columns.
    # A primary key is already unique, so unique=True would only add a second
    # redundant index. Callers must therefore supply _id on insert.
    _id = Column(String(100), primary_key=True)
    dt = Column(String(255), nullable=False)  # NOT NULL
    create_time = Column(TIMESTAMP)
    category = Column(BigInteger)
    # MySQL's VARCHAR requires an explicit length; 255 matches ``dt``.
    timeRange = Column(String(255))
    noteId = Column(Integer)
    content = Column(Text)
插入

from sqlalchemy.dialects.mysql import insert

# Variant 1: insert many rows with a single multi-row INSERT IGNORE statement.
# Take one timestamp so every row's dt and create_time agree, even if the
# loop runs across midnight.
now = datetime.now()
new_items = [
    dict(
        dt=now.strftime("%Y-%m-%d"),
        create_time=now.strftime("%Y-%m-%d %H:%M:%S"),
        item_id=item.get("item_id"),
        shop_id=item.get("shop_id"),
        comment_id=item.get("comment_id"),
        comment_content=item.get("comment_content"),
        comment_seller_name=item.get("comment_seller_name"),
        comment_sku=item.get("comment_sku"),
        comment_time=item.get("comment_time"),
        sale_day=item.get("sale_day"),
    )
    for item in item_lst
]
# Skip the round trip entirely when there is nothing to insert; an empty
# value list does not form a meaningful multi-row INSERT.
if new_items:
    # IGNORE: rows that collide with an existing primary/unique key are skipped
    # instead of aborting the whole statement, so the other rows still land.
    # (MySQL IGNORE also downgrades some other errors to warnings.)
    insert_stmt = (
        insert(TMallGoodsCommentNew).values(new_items).prefix_with("IGNORE")
    )
    try:
        session.execute(insert_stmt)
        session.commit()
    except Exception:
        # Leave the session usable for the next unit of work.
        session.rollback()
        raise

# Variant 2: build a single ORM object and add it through the session.
# One shared timestamp so dt and create_time are guaranteed identical.
now = datetime.now()
record = DayInfoTable(
    dt=now,
    device_id=device_id,
    collect_type=collect_type,
    req_success_count=0,
    req_fail_count=0,
    slide1_success_count=0,
    slide1_fail_count=0,
    slide2_success_count=0,
    slide2_fail_count=0,
    create_time=now,
)
db_session.add(record)
try:
    db_session.commit()
except Exception:
    # A failed commit leaves the (thread-scoped) session unusable until it is
    # rolled back; do that before propagating the error.
    db_session.rollback()
    raise
查询 / 更新

# filter_by(): keyword-argument equality filters on the queried entity.
query_by_device = session.query(BaseInfoTable).filter_by(device_id=item["device_id"])

# filter(): arbitrary SQL expressions, all combined with AND.
# NOTE(review): StatusInfoTable and TMallGoodsCommentNew appear only in the
# filters and are never joined to BaseInfoTable, so they end up in FROM as a
# cartesian product. Add explicit .join(...) calls once the join keys are known.
# NOTE(review): the LIKE uses a free variable collect_type while the line
# above uses item["collect_type"]; confirm they are meant to be the same.
query = session.query(BaseInfoTable).filter(
    StatusInfoTable.x5_status.in_(item["x5_status_list"]),  # IN (list)
    StatusInfoTable.collect_type == item["collect_type"],
    BaseInfoTable.support_interface.like(f"%{collect_type}%"),  # substring match
    not_(TMallGoodsCommentNew.dt == today),  # negation
    BaseInfoTable.update_time > ten_days_ago,  # greater than
)
# Query.order_by() returns a NEW Query; without reassignment the ordering
# would be silently discarded.
query = query.order_by(desc(TMallGoodsCommentNew.comment_id))  # descending

rows = query.all()  # list of every matching row
ret = query.first()  # first matching row, or None if there is none

# Convert the ORM object to a plain dict. __dict__ also carries SQLAlchemy's
# internal _sa_instance_state, which is dropped; attributes that were never
# loaded (e.g. deferred columns) are simply absent.
ret_dict = (
    None
    if ret is None
    else {k: v for k, v in ret.__dict__.items() if k != "_sa_instance_state"}
)

session.commit()

# Self-join: the same table is referenced twice, so the second reference
# needs an alias.
config_table_alias = aliased(ConfigInfoTable)
# Rows on both sides are matched by equal config_value. A row with a non-NULL
# config_value therefore also pairs with itself.
join_condition = ConfigInfoTable.config_value == config_table_alias.config_value
# Only the left-hand side is restricted to this collect_type's key.
wanted_key = f"domain_interface_{collect_type}"
# Result: list of (ConfigInfoTable, aliased ConfigInfoTable) row tuples.
all_config = (
    db_session.query(ConfigInfoTable, config_table_alias)
    .join(config_table_alias, join_condition)
    .filter(ConfigInfoTable.config_key == wanted_key)
    .all()
)