1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| from fastapi import FastAPI, Form, File
import uvicorn
app = FastAPI()
@app.get("/") async def root(): return "Hello World
# 子路径 @app.get("/items/{item_id}") async def read_item(item_id: int): return fake_items_db[item_id]
# 设置查询参数,GET /items?skip=1&limit=2 @app.get("/items") async def read_item(skip:int = 0, limit:int = 10): return fake_items_db[skip:skip + limit]
# Formdata: {text: a, language: b}"/google-fy") async def google(text=Form(...), language=Form(...)): return {"code": , "data": result} # Payload async'/tmall-login') def tmall_login(shop: dict): await conruti # File"/captcha") def recognize_captcha(image: bytes = File(...)): text = muggle_captcha(image) return {"result": text}
# 添加路由前缀 app.mount("/items", sub_app) if __name__ == '__main__': ''' reload=True, 修改代码后会自动加载, DEBUG模式 ''''文件名:app', host='', port=8848, workers=2)
1 2 3 4
| router = APIRouter()"/bind_cookie") def bind_cookie(item: dict):
1 2
| app = FastAPI() app.include_router(xxx.router, tags=["path"])
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| from aiohttp import web
async def handle(request): name = request.match_info.get("name", "Anonymous") text = "Hello, " + name return web.Response(text=text) app = web.Application() app.add_routes( [ web.get('/', handle), web.get('/{name}', handle), ] )
if __name__ == "__main__": web.run_app(app)
文件流上传 / 流读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import aiohttp import asyncio
async def file_sender(session, file_url): """ 读取文件块 """ async with session.get(file_url) as response: while True: chunk = await * 1024) if not chunk: break yield chunk
async def main(upload_api, file_url): timeout = aiohttp.ClientTimeout(total=600) async with aiohttp.ClientSession(timeout=timeout) as session: data = aiohttp.FormData() data.add_field( name='file', value=file_sender(session, file_url), filename='BBC南极洲', content_type='', ) async with, data=data) as response: result = await response.json()
if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main(upload_api, file_url))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| async def main(): async with aiohttp.ClientSession() as session: await asyncio.gather( *[parse_imgs(session, url) for url in url_lst] )
async def parse_imgs(session, art_url): async with session.get(art_url) as response: dom = Selector(await response.text()) img_urls = dom.css("div#js_content>img::attr(data-src)").getall() assert len(img_urls) > 10 await asyncio.gather( *[save_img(session, url) for url in img_urls] )