Redis

相关操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# --- Establish a connection ------------------------------------------------
# (relies on the file-level `import redis` / `import json`)

# Local instance (default host/port/db)
redis_conn = redis.Redis()

# Explicit host
redis_conn = redis.Redis(
    host="10.82.193.194", port=6380, password="", db=2
)

# --- Sets (handy for de-duplication) ---------------------------------------
redis_conn.sismember(key, value)  # membership test
redis_conn.sadd(key, value)       # add a single value
redis_conn.sadd(key, *lst)        # add a list of values
redis_conn.smembers(key)          # fetch every member
# return [bytes, bytes ...]

# --- Lists -----------------------------------------------------------------
length = redis_conn.llen(key)
redis_conn.lpush(queue_key, *data_lst)

# --- Strings ---------------------------------------------------------------
redis_conn.set(key, value)
# Set a TTL; once expired, get returns None
redis_conn.expire(key, seconds)


# --- Storing dicts (serialize to JSON first) -------------------------------
redis_conn.lpush(key, json.dumps(doc))
obj = redis_conn.rpop(key)
doc = json.loads(obj)

复制表结构

从原表导出sql文件

image-20240329153500770

目标数据库运行sql

image-20240329153538974

快捷筛选操作

image-20240329181601423

入库时间自动生成

image-20240401162513659

查询服务器命令

1
2
show processlist;
kill id;

image-20240409110341302

复制迁移表

image-20240507173913279

MongoDB

motor的使用 - 文档

Motor 是 MongoDB 的异步驱动库,配置方式与 Pymongo 基本类似,详见官方文档。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio

from motor import motor_asyncio

# --- Create the connection -------------------------------------------------
mongo_con = motor_asyncio.AsyncIOMotorClient()  # local instance
mongo_url = "mongodb://spiderManager:@10.82.193.194:30000/admin"
mongo_con = motor_asyncio.AsyncIOMotorClient(mongo_url)  # explicit URI
db = mongo_con.get_database("spider")
collection = db.get_collection("common_wx_article")


async def find_one(query):
    """Single-document lookup, plus sort/skip/limit on a cursor."""
    query = {"title_md5": "2afbdbe48dd130d3be2f3f691295def9"}

    # Basic lookup.
    result = await collection.find_one(query)

    # Sort, skip and limit the cursor before materializing it.
    cursor = collection.find(query)
    cursor.sort('name', -1).skip(1).limit(2)
    documents = await cursor.to_list(length=None)


async def find_all():
    """Stream every document asynchronously and re-insert each one."""
    async for doc in collection.find({}):
        user_id = doc["user_id"]

        await collection.insert_one(doc)


# --- Two ways to drive a coroutine -----------------------------------------
query = {}
# Option 1: manage the event loop explicitly.
loop = asyncio.get_event_loop()
loop.run_until_complete(find_one(query))

# Option 2: asyncio.run (Python 3.7+).
asyncio.run(find_one(query))

PyMongo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# --- Create a connection ---------------------------------------------------
mongo_con = pymongo.MongoClient()
db = mongo_con.get_database("spider")
cxs_col = db.get_collection("baikecitiao_cxs")

# Create indexes.
from pymongo import IndexModel, ASCENDING, DESCENDING

index1 = IndexModel(
    [("hello", DESCENDING), ("world", ASCENDING)], name="hello_world"
)
index2 = IndexModel([("goodbye", DESCENDING)], unique=True)
db.test.create_indexes([index1, index2])      # several at once
db.test.create_index([("world", ASCENDING)])  # one at a time

# Check whether an index already exists.
# NOTE(review): snippet lifted from a class — `self.collection_order`
# is a Collection attribute defined elsewhere.
index_info = self.collection_order.index_information()
if "tradeId_-1" in index_info:
    pass


# --- Query filters ---------------------------------------------------------
col.find({}, {"html": 0})  # hide a field
col.find({}, {"html": 1})  # project only the html field
{"_id": {"$gt": start_id}}                    # greater than
{"user_id": {"$in": [a, b, c]}}               # one field, several values
{"appendReview": {"$ne": []}}                 # not equal
{"$or": [{"category": "30011"}, {"user": "cxs"}, {"age": "25"}]}  # several fields
{_field: {"$regex": "2022-08-29"}}            # regex / fuzzy match
{"data.product_item": {"$size": 2}}           # exact array length
{"data.product_item.1": {"$exists": True}}    # array length >= 2


# --- Common operations -----------------------------------------------------
for doc in col.find({}).limit(100).skip(100).sort('age', pymongo.ASCENDING):
    pass

doc = col.find_one({"_id": doc["_id"]})

col.update_one(
    filter={"_id": doc["_id"]},       # locate the document
    update={"$set": {"html": html}},  # update selected fields (or a whole doc)
    upsert=True,                      # update when present, insert otherwise
)

# Delete; succeeds even when the document does not exist.
result = collection.delete_one(filter=_query)
result.deleted_count

# --- Pull documents in batches ---------------------------------------------
fetch_all = collection.find({})
for doc in fetch_all.batch_size(10):
    pass

# Count documents (cursor.count() is deprecated).
_count = collection.count_documents(filter={})

# --- bulk_write: send a batch of write operations to the server ------------
# (one round trip, cuts network IO)
from pymongo import InsertOne, DeleteOne, ReplaceOne

requests = [
    InsertOne({'y': 1}),
    DeleteOne({'x': 1}),
    ReplaceOne({'w': 1}, {'z': 1}, upsert=True),
]
result = db.test.bulk_write(requests)


# --- aggregate pipelines ---------------------------------------------------
#   $match  - filter stage
#   $exists - field-presence test
#   $sample - random sample
pipeline = [
    {"$match": {"name": {"$regex": "cxs"}, "uploaded": {"$exists": False}}},
    {"$sample": {"size": size}},
]
for doc in col.aggregate(pipeline):
    pass
doc = col.aggregate(pipeline).next()

datetime生成ObjectId

1
2
3
4
5
In [19]: gen_time
Out[19]: datetime.datetime(2022, 6, 20, 2, 19, 53, 790424)

In [20]: ObjectId.from_datetime(gen_time)
Out[20]: ObjectId('62afd9490000000000000000')

Studio 3T

1
2
.deleteMany({search_keyword: "OPPO"})  # 删除
.find({insert_timestamp: {$gte: 1667232000}, shop_name: {$in: ["拜灭士官方旗舰店-抖音"]}}) # 条件

小技巧

  • 数据量大的查询,可以根据索引加筛选条件,如:timestamp > 1693483328

  • distinct 关键字,用于特定字段的分组查询,可增加筛选条件

    1
    db.getCollection("zhihu_question").distinct("question_id", {search_keyword: "理想 L9"})

创建新连接

image.png

pymysql

批量拉取数据

1
2
3
4
5
6
7
8
9
# Stream a large result set in chunks of 500 rows instead of fetchall().
cursor = mysql_con.cursor()
sql = 'SELECT nid from tmall_goods WHERE bg_category2_id in ("50006843", "50011740", "50012029", "122650005")'
cursor.execute(sql)  # returns the number of matched rows
while True:
    rows = cursor.fetchmany(size=500)
    if not rows:  # empty tuple -> cursor exhausted
        break
    id_lst = [row[0] for row in rows]

字典插入数据库

1
2
3
4
5
6
7
8
9
# Build an INSERT statement from a dict: keys become columns, values are
# bound through %s placeholders.
# NOTE(review): the column names themselves are interpolated into the SQL
# text — safe only while the dict keys are trusted (not user input).
item_keys = list(item.keys())
keys_str = ", ".join(item_keys)
values_str = ", ".join(["%s"] * len(item_keys))
insert_stmt = (
    f"INSERT INTO t_spider_xinhong_note_search_copy1 ({keys_str}) VALUES ({values_str})"
)
value_lst = [item[key] for key in item_keys]
cursor.execute(insert_stmt, value_lst)
conn.commit()

Mysql ORM

peewee

基本使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Peewee: database handle + model definition.
db = MySQLDatabase(
    "",
    user="",
    password="",
    host="",
    port=3306,
)


class OrderCount(Model):
    # Declare an explicit primary key up front; otherwise peewee adds an
    # implicit `id` column, which easily conflicts with existing tables.
    key_name = CharField(primary_key=True)
    account_name = CharField()
    shop_name = CharField()
    col_name = CharField()
    shop_count = IntegerField()
    chat_time = CharField()
    chat_timestamp = IntegerField()
    shop_date = TimeField()  # datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    source = CharField()
    a = DateTimeField()  # datetime.now()
    # id = AutoField(primary_key=False)

    class Meta:
        database = db
        table_name = "order_count"


# Querying — NOTE(review): CommentCount is another model defined elsewhere.
comment_count = CommentCount.select().where(CommentCount.chat_time == yes_str)
for comment in comment_count:
    pass

sqlalchemy

创建连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from sqlalchemy import Column, String, Text, TIMESTAMP, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()
MYSQL_URI = "mysql+pymysql://user:password@host:port/db"
engine = create_engine(
    MYSQL_URI,
    # echo=True,        # log emitted SQL (debugging)
    pool_timeout=3600,  # seconds to wait when checking a connection out of the pool
    pool_recycle=3600,  # recycle connections after this many seconds (avoids server-side disconnects)
)
DBSession = sessionmaker(bind=engine)
session = DBSession()
创建模型
1
2
3
4
5
6
7
8
9
10
11
12
class XinHongTable(Base):
    """ORM model for table t_spider_xinhong_note_search."""

    __tablename__ = "t_spider_xinhong_note_search"

    _id = Column(String, primary_key=True)
    dt = Column(String)
    create_time = Column(TIMESTAMP)
    category = Column(String)
    timeRange = Column(String)
    frequency = Column(String)
    noteId = Column(String)
    title = Column(Text)
    content = Column(Text)
增删改查
1
2
3
4
5
6
7
8
9
10
# Basic reads.
session.query(XinHongTable).all()
session.query(XinHongTable).first()

# Keyword-equality filter.
device = session.query(BaseInfoTable).filter_by(device_id=item["device_id"]).first()
# NOTE(review): queries BaseInfoTable but filters on StatusInfoTable columns
# with no join visible here — confirm a relationship/join exists elsewhere.
device = session.query(BaseInfoTable).filter(
StatusInfoTable.x5_status.in_(item["x5_status_list"]),
StatusInfoTable.collect_type == item["collect_type"],
).first()

# Flush pending changes to the database.
session.commit()