FastAPI
基本使用
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 
 | from fastapi import FastAPI, Form, File
 import uvicorn
 
 
 app = FastAPI()
 app = FastAPI(docs_url=None, redoc_url=None)
 
 
 
 @app.get("/")
 async def root():
 return  "Hello World
 
 # 子路径
 @app.get("/items/{item_id}")
 async def read_item(item_id: int):
 return fake_items_db[item_id]
 
 # 设置查询参数,GET /items?skip=1&limit=2
 @app.get("/items")
 async def read_item(skip:int = 0, limit:int = 10):
 return fake_items_db[skip:skip + limit]
 
 # Formdata: {text: a, language: b}
 @app.post("/google-fy")
 async def google(text=Form(...), language=Form(...)):
 return {"code": , "data": result}
 
 # Payload
 async @app.post('/tmall-login')
 def tmall_login(shop: dict):
 await conruti
 
 # File
 @app.post("/captcha")
 def recognize_captcha(image: bytes = File(...)):
 text = muggle_captcha(image)
 return {"result": text}
 
 # 添加路由前缀
 app.mount("/items", sub_app)
 
 if __name__ == '__main__':
 '''
 reload=True, 修改代码后会自动加载, DEBUG模式
 '''
 workers = 2
 uvicorn.run(
 app='文件名:app',
 host='0.0.0.0',
 port=8848,
 workers=workers,
 limit_concurrency=workers  # 限制1个并发,1个是给主线程的
 )
 
 | 
默认接口文档:http://127.0.0.1:8000/docs
分组路由
模块文件 xxx.py
| 12
 3
 4
 
 | router = APIRouter()
 @router.post("/bind_cookie")
 def bind_cookie(item: dict):
 
 | 
主文件 app.py
| 12
 
 | app = FastAPI()app.include_router(xxx.router, tags=["path"])
 
 | 
aiohttp
使用教程:https://blog.csdn.net/weixin_41173374/article/details/81516304
启动服务端
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 
 | from aiohttp import web
 
 async def handle(request):
 name = request.match_info.get("name", "Anonymous")
 text = "Hello, " + name
 return web.Response(text=text)
 
 
 app = web.Application()
 app.add_routes(
 [
 web.get('/', handle),
 web.get('/{name}', handle),
 web.post()
 ]
 )
 
 
 if __name__ == "__main__":
 
 web.run_app(app)
 
 | 
文件流上传 / 流读取
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 
 | import aiohttpimport asyncio
 
 
 async def file_sender(session, file_url):
 """
 读取文件块
 """
 async with session.get(file_url) as response:
 while True:
 chunk = await response.content.read(100 * 1024)
 if not chunk:
 break
 yield chunk
 
 
 
 async def main(upload_api, file_url):
 timeout = aiohttp.ClientTimeout(total=600)
 async with aiohttp.ClientSession(timeout=timeout) as session:
 data = aiohttp.FormData()
 data.add_field(
 name='file',
 value=file_sender(session, file_url),
 filename='BBC南极洲',
 content_type='',
 )
 
 async with session.post(upload_api, data=data) as response:
 result = await response.json()
 
 
 if __name__ == "__main__":
 loop = asyncio.get_event_loop()
 loop.run_until_complete(main(upload_api, file_url))
 
 
 | 
请求多条url
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | async def main():async with aiohttp.ClientSession() as session:
 await asyncio.gather(
 *[parse_imgs(session, url) for url in url_lst]
 )
 
 
 async def parse_imgs(session, art_url):
 async with session.get(art_url) as response:
 dom = Selector(await response.text())
 img_urls = dom.css("div#js_content>img::attr(data-src)").getall()
 
 assert len(img_urls) > 10
 await asyncio.gather(
 *[save_img(session, url) for url in img_urls]
 )
 
 |