官方文档:https://miyakogi.github.io/pyppeteer/index.html?highlight=textcontent

Reference: API Reference — Pyppeteer 0.0.25 documentation

Pyppeteer RPC

核心 API

  • 执行js语句:page.evaluate
  • 设置拦截:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 和chrome浏览器不兼容
await page.setRequestInterception(True)
# 响应
self.page.on('response', lambda res: asyncio.ensure_future(self.intercept_response(res)))
# 请求(必须返回continue,否则请求全被拦截)
self.page.on('request', lambda req: asyncio.ensure_future(self.intercept_request(req)))
# 捕获控制台输出
self.page.on(
"console", lambda msg: asyncio.ensure_future(self.log_console(msg))
)

async def intercept_request(self, request):
    """Resolve one intercepted request by aborting, answering or continuing it.

    Each request is resolved exactly once: after ``abort()`` or ``respond()``
    the handler returns instead of also calling ``continue_()``.

    :param request: the intercepted request; this code reads ``method``,
        ``resourceType`` and ``url`` and calls ``abort``/``respond``/``continue_``.
    """
    if request.method == "POST":
        ...  # placeholder left by the author for POST-specific handling
    # Block stylesheets, media and images so they are never loaded.
    if request.resourceType in ("stylesheet", "media", "image"):
        await request.abort()
        return
    # Replace the target file with the contents of a local file.
    if request.url.startswith("xxx"):
        js_path = "xxx.js"
        with open(js_path, "rb") as f:
            content = f.read()
        await request.respond(response={"body": content})
        return
    # Everything else goes through untouched.
    await request.continue_()

async def intercept_response(self, response):
    """Parse the body of order search-list responses as JSON; ignore the rest.

    :param response: the response object; ``url`` is read and ``text()`` awaited.
    """
    if "/api/order/searchlist" not in response.url:
        return
    body = await response.text()
    # The parsed payload is not used further in this snippet.
    data = json.loads(body)

async def log_console(self, msg):
    """Print one browser console message as ``<type> >>> <text>``.

    ``msg.text`` is read as a plain attribute (a property in pyppeteer's
    ``ConsoleMessage``), so it is neither called nor awaited.

    :param msg: console message object exposing ``type`` and ``text``.
    """
    print(msg.type, ">>>", msg.text)

fetch ajax 数据

使用 Fetch - Web API 接口参考 | MDN

GET
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * GET a JSON document, passing `params` as the query string.
 *
 * The fetch result is awaited directly instead of being wrapped in a
 * hand-made Promise, so network and JSON-parse errors reject the returned
 * promise rather than leaving it pending forever.
 *
 * @returns {Promise<any>} the parsed JSON body
 */
async function getData() {
    const params = {
        "param1": "value1",
        "param2": "value2"
    };
    const queryString = new URLSearchParams(params).toString();
    const url = "https://www.baidu.com?" + queryString;
    const response = await fetch(url);
    return response.json();
}
Payload
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
result = await self.page.evaluate(
"""async () => {
async function postData() {
var _payload = {
"pageNum": 1,
"pageSize": 25,
"startTime": 1675440000,
"endTime": 1676044799,
"keywords": ""
}
var _url = "https://mms.pinduoduo.com/latitude/search/message/getMessagesUsers";
return new Promise(function (resolve, reject) {
fetch(_url, {
method: 'POST',
mode: 'cors',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(_payload)
})
.then(response => response.json())
.then(data => {
resolve(data);
});
})
};
var res=await postData()
return res
}"""
)

# 直接返回文本的话,then(response => response.text())
FormData

构造一个包含表单数据的对象,比如:

1
2
3
4
const formData = {
name: 'John',
email: 'john@example.com'
}

使用 URLSearchParams 类将对象编码为 urlencode 格式:

1
const params = new URLSearchParams(formData).toString()

在 fetch 请求中,设置请求头的 Content-Type 为 application/x-www-form-urlencoded

1
2
3
4
5
6
7
fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
body: params
})

XMLHttpRequest 主动请求,添加代理

1

Connect 连接浏览器实例

参考:https://medium.com/@jaredpotter1/connecting-puppeteer-to-existing-chrome-window-8a10828149e0

设置 --remote-debugging-port,查看地址:

1
http://127.0.0.1:9222/json/version

image.png

1
2
3
4
5
6
7
8
9
self.browser = await pyppeteer.connect(
{
"browserWSEndpoint": "ws://127.0.0.1:9222/devtools/browser/63ba1a24-c615-4448-9621-0a8c94f02318"
}
)
pages = await self.browser.pages()
self.page = pages[0]
# 清空控制台日志
await self.page.evaluate('console.clear()')

subprocess 启动无痕chrome

1
# Start Chrome in incognito mode with the remote-debugging port open.
# Arguments are passed as a list: a single command string without shell=True
# is treated as the program name on POSIX and fails to start.
subprocess.Popen(
    [
        "chrome",
        "--incognito",
        "--remote-debugging-port=9222",
        "http://127.0.0.1:9222/json/version",
    ]
)

获取跨域iframe

在初始化参数中,添加下面两项,记得使用默认的chromium

1
2
3
4
args: [
'--disable-web-security',
'--disable-features=IsolateOrigins,site-per-process'
]

Chrome文件替换的两种方式

Fiddler Auto Response

fiddler语法https://docs.telerik.com/fiddler/knowledge-base/fiddlerscript/modifyrequestorresponse

image.png

文件替换跨域问题

https://blog.csdn.net/fan13938409755/article/details/126416660

1
2
3
if (oSession.uriContains("https://mms-static.pddpic.com/main/_next/static/chunks/commons.a62cbe396db21ce5c8ce.js")) {
oSession.oResponse["Access-Control-Allow-Origin"] = "https://mms.pinduoduo.com"
}

Chrome Local Overrides

新建一个本地文件夹作为 workspace

image.png

保存目标资源到 Overrides,然后在本地文件夹中替换同名文件,或者直接修改保存

image.png

初始化

launch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
chrome_path = "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe"
args = [
"--start-maximized",
"--enable-automation",
"--disable-blink-features",
f"--proxy-server={self.proxy_server}"
]
auth_dt = {
"username": proxy_dt["account"],
"password": proxy_dt["password"],
}

browser = await launch(
headless=False,
userDataDir=self.user_dir,
args=args,
defaultViewport={"width": 1920, "height": 1080},
executablePath=chrome_path,
ignoreDefaultArgs=['--enable-automation'], # 隐藏正受...控制
devtools=True, # 开启调试面板
)
self.page = await self.browser.newPage()
# 使用代理
await self.page.authenticate(auth_dt)

launch 不指定 executablePath 的话,会自动下载 chromium:

chromium下载太慢:https://blog.csdn.net/qq_39377418/article/details/106984835

1
DEFAULT_DOWNLOAD_HOST = 'https://npm.taobao.org/mirrors'

直接下载对应版本的chromium,然后解压再指定executablePath

connect(连接本地browser服务)
1
2
3
4
5
6
7
opts = {
"browserWSEndpoint": 'ws://192.168.99.100:3000',
# 或者
# "browserURL": "http://127.0.0.1:9222",
}

browser = await connect(options=opt)

启动方式

1
2
3
4
import asyncio
asyncio.get_event_loop().run_until_complete(main())
# 或者
asyncio.run(main())

打开一个虚拟桌面(windows无法运行)

1
2
3
from pyvirtualdisplay import Display
display = Display(visible=0, size=(800, 800))
display.start()

创建新标签页

1
2
3
4
5
6
7
8
9
page = await browser.newPage()
await page.goto(url)

# 网页源码
content = await page.content()
# 截屏
screenshot = await page.screenshot()
# 关闭自动缓存
await self.page.setCacheEnabled(False)

无痕页面

1
2
browser_context = await self.browser.createIncognitoBrowserContext()
self.page = await browser_context.newPage()

隐藏webdriver

1
2
3
4
5
6
7
8
9
10
func_str = """
()=>{
Object.defineProperties(navigator, {
webdriver: {
get: ()=>false
}
})
}
"""
await page.evaluateOnNewDocument(pageFunction=func_str)

设置cookies

1
2
3
4
for ck in cookies:
await page.setCookie(ck)

cookies = await page.cookies()

goto参数

1
await self.page.goto("https://item.jd.com/{self.sku}.html", {"timeout": 10000})

timeout,请求超时

默认30s,可通过 page.setDefaultNavigationTimeout() 方法改变

waitUntil(https://microlink.io/docs/api/parameters/waitUntil)

  • load 全部事件加载完成(默认,某些情况下可能根本不会成功)
  • domcontentloaded DOM树加载完成(推荐使用这个)
  • networkidle0 当页面半秒没有网络活动时,它认为导航成功(某些情况下可能根本不会成功)
  • networkidle2 当页面的网络连接数**不超过 2 个**且持续至少半秒时,认为导航成功

加载时长对比:networkidle0 > networkidle2 > load > domcontentloaded

等待

1
2
3
4
5
6
7
8
9
10
11
12
13
# 毫秒
await page.waitFor(3000)

# 当链接跳转时
await asyncio.wait([
page.click('a.my-link'),
# page.goto(other_url),
page.waitForNavigation(),
])

await page.waitForRequest('http://example.com/resource')
await page.waitForResponse('http://example.com/resource')
await page.waitForSelector('a.cxs')

定位到 iframe(https://chercher.tech/puppeteer/iframes-puppeteer)

  1. ```python
    frameHandle = await page.querySelector(“div#captcha_container>ifra”)
    frame = await frameHandle.contentFrame()
    await frame.type(“input#fm-login-id”, info[“user”])
    await frame.click(“button[type=’submit’]”)
    1
    2
    3
    4
    5

    2. ```python
    frame = page.frames
    iframe = frame[1]
    await iframe.hover(slider_css)

stealth.min.js 指纹隐藏

1
2
3
4
5
6
7
8
9
with open("stealth.min.js", "r") as f:
js_code = f.read()

await page.evaluateOnNewDocument(
f"""
() => {{
{js_code}
}}
"""

效果很一般啊,连淘宝滑块都过不了??? 🤷‍♂️🤷‍♀️🤦‍♀️🤦‍♂️

获取标签内文本

1
2
input_tips = await page.querySelector("div.input-tips>div.input-error")
tips_info = await page.evaluate('(element) => element.textContent', input_tips)

模拟鼠标 / 键盘操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 鼠标
await page.mouse.down("left")
await page.mouse.up("left")
await page.mouse.click(x, y)
await page.mouse.move(x, y)

# 键盘
await page.keyboard.down("Control")
await page.keyboard.press("A")
await page.keyboard.up("Control")
await page.keyboard.press("Backspace")

await page.focus("input#q")
await page.type("input#q", kw, {"delay": random.randint(100, 151)}) # 随机yan
await page.click("button[type='submit']")

选择下拉框

适用于 <option value="my-value"> 的元素

1
await page.select('#telCountryInput', 'my-value')

阿里滑块实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
slider_css = "span#nc_1_n1z"
slider = await page.querySelector(slider_css)
sd_box = await slider.boundingBox()
cdn = {
"y": sd_box["y"] + sd_box["height"] / 2,
"x": sd_box["x"] + 500,
}

await page.hover(slider_css)
await page.mouse.down()
await page.mouse.move(**cdn, {"delay": random.randint(1000, 2000), "steps": 3}) # steps越大,滑动越慢
await page.mouse.up()
await page.waitFor(1000)

await page.click("div.form-btn")
await page.goto(apply_url)
await page.waitForSelector("input.uploadfile")

轨迹函数

from: https://blog.csdn.net/qq_43984282/article/details/123279475

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def slide_list(total_length):
    """Build a slider track that speeds up first and then slows down.

    Uses the uniform-acceleration formulas ``v = v0 + a*t`` and
    ``s = v0*t + a*t^2/2`` with a time unit of 1. Acceleration is 0.4 until
    3/5 of the distance has been covered and -0.5 afterwards.

    :param total_length: distance to cover
    :return: list of per-step displacements, each rounded to an integer
        (so their sum only approximates ``total_length``)
    """
    velocity = 0  # current speed, starts at rest
    dt = 1  # duration of one step; a larger value gives larger steps
    steps = []
    travelled = 0  # unrounded distance covered so far
    turning_point = total_length * 3 / 5  # start decelerating from here
    while travelled < total_length:
        # A smaller acceleration means smaller steps, hence a longer track.
        accel = 0.4 if travelled < turning_point else -0.5
        displacement = velocity * dt + 0.5 * accel * (dt ** 2)
        travelled += displacement
        steps.append(round(displacement))
        # Speed at the end of this step is the start speed of the next one.
        velocity = velocity + accel * dt
    return steps

In [5]: result = slide_list(500)
In [6]: print(result)
[0, 1, 1, 1, 2, 2, 3, 3, 3, 4, 4, 5, 5, 5, 6, 6, 7, 7, 7, 8, 8, 9, 9, 9, 10, 10, 11, 11, 11, 12, 12, 13, 13, 13, 14, 14, 15, 15, 15, 15, 15, 14, 14, 13, 13, 12, 12, 11, 11, 10, 10, 9, 9, 8, 8, 7, 7]

向下滚动

1
await page.evaluate('_ => {window.scrollBy(0, 10000);}')

清空文本框

1
2
await page.evaluate('document.getElementById("loginname").value=""')
await page.evaluate('document.getElementById("nloginpwd").value=""')

截获请求 / 响应

参考:https://juejin.cn/post/6844903842484584462

1
2
3
4
await page.setRequestInterception(True)

page.on('request', intercept_request_cb_func)
page.on('response', intercept_response_cb_func)

京东滑块轨迹算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def generate_tracks_one(position):
    """Build a step list from successive differences of a scaled logistic curve.

    For each integer x in -5..5 the step is the truncated difference between
    ``position / (1 + e^(-x-1))`` and ``position / (1 + e^(-x))``; a step that
    truncates to 0 becomes 1. The fixed steps 2, 1, 1 are appended at the end.

    :param position: scale factor applied to the logistic curve
    :return: list of integer steps
    """
    def _scaled(exponent):
        return position / (1 + math.exp(exponent))

    steps = []
    for x in range(-5, 6):
        delta = _scaled((-x - 1)) - _scaled(-x)
        # int() truncates toward zero; a zero step is replaced by 1.
        steps.append(int(delta) or 1)
    steps.extend((2, 1, 1))
    return steps

抖音滑块轨迹算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def get_track7(distance):
    """Generate a randomised slider track for the given offset.

    Accelerates with a random acceleration in [2, 5] until 4/5 of the distance
    is covered, then decelerates with a random value in [12.5, 13.5], using a
    0.2 time step. When a step overshoots the target by certain amounts, the
    recorded step is shortened by a fixed correction. The sum of the track is
    printed before returning.

    :param distance: offset to cover
    :return: list of per-step displacements rounded to 2 decimals
    """
    tracks = []
    travelled = 0  # unrounded position, never corrected by the adjustments below
    decel_after = distance * 4 / 5  # deceleration threshold
    dt = 0.2  # time interval
    speed = 0  # initial velocity

    while travelled < distance:
        if travelled < decel_after:
            accel = random.uniform(2, 5)
        else:
            accel = -(random.uniform(12.5, 13.5))
        # Displacement uses the speed at the start of the interval.
        step = speed * dt + 1 / 2 * accel * dt * dt
        speed = speed + accel * dt
        travelled += step

        # Shorten the recorded step depending on how far past the target we are.
        overshoot = travelled - distance
        if 0.6 < overshoot < 1:
            step = step - 0.53
        elif 1 < overshoot < 1.5:
            step = step - 1.4
        elif 1.5 < overshoot < 3:
            step = step - 1.8
        tracks.append(round(step, 2))

    print(sum(tracks))
    return tracks

京东滑块模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def get_distance(big_img, small_img):
    """Estimate the slide distance by template-matching the piece on the background.

    :param big_img: path of the background image
    :param small_img: path of the puzzle-piece image
    :return: x coordinate of the match location scaled by 278 / 360
    """
    background = cv2.imread(big_img, 0)  # read as grayscale
    piece = cv2.imread(small_img, 0)  # read as grayscale
    scores = cv2.matchTemplate(background, piece, cv2.TM_CCOEFF_NORMED)
    # Element 2 of the minMaxLoc result is used, i.e. the location of the minimum.
    # NOTE(review): TM_CCOEFF_NORMED normally scores the best match at the
    # maximum; confirm that reading the minimum location is intended.
    _min_val, _max_val, min_loc, _max_loc = cv2.minMaxLoc(scores)
    # NOTE(review): 278 / 360 is presumably displayed width / natural width - confirm.
    return min_loc[0] * 278 / 360

async def move_slide(self, distance):
    """Drag the JD slider button: overshoot the target, pause, then settle.

    :param distance: horizontal offset to slide, in the page's pixel units
    """
    page = self.page
    button_css = "div.JDJRV-slide-btn"
    handle = await page.querySelector(button_css)
    rect = await handle.boundingBox()
    await page.hover(button_css)
    await page.mouse.down()
    # First move goes a random 30-33 px past the distance.
    overshoot_x = rect["x"] + distance + random.uniform(30, 33)
    await page.mouse.move(overshoot_x, rect["y"], {"steps": 30})
    await page.waitFor(random.randint(300, 700))
    # Second move settles at distance + 27 px.
    settle_x = rect["x"] + distance + 27
    await page.mouse.move(settle_x, rect["y"], {"steps": 30})
    await page.mouse.up()
    await page.waitFor(2500)

async def down_img(self):
    """Download the captcha's big and small images and return their local paths.

    Reads ``self.page`` and ``self.img_pre``; files are written under
    ``./jd_slide_img/``. ``request`` is presumably ``urllib.request`` - verify
    against the file's imports.

    :return: tuple ``(big_img_name, small_img_name)``
    """
    saved = []
    targets = (
        ("div.JDJRV-bigimg>img", "big"),
        ("div.JDJRV-smallimg>img", "small"),
    )
    for css, suffix in targets:
        node = await self.page.querySelector(css)
        src = await self.page.evaluate("(element) => element.src", node)
        local_name = f"./jd_slide_img/{self.img_pre}_{suffix}.png"
        request.urlretrieve(src, local_name)
        saved.append(local_name)
    big_img_name, small_img_name = saved
    return big_img_name, small_img_name

抖音滑块拼图

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from PIL import Image

# Re-assemble a shuffled captcha image: cut it into 6 vertical strips and
# paste them back in the order given below.
img = Image.open("cxs.png")

width, height = img.size

# Split horizontally into 6 strips; the last one takes any remainder pixels.
block_width = width // 6
edges = [k * block_width for k in range(6)] + [width]
strips = [
    img.crop((left, 0, right, height))
    for left, right in zip(edges[:-1], edges[1:])
]

# Blank canvas of the original size.
new_img = Image.new("RGB", (width, height))

# Assembly order, as 1-based strip numbers from left to right.
order = [5, 1, 4, 6, 3, 2]

# Strips are kept in a list and indexed directly instead of being looked up
# by name through locals().
for slot, strip_no in enumerate(order):
    new_img.paste(strips[strip_no - 1], (slot * block_width, 0))

new_img.save("new.jpg")

特征隐藏 pyppeteer_stealth

https://github.com/MeiK2333/pyppeteer_stealth