FastAPI
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| from fastapi import FastAPI, Form, File
import uvicorn
app = FastAPI()
@app.get("/")
async def root():
    """Handle GET / and return a plain greeting string."""
    # The original literal was missing its closing quote.
    return "Hello World"
# Sub-path with a path parameter: GET /items/{item_id}
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    """Return the entry of fake_items_db stored under item_id."""
    item = fake_items_db[item_id]
    return item
# Query parameters, e.g. GET /items?skip=1&limit=2
# NOTE(review): this handler shares the name read_item with the
# /items/{item_id} handler defined just before it.
@app.get("/items")
async def read_item(skip: int = 0, limit: int = 10):
    """Return fake_items_db[skip:skip + limit]; skip defaults to 0, limit to 10."""
    stop = skip + limit
    return fake_items_db[skip:stop]
# Form data: {text: a, language: b}
@app.post("/google-fy")
async def google(text=Form(...), language=Form(...)):
    """Handle a form-encoded POST with the required fields text and language."""
    # NOTE(review): the original snippet left the "code" value blank (a syntax
    # error) and never shows where `result` comes from. 0 is a stand-in status
    # code and `result` must be produced by logic omitted from the snippet.
    return {"code": 0, "data": result}


# JSON payload, async handler
@app.post('/tmall-login')
async def tmall_login(shop: dict):
    """Handle a POST whose body is received as the dict shop."""
    # `await` is only legal inside `async def`, so the handler is declared async.
    # NOTE(review): `conruti` is not defined in this snippet; it presumably
    # stands for the coroutine to await, so replace it with the real call.
    await conruti


# File upload
@app.post("/captcha")
def recognize_captcha(image: bytes = File(...)):
    """Pass the uploaded file bytes to muggle_captcha and return its output."""
    text = muggle_captcha(image)
    return {"result": text}
# Add a route prefix: serve sub_app under /items
app.mount("/items", sub_app)

if __name__ == '__main__':
    # Passing reload=True makes the server reload automatically after the code
    # is modified (DEBUG mode). A real comment replaces the original no-op
    # string literal that was used as one.
    uvicorn.run('文件名:app', host='0.0.0.0', port=8848, workers=2)
|
默认接口文档:http://127.0.0.1:8000/docs(8000 是 uvicorn 的默认端口;若像上例一样指定了 port=8848,则为 http://127.0.0.1:8848/docs)
分组路由
模块文件 xxx.py
1 2 3 4
router = APIRouter()


@router.post("/bind_cookie")
def bind_cookie(item: dict):
    """Handle POST "/bind_cookie" on this router; the body arrives as the dict item."""
    # The original snippet stops at the signature, which is a syntax error.
    # The placeholder keeps the module importable until the body is written.
    ...
|
主文件 app.py
1 2
# Create the application, then attach the routes declared on xxx.router,
# tagged "path".
app = FastAPI()
app.include_router(
    xxx.router,
    tags=["path"],
)
|
aiohttp
使用教程:https://blog.csdn.net/weixin_41173374/article/details/81516304
启动服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| from aiohttp import web
async def handle(request):
    """Greet the caller by the "name" path segment, or "Anonymous" if absent."""
    name = request.match_info.get("name", "Anonymous")
    return web.Response(text=f"Hello, {name}")


app = web.Application()
app.add_routes(
    [
        web.get('/', handle),
        web.get('/{name}', handle),
        # web.post() requires a path and a handler; the bare call in the
        # original raises TypeError when the module is loaded.
        # NOTE(review): '/' and handle are stand-ins; use the real POST route.
        web.post('/', handle),
    ]
)
if __name__ == "__main__":
    # Start serving the application only when the file is executed directly.
    web.run_app(app)
|
文件流上传 / 流读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import aiohttp import asyncio
async def file_sender(session, file_url):
    """Read the file in chunks.

    Async generator: sends a GET for file_url through session and yields the
    response body piece by piece, each piece at most 100 * 1024 bytes.
    """
    chunk_size = 100 * 1024
    async with session.get(file_url) as response:
        # iter_chunked stops on its own once the stream returns no more data.
        async for chunk in response.content.iter_chunked(chunk_size):
            yield chunk
async def main(upload_api, file_url):
    """Stream the file at file_url into a multipart POST to upload_api.

    Returns:
        The body of the upload response decoded as JSON.
    """
    timeout = aiohttp.ClientTimeout(total=600)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        data = aiohttp.FormData()
        data.add_field(
            name='file',
            # Async generator: the download is forwarded chunk by chunk rather
            # than being loaded into memory first.
            value=file_sender(session, file_url),
            filename='BBC南极洲',
            content_type='',
        )
        async with session.post(upload_api, data=data) as response:
            # The original parsed the JSON into an unused local and returned
            # nothing; hand the parsed result back to the caller instead.
            return await response.json()
if __name__ == "__main__":
    # asyncio.run creates the event loop, runs the coroutine to completion and
    # closes the loop. It replaces the deprecated
    # get_event_loop() + run_until_complete() pattern.
    # NOTE(review): upload_api and file_url are not defined in this snippet;
    # they must be set before this call.
    asyncio.run(main(upload_api, file_url))
|
请求多条url
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
async def main():
    """Run parse_imgs concurrently for every URL in url_lst over one shared session."""
    async with aiohttp.ClientSession() as session:
        pending = [parse_imgs(session, url) for url in url_lst]
        await asyncio.gather(*pending)
async def parse_imgs(session, art_url):
    """Fetch art_url, collect its image URLs and run save_img on all of them concurrently.

    Image URLs are the data-src attributes of <img> elements that are direct
    children of div#js_content.

    Raises:
        AssertionError: if the page yields 10 or fewer image URLs.
    """
    async with session.get(art_url) as response:
        dom = Selector(await response.text())

    img_urls = dom.css("div#js_content>img::attr(data-src)").getall()
    # An explicit raise keeps the check active under `python -O`, which strips
    # assert statements. The exception type is unchanged for existing callers.
    if len(img_urls) <= 10:
        raise AssertionError(
            f"expected more than 10 images at {art_url}, got {len(img_urls)}"
        )

    await asyncio.gather(*[save_img(session, url) for url in img_urls])
|