FastAPI

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from fastapi import FastAPI, Form, File

import uvicorn


app = FastAPI()

# 根路径
@app.get("/")
async def root():
return "Hello World

# 子路径
@app.get("/items/{item_id}")
async def read_item(item_id: int):
return fake_items_db[item_id]

# 设置查询参数,GET /items?skip=1&limit=2
@app.get("/items")
async def read_item(skip:int = 0, limit:int = 10):
return fake_items_db[skip:skip + limit]

# Formdata: {text: a, language: b}
@app.post("/google-fy")
async def google(text=Form(...), language=Form(...)):
return {"code": , "data": result}

# Payload
async @app.post('/tmall-login')
def tmall_login(shop: dict):
await conruti

# File
@app.post("/captcha")
def recognize_captcha(image: bytes = File(...)):
text = muggle_captcha(image)
return {"result": text}

# 添加路由前缀
app.mount("/items", sub_app)

if __name__ == '__main__':
'''
reload=True, 修改代码后会自动加载, DEBUG模式
'''
uvicorn.run('文件名:app', host='0.0.0.0', port=8848, workers=2)

默认接口文档:http://127.0.0.1:8000/docs

分组路由

模块文件 xxx.py

1
2
3
4
router = APIRouter()

@router.post("/bind_cookie")
def bind_cookie(item: dict):

主文件 app.py

1
2
app = FastAPI()
app.include_router(xxx.router, tags=["path"])

aiohttp

使用教程:https://blog.csdn.net/weixin_41173374/article/details/81516304

启动服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from aiohttp import web


async def handle(request):
name = request.match_info.get("name", "Anonymous")
text = "Hello, " + name
return web.Response(text=text)


app = web.Application()
app.add_routes(
[
web.get('/', handle),
web.get('/{name}', handle),
web.post()
]
)


if __name__ == "__main__":
# Running on http://0.0.0.0:8080
web.run_app(app)

文件流上传 / 流读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import aiohttp
import asyncio


async def file_sender(session, file_url):
"""
读取文件块
"""
async with session.get(file_url) as response:
while True:
chunk = await response.content.read(100 * 1024)
if not chunk:
break
yield chunk



async def main(upload_api, file_url):
timeout = aiohttp.ClientTimeout(total=600)
async with aiohttp.ClientSession(timeout=timeout) as session:
data = aiohttp.FormData()
data.add_field(
name='file',
value=file_sender(session, file_url), # value 可以是 bytes,也可以是 ioliu
filename='BBC南极洲',
content_type='',
)

async with session.post(upload_api, data=data) as response:
result = await response.json()


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(upload_api, file_url))

请求多条url

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def main():
async with aiohttp.ClientSession() as session:
await asyncio.gather(
*[parse_imgs(session, url) for url in url_lst]
)


async def parse_imgs(session, art_url):
async with session.get(art_url) as response:
dom = Selector(await response.text())
img_urls = dom.css("div#js_content>img::attr(data-src)").getall()
# 断言,如果长度小于10,终止程序
assert len(img_urls) > 10
await asyncio.gather(
*[save_img(session, url) for url in img_urls]
)