Reference: API Reference — Pyppeteer 0.0.25 documentation
Pyppeteer RPC 核心 API
执行js语句:page.evaluate
设置拦截:
async def enable_interception(self):
    """Turn on request interception and register the page event handlers.

    The handlers below are coroutines, so each event callback schedules them
    with ``asyncio.ensure_future``.
    """
    await self.page.setRequestInterception(True)
    self.page.on('response', lambda res: asyncio.ensure_future(self.intercept_response(res)))
    self.page.on('request', lambda req: asyncio.ensure_future(self.intercept_request(req)))
    self.page.on("console", lambda msg: asyncio.ensure_future(self.log_console(msg)))


async def intercept_request(self, request):
    """Abort, mock or forward one intercepted request.

    Exactly one of ``abort`` / ``respond`` / ``continue_`` is called per
    request; each branch returns so an already-handled request is not
    continued a second time.
    """
    if request.method == "POST":
        ...  # placeholder for POST-specific handling
    if request.resourceType in ["stylesheet", "media", "image"]:
        await request.abort()
        return
    if request.url.startswith("xxx"):
        js_path = "xxx.js"
        # Serve a local file in place of the remote script.
        with open(js_path, "rb") as js_file:
            content = js_file.read()
        await request.respond(response={"body": content})
        return
    await request.continue_()


async def intercept_response(self, response):
    """Parse the JSON body of the order-search API response.

    Returns the decoded payload for matching URLs, otherwise ``None``.
    """
    if "/api/order/searchlist" in response.url:
        data = json.loads(await response.text())
        return data
    return None


async def log_console(self, msg):
    """Print a browser console message as ``<type> >>> <text>``."""
    # ``type`` and ``text`` are read as plain attributes, not awaited.
    print(msg.type, ">>>", msg.text)
fetch ajax 数据 使用 Fetch - Web API 接口参考 | MDN
// GET
async function getData() {
    var params = {
        "param1": "value1",
        "param2": "value2"
    };
    var queryString = new URLSearchParams(params).toString();
    var _url = "https://www.baidu.com?" + queryString;
    // fetch() already returns a promise. Returning it directly, instead of
    // wrapping it in a `new Promise` that only ever resolves, lets network
    // and JSON-parse failures reject so the caller is never left pending.
    return fetch(_url).then(response => response.json());
};
Payload 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 result = await self.page.evaluate( """async () => { async function postData() { var _payload = { "pageNum": 1, "pageSize": 25, "startTime": 1675440000, "endTime": 1676044799, "keywords": "" } var _url = "https://mms.pinduoduo.com/latitude/search/message/getMessagesUsers"; return new Promise(function (resolve, reject) { fetch(_url, { method: 'POST', mode: 'cors', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(_payload) }) .then(response => response.json()) .then(data => { resolve(data); }); }) }; var res=await postData() return res }""" )
构造一个包含表单数据的对象,比如:
const formData = {
    name: 'John',
    email: 'john@example.com'
}
使用 URLSearchParams 类将对象编码为 urlencode 格式:
const params = new URLSearchParams(formData).toString()
在 fetch 请求中,设置请求头的 Content-Type 为 application/x-www-form-urlencoded
fetch(url, {
    method: 'POST',
    headers: {
        'Content-Type': 'application/x-www-form-urlencoded'
    },
    body: params
})
XMLHttpRequest 主动请求,添加代理
Connect 连接浏览器实例 参考:https://medium.com/@jaredpotter1/connecting-puppeteer-to-existing-chrome-window-8a10828149e0
启动 Chrome 时设置 --remote-debugging-port=9222,然后访问下面的地址查看 webSocketDebuggerUrl:
http://127.0.0.1:9222/json/version
1 2 3 4 5 6 7 8 9 self.browser = await pyppeteer.connect( { "browserWSEndpoint" : "ws://127.0.0.1:9222/devtools/browser/63ba1a24-c615-4448-9621-0a8c94f02318" } ) pages = await self.browser.pages() self.page = pages[0 ] await self.page.evaluate('console.clear()' )
# subprocess 启动无痕chrome (start an incognito Chrome via subprocess)
# The command is passed as an argument list: a single command string without
# shell=True is treated as the program name on POSIX and fails to start.
subprocess.Popen(
    [
        "chrome",
        "--incognito",
        "--remote-debugging-port=9222",
        "http://127.0.0.1:9222/json/version",
    ]
)
获取跨域iframe 在初始化参数中,添加下面两项,记得使用默认的chromium
# Extra browser flags for reaching into cross-origin iframes.
args = [
    '--disable-web-security',
    '--disable-features=IsolateOrigins,site-per-process',
]
Chrome文件替换的两种方式 Fiddler Auto Response fiddler语法 :https://docs.telerik.com/fiddler/knowledge-base/fiddlerscript/modifyrequestorresponse
文件替换跨域问题
https://blog.csdn.net/fan13938409755/article/details/126416660
// Add the CORS header to the replaced static file so the page origin may load it.
if (oSession.uriContains("https://mms-static.pddpic.com/main/_next/static/chunks/commons.a62cbe396db21ce5c8ce.js")) {
    oSession.oResponse["Access-Control-Allow-Origin"] = "https://mms.pinduoduo.com"
}
Chrome Local Overrides 新建一个本地文件夹作为 workspace
保存目标资源到 Overrides,然后在本地文件夹中替换同名文件,或者直接修改保存
初始化 launch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 chrome_path = "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe" args = [ "--start-maximized" , "--enable-automation" , "--disable-blink-features" , f"--proxy-server={self.proxy_server} " ] auth_dt = { "username" : proxy_dt["account" ], "password" : proxy_dt["password" ], } browser = await launch( headless=False , userDataDir=self.user_dir, args=args, defaultViewport={"width" : 1920 , "height" : 1080 }, executablePath=chrome_path, ignoreDefaultArgs=['--enable-automation' ], devtools=True , ) self.page = await self.browser.newPage() await self.page.authenticate(auth_dt)
launch 不指定 executablePath 的话,会自动下载 chromium:
chromium下载太慢:https://blog.csdn.net/qq_39377418/article/details/106984835
# Mirror host used for the Chromium download.
# NOTE(review): confirm this mirror URL is still reachable before relying on it.
DEFAULT_DOWNLOAD_HOST = 'https://npm.taobao.org/mirrors'
直接下载对应版本的chromium,然后解压再指定executablePath
connect(连接本地browser服务) 1 2 3 4 5 6 7 opts = { "browserWSEndpoint" : 'ws://192.168.99.100:3000' , } browser = await connect(options=opt)
# 启动方式 (how to start the coroutine) -- use ONE of the two forms below.
import asyncio

# Form 1: drive the coroutine through an explicit event loop.
asyncio.get_event_loop().run_until_complete(main())

# Form 2: asyncio.run (Python 3.7+).
asyncio.run(main())
# 打开一个虚拟桌面(windows无法运行) (open a virtual display; does not run on Windows)
from pyvirtualdisplay import Display

display = Display(visible=0, size=(800, 800))
display.start()
创建新标签页 1 2 3 4 5 6 7 8 9 page = await browser.newPage() await page.goto(url)content = await page.content() screenshot = await page.screenshot() await self.page.setCacheEnabled(False )
无痕页面 1 2 browser_context = await self.browser.createIncognitoBrowserContext() self.page = await browser_context.newPage()
隐藏webdriver 1 2 3 4 5 6 7 8 9 10 func_str = """ ()=>{ Object.defineProperties(navigator, { webdriver: { get: ()=>false } }) } """ await page.evaluateOnNewDocument(pageFunction=func_str)
设置cookies 1 2 3 4 for ck in cookies: await page.setCookie(ck) cookies = await page.cookies()
goto参数 1 await self.page.goto("https://item.jd.com/{self.sku}.html" , {"timeout" : 10000 })
timeout:导航超时时间(毫秒),默认 30s,可通过 page.setDefaultNavigationTimeout() 方法修改
waitUntil (https://microlink.io/docs/api/parameters/waitUntil)
load
全部事件加载完成(默认,某些情况下可能根本不会成功)
domcontentloaded
DOM树加载完成(推荐使用这个 )
networkidle0
当页面半秒没有网络活动时,它认为导航成功(某些情况下可能根本不会成功)
networkidle2
当页面在至少 500ms 内的网络连接数**<=2个**时,认为导航成功
加载时长对比:networkidle0 > networkidle2 > load > domcontentloaded
等待 1 2 3 4 5 6 7 8 9 10 11 12 13 await page.waitFor(3000 )await asyncio.wait([ page.click('a.my-link' ), page.waitForNavigation(), ]) await page.waitForRequest('http://example.com/resource' )await page.waitForResponse('http://example.com/resource' )await page.waitForSelector('a.cxs' )
```python frameHandle = await page.querySelector(“div#captcha_container>ifra”) frame = await frameHandle.contentFrame() await frame.type(“input#fm-login-id”, info[“user”]) await frame.click(“button[type=’submit’]”)1 2 3 4 5 2. ```python frame = page.frames iframe = frame[1] await iframe.hover(slider_css)
stealth.min.js 指纹隐藏 1 2 3 4 5 6 7 8 9 with open ("stealth.min.js" , "r" ) as f: js_code = f.read() await page.evaluateOnNewDocument( f""" () => {{ {js_code} }} """
效果很一般,连淘宝滑块都过不了。🤷
获取标签内文本 1 2 input_tips = await page.querySelector("div.input-tips>div.input-error" ) tips_info = await page.evaluate('(element) => element.textContent' , input_tips)
模拟鼠标 / 键盘操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 await page.mouse.down("left" )await page.mouse.up("left" )await page.mouse.click(x, y)await page.mouse.move(x, y)await page.keyboard.down("Control" )await page.keyboard.press("A" )await page.keyboard.up("Control" )await page.keyboard.press("Backspace" )await page.focus("input#q" )await page.type ("input#q" , kw, {"delay" : random.randint(100 , 151 )}) await page.click("button[type='submit']" )
选择下拉框
适用于 <option value="my-value">
的元素
1 await page.select('#telCountryInput' , 'my-value' )
阿里滑块实战 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 slider_css = "span#nc_1_n1z" slider = await page.querySelector(slider_css) sd_box = await slider.boundingBox() cdn = { "y" : sd_box["y" ] + sd_box["height" ] / 2 , "x" : sd_box["x" ] + 500 , } await page.hover(slider_css)await page.mouse.down()await page.mouse.move(**cdn, {"delay" : random.randint(1000 , 2000 ), "steps" : 3 }) await page.mouse.up()await page.waitFor(1000 )await page.click("div.form-btn" )await page.goto(apply_url)await page.waitForSelector("input.uploadfile" )
轨迹函数 from: https://blog.csdn.net/qq_43984282/article/details/123279475
def slide_list(total_length):
    """Build a human-like drag track: uniform acceleration, then deceleration.

    Uniformly accelerated motion formulas used:
        (1) v = v0 + a*t
        (2) s = v0*t + 1/2*a*t^2
        (3) v^2 - v0^2 = 2*a*s

    :param total_length: distance that has to be covered
    :return: list of integer step distances, one per time slice; the steps
        sum to ``round(total_length)``
    """
    v = 0
    t = 1
    slide_result = []
    current = 0
    # Accelerate over the first 3/5 of the distance, then brake.
    mid = total_length * 3 / 5
    while current < total_length:
        a = 0.4 if current < mid else -0.5
        v0 = v
        s = v0 * t + 0.5 * a * (t ** 2)
        current += s
        slide_result.append(round(s))
        v = v0 + a * t
    if slide_result:
        # The loop overshoots the target and every step is rounded, so the
        # raw steps do not add up to the requested distance (502 for 500).
        # Fold the difference into the final step so the drag lands exactly.
        slide_result[-1] += round(total_length) - sum(slide_result)
    return slide_result


# Example session:
# In [5]: result = slide_list(500)
# In [6]: print(result)
# [0, 1, 1, 1, 2, 2, 3, 3, 3, 4, 4, 5, 5, 5, 6, 6, 7, 7, 7, 8, 8, 9, 9, 9,
#  10, 10, 11, 11, 11, 12, 12, 13, 13, 13, 14, 14, 15, 15, 15, 15, 15, 14,
#  14, 13, 13, 12, 12, 11, 11, 10, 10, 9, 9, 8, 8, 7, 5]
向下滚动 1 await page.evaluate('_ => {window.scrollBy(0, 10000);}' )
清空文本框 1 2 await page.evaluate('document.getElementById("loginname").value=""' )await page.evaluate('document.getElementById("nloginpwd").value=""' )
截获请求 / 响应 参考:https://juejin.cn/post/6844903842484584462
1 2 3 4 await page.setRequestInterception(True )page.on('request' , intercept_request_cb_func) page.on('response' , intercept_response_cb_func)
# 京东滑块轨迹算法 (JD slider track algorithm)
def generate_tracks_one(position):
    """Return a list of integer step sizes shaped like a logistic curve.

    For x in -5..5 the step is the truncated difference between two
    consecutive points of ``position / (1 + e^-x)``; a step that truncates
    to 0 is replaced by 1. Three fixed trailing steps (2, 1, 1) are appended.

    :param position: value that scales the curve
    :return: list of integer step sizes
    """
    position_list = []
    for x in range(-5, 6):
        s1 = position / (1 + math.exp(-x))
        s2 = position / (1 + math.exp(-x - 1))
        step = int(s2 - s1)
        # Never emit a zero-length step.
        position_list.append(step if step != 0 else 1)
    position_list.extend([2, 1, 1])
    return position_list
# 抖音滑块轨迹算法 (Douyin slider track algorithm)
def get_track7(distance):
    """Compute a randomised drag track for the given offset.

    Accelerates with a random rate over the first 4/5 of the distance, then
    brakes hard. Prints the sum of the generated steps before returning.

    :param distance: offset to cover
    :return: list of step distances, each rounded to 2 decimals
    """
    tracks = []
    current = 0
    mid = distance * 4 / 5
    t = 0.2
    v = 0
    while current < distance:
        if current < mid:
            a = random.uniform(2, 5)
        else:
            a = -(random.uniform(12.5, 13.5))
        v0 = v
        v = v0 + a * t
        x = v0 * t + 1 / 2 * a * t * t
        current += x
        # Trim the step by a fixed amount that depends on the overshoot band.
        # The bounds are strict, so an overshoot of exactly 1 or 1.5 is left
        # untouched.
        overshoot = current - distance
        if 0.6 < overshoot < 1:
            x = x - 0.53
        elif 1 < overshoot < 1.5:
            x = x - 1.4
        elif 1.5 < overshoot < 3:
            x = x - 1.8
        tracks.append(round(x, 2))
    print(sum(tracks))
    return tracks
# 京东滑块模拟 (JD slider simulation)
def get_distance(big_img, small_img):
    """Template-match the small image in the big one and return a drag distance.

    Both files are read as grayscale and matched with
    ``cv2.TM_CCOEFF_NORMED``; the x coordinate taken from ``minMaxLoc`` is
    scaled by 278 / 360.
    """
    img = cv2.imread(big_img, 0)
    template = cv2.imread(small_img, 0)
    res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED)
    # NOTE(review): minMaxLoc returns (min_val, max_val, min_loc, max_loc), so
    # index 2 is the *minimum* location. With TM_CCOEFF_NORMED the best match
    # is normally max_loc (index 3) -- confirm against real captcha images.
    value = cv2.minMaxLoc(res)[2][0]
    # presumably rendered width / source image width -- TODO confirm
    distance = value * 278 / 360
    return distance


async def move_slide(self, distance):
    """Drag the slider button: overshoot slightly, pause, then settle."""
    el = await self.page.querySelector("div.JDJRV-slide-btn")
    box = await el.boundingBox()
    await self.page.hover("div.JDJRV-slide-btn")
    await self.page.mouse.down()
    # First pass moves 30-33px past the distance.
    await self.page.mouse.move(
        box["x"] + distance + random.uniform(30, 33), box["y"], {"steps": 30}
    )
    await self.page.waitFor(random.randint(300, 700))
    # Second pass settles at distance + 27px.
    await self.page.mouse.move(box["x"] + distance + 27, box["y"], {"steps": 30})
    await self.page.mouse.up()
    await self.page.waitFor(2500)


async def down_img(self):
    """Download the big and small captcha images; return both local paths."""
    big_ele = await self.page.querySelector("div.JDJRV-bigimg>img")
    big_img_src = await self.page.evaluate("(element) => element.src", big_ele)
    big_img_name = f"./jd_slide_img/{self.img_pre} _big.png"
    # `request` is presumably urllib.request -- verify against the file's imports.
    request.urlretrieve(big_img_src, big_img_name)

    small_ele = await self.page.querySelector("div.JDJRV-smallimg>img")
    small_img_src = await self.page.evaluate("(element) => element.src", small_ele)
    small_img_name = f"./jd_slide_img/{self.img_pre} _small.png"
    request.urlretrieve(small_img_src, small_img_name)
    return big_img_name, small_img_name
抖音滑块拼图
from PIL import Image

img = Image.open("cxs.png")
width, height = img.size
block_width = width // 6

# Cut the image into six vertical strips; the last strip runs to the right
# edge so it takes any leftover pixels when width is not a multiple of 6.
blocks = [
    img.crop((
        index * block_width,
        0,
        width if index == 5 else (index + 1) * block_width,
        height,
    ))
    for index in range(6)
]

new_img = Image.new("RGB", (width, height))

# order[i] is the 1-based number of the strip that goes into slot i.
order = [5, 1, 4, 6, 3, 2]
for slot, block_no in enumerate(order):
    # Strips are kept in a list rather than looked up by name via locals().
    new_img.paste(blocks[block_no - 1], (slot * block_width, 0))

new_img.save("new.jpg")
特征隐藏 pyppeteer_stealth https://github.com/MeiK2333/pyppeteer_stealth