Initial configuration

download_delay = 20  # per-spider download delay
custom_settings = {
    "HTTPERROR_ALLOWED_CODES": [404],  # let 404 responses reach the callback
    "COOKIES_ENABLED": True,
    "DOWNLOAD_DELAY": 5,
    "DOWNLOAD_TIMEOUT": 5,
    "REFERER_ENABLED": False,  # disable the automatic Referer header
    "REDIRECT_ENABLED": False,  # disable redirects
    "RETRY_HTTP_CODES": [429, 401, 403, 408, 414, 500, 502, 503, 504],  # HTTP codes to retry
    "DEFAULT_REQUEST_HEADERS": {
        ......
        "cookie": "...",  # only takes effect if COOKIES_ENABLED is set to False
    },
}
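
For context, a minimal sketch of where these overrides live, assuming an ordinary Spider subclass (the spider name and URL below are placeholders):

import scrapy


class ExampleSpider(scrapy.Spider):  # hypothetical spider
    name = "example"
    download_delay = 20          # per-spider delay attribute
    custom_settings = {          # per-spider setting overrides
        "DOWNLOAD_DELAY": 5,
        "RETRY_HTTP_CODES": [429, 500, 502, 503, 504],
    }

    def start_requests(self):
        yield scrapy.Request("https://example.com", callback=self.parse)

    def parse(self, response):
        pass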

Debugging

from scrapy import spiderloader
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl(TmallSpider)
process.start()


# Debug by spider name
spider_loader = spiderloader.SpiderLoader.from_settings(get_project_settings())
Spider = spider_loader.load("spider_name")
process.crawl(Spider)
# List all registered spiders
spider_names = spider_loader.list()
for name in spider_names:
    ...

# Multi-process debugging (the Twisted reactor cannot be restarted within one
# process, so each run gets its own process)
def main():
    process = CrawlerProcess(get_project_settings())
    process.crawl(TmallSpider)
    process.start()


if __name__ == "__main__":
    import multiprocessing

    for _ in range(5):
        p = multiprocessing.Process(target=main)
        p.start()

Passing arguments

class DouyinSpider(BaseSpider):
    name = "t_spider_douyin_strategy_market_brand"

    def __init__(self, crawl_type="day", start_day=None, end_day=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_day = start_day
        self.end_day = end_day
        self.crawl_type = crawl_type

# How to invoke it
process.crawl(
    DouyinSpider, crawl_type="month", start_day="2024-05-01", end_day="2024-06-30"
)
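
The arguments become attributes on the spider instance; a hedged sketch of consuming them inside the spider, where the endpoint is a placeholder:

def start_requests(self):
    # crawl_type / start_day / end_day were set in __init__ above
    url = (
        "https://example.com/api"  # placeholder endpoint
        f"?type={self.crawl_type}&start={self.start_day}&end={self.end_day}"
    )
    yield scrapy.Request(url, callback=self.parse)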

Running

scrapy crawl spider_name
scrapy runspider spider.py
scrapy crawl t_spider_douyin_strategy_market_brand -a crawl_type="day" -a start_day="2024-07-07" -a end_day="2024-07-09"

params

from w3lib.url import add_or_replace_parameters

search_url = add_or_replace_parameters(self.web_api, params)
yield scrapy.Request(search_url)
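
As an illustration with made-up values, add_or_replace_parameters merges the dict into the query string and overwrites keys that already exist:

from w3lib.url import add_or_replace_parameters

url = "https://example.com/search?page=1"   # placeholder URL
params = {"page": "2", "keyword": "phone"}  # placeholder params
print(add_or_replace_parameters(url, params))
# roughly: https://example.com/search?page=2&keyword=phone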

cookiejar: used to keep multiple sessions separate

yield scrapy.Request(url, meta={"cookiejar": uuid4()})  # from uuid import uuid4
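
To keep follow-up requests inside the same session, keep forwarding the same cookiejar key; a minimal sketch where next_url and parse_detail are placeholders:

def parse(self, response):
    yield scrapy.Request(
        next_url,  # placeholder
        meta={"cookiejar": response.meta["cookiejar"]},  # stay in the same session
        callback=self.parse_detail,  # placeholder callback
    )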

JsonRequest

yield JsonRequest(self.page_url, data=self.data)
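
JsonRequest JSON-encodes data into the request body, sets the Content-Type header to application/json, and defaults to POST when data is given; a minimal sketch with a made-up payload:

from scrapy.http import JsonRequest

payload = {"page": 1, "size": 20}  # placeholder payload
yield JsonRequest("https://example.com/api", data=payload, callback=self.parse_page)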
Cookies

  • Method 1: set the cookie in custom_settings (see the initial configuration above)
  • Method 2: pass a cookies dict per request:
# Parse a raw cookie string into a dict
ck_jar = {}
for d in cookie.split(";"):
    if d:
        name, value = d.split("=", maxsplit=1)
        ck_jar[name.strip()] = value

yield scrapy.Request(url, cookies=ck_jar)

Reading cookies out of the response headers; get(key) and getlist(key) can also be used to fetch other header values.

ck_lst = response.headers.getlist("Set-Cookie")

# Parse a raw Cookie header string into a dict
ck_dct = {}
for ck in cookie.split(";"):
    if ck:
        name, value = ck.split("=", 1)
        ck_dct[name] = value


# Turn a list of Set-Cookie header values into a dict
def get_cookie_dict(ck_lst):
    cookies = {}
    for ck in ck_lst:
        a = ck.decode()
        v = a.split(";")[0]   # keep only the leading name=value part
        k = v.split("=", 1)   # split on the first "=" only
        cookies[k[0]] = k[1]
    return cookies
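
A usage sketch of the helper above, reusing the parsed cookies on a follow-up request (next_url is a placeholder):

cookies = get_cookie_dict(response.headers.getlist("Set-Cookie"))
yield scrapy.Request(next_url, cookies=cookies)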

Making requests from a middleware

This approach no longer works; just use a Request directly instead.

# No longer works
req = FormRequest(
    self.api, dont_filter=True, formdata=data, callback=spider.cb_fun,
)
yield spider.crawler.engine.crawl(req, spider=spider)
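
A hedged sketch of the "use a Request directly" alternative: a downloader middleware's process_response may return a Request object, which Scrapy re-queues through the scheduler (the middleware name and the 401 check are illustrative):

class RetryOnAuthMiddleware:  # hypothetical middleware
    def process_response(self, request, response, spider):
        if response.status == 401:
            # returning a Request from process_response makes Scrapy reschedule it
            return request.replace(dont_filter=True)
        return response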

File upload

from requests.models import RequestEncodingMixin

data = {
    "token": "4l10x2T4B/4=",
    "bucket": "common",
    "filekey": pdf_name,
    "mime_type": "application/pdf",
}
files = {"file": (pdf_name, pdf_body, "application/pdf")}
# Borrow requests' multipart encoder to build the body and Content-Type header
body, content_type = RequestEncodingMixin._encode_files(data=data, files=files)
req = scrapy.Request(
    url=oss_api,
    method="POST",
    body=body,
    headers={"Content-Type": content_type},
    meta={
        "dy_proxy_type": False,
    },
)

Middlewares

Setting an IP proxy

import base64


class DblProxyMiddleware:
    """
    多倍云 proxy
    """

    def process_request(self, request, spider):
        request.meta["proxy"] = "http://http-proxy-t1.dobel.cn:9180"
        proxy_user_pass = "user:pass"
        encoded_user_pass = "Basic " + base64.urlsafe_b64encode(
            bytes(proxy_user_pass, "ascii")
        ).decode("utf8")
        request.headers["Proxy-Authorization"] = encoded_user_pass

Fiddler proxy

class FiddlerProxyMiddleware:
    """
    Fiddler proxy-forwarding middleware, handy for local debugging.
    The smaller a middleware's order number, the earlier it runs.
    """

    def process_request(self, request, spider):
        request.meta["proxy"] = "http://127.0.0.1:8888"

Random JA3 fingerprints

Reference: https://mp.weixin.qq.com/s/Zi26P1bAO85jOlEmSAZRgg

import random

from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
from scrapy.core.downloader.handlers.http import HTTPDownloadHandler


def shuffle_ciphers():
    ORIGIN_CIPHERS = "TLS13-AES-256-GCM-SHA384:TLS13-CHACHA20-POLY1305-SHA256:TLS13-AES-128-GCM-SHA256:ECDH+AESGCM:ECDH+CHACHA20:DH+AESGCM:DH+CHACHA20:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES"
    ciphers = ORIGIN_CIPHERS.split(":")
    random.shuffle(ciphers)
    ciphers = ":".join(ciphers)
    return ciphers + ":!aNULL:!MD5:!DSS"


class MyHTTPDownloadHandler(HTTPDownloadHandler):
    def download_request(self, request, spider):
        tls_ciphers = shuffle_ciphers()
        self._contextFactory = ScrapyClientContextFactory(tls_ciphers=tls_ciphers)
        return super().download_request(request, spider)

Then, in the initial configuration:

"DOWNLOAD_HANDLERS": {
"http": "utils.handler.MyHTTPDownloadHandler",
"https": "utils.handler.MyHTTPDownloadHandler",
}

Curl fingerprint requests

Reference: https://github.com/jxlil/scrapy-impersonate?tab=readme-ov-file

Install: pip install scrapy-impersonate

Usage:

DOWNLOAD_HANDLERS = {
    "http": "scrapy_impersonate.ImpersonateDownloadHandler",
    "https": "scrapy_impersonate.ImpersonateDownloadHandler",
}
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"

meta={
    "impersonate": random.choice(["chrome124", "chrome123", "edge101", "safari15_5"]),
    "impersonate_args": {
        "verify": False,
        "timeout": 10,
    },
}
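
Per the scrapy-impersonate README, the browser profile is chosen per request through meta; a minimal request sketch (the URL is a placeholder):

def start_requests(self):
    yield scrapy.Request(
        "https://example.com",  # placeholder URL
        meta={"impersonate": random.choice(["chrome124", "edge101", "safari15_5"])},
    )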

Signals

Functions hooked up as signal callbacks must not be generators.

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
    spider = super(QimaiDetailSpider, cls).from_crawler(crawler, *args, **kwargs)
    crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
    return spider

def spider_closed(self):
    pass
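
A sketch of a typical spider_closed handler; Scrapy also passes spider and reason when the callback's signature accepts them (the buffer-flushing logic here is hypothetical):

def spider_closed(self, spider, reason):
    # e.g. flush anything still buffered before the process exits (hypothetical helper)
    if getattr(self, "buffer", None):
        self.flush_buffer()
    spider.logger.info("spider closed: %s", reason)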

pipelines

Synchronous writes to MySQL

import MySQLdb


class MysqlPipeline(object):
    def __init__(self):
        self.conn = MySQLdb.connect("xxx.xxx.0.106", "root", "root", "article_spider", charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
        self.conn.commit()
        return item
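
The pipeline only runs once it is registered; a sketch of the settings entry, where the module path is an assumption:

ITEM_PIPELINES = {
    "myproject.pipelines.MysqlPipeline": 300,  # hypothetical module path
}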

Asynchronous writes to MySQL with Twisted

import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi


class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset="utf8",
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)
        return item

    def handle_error(self, failure, item, spider):
        # Handle the exception
        print(failure)

    def do_insert(self, cursor, item):
        # Build a different SQL statement depending on the item
        insert_sql, params = item.get_insert_sql()
        print(insert_sql, params)
        cursor.execute(insert_sql, params)
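
do_insert delegates SQL construction to the item itself; a hedged sketch of what such a get_insert_sql method could look like (the table and fields are made up):

import scrapy


class ArticleItem(scrapy.Item):
    title = scrapy.Field()
    url = scrapy.Field()

    def get_insert_sql(self):
        insert_sql = """
            insert into jobbole_article(title, url)
            values (%s, %s)
        """
        params = (self["title"], self["url"])
        return insert_sql, params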

ImagesPipeline

from scrapy.pipelines.images import ImagesPipeline


class ArticleImagePipeline(ImagesPipeline):

    def item_completed(self, results, item, info):
        if "front_image_url" in item:
            for ok, value in results:
                image_file_path = value["path"]
                item["front_image_path"] = image_file_path

        return item
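
ImagesPipeline also needs to know which item field holds the image URLs and where to store the files; a sketch of the relevant settings (the module path and directory are placeholders):

ITEM_PIPELINES = {
    "myproject.pipelines.ArticleImagePipeline": 1,  # hypothetical module path
}
IMAGES_URLS_FIELD = "front_image_url"  # item field that holds the image URLs
IMAGES_STORE = "/data/images"          # local directory for downloaded images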