Initial configuration
```python
download_delay = 20
custom_settings = {
    "HTTPERROR_ALLOWED_CODES": [404],
    "COOKIES_ENABLED": True,
    "DOWNLOAD_DELAY": 5,
    "DOWNLOAD_TIMEOUT": 5,
    "REFERER_ENABLED": False,
    "REDIRECT_ENABLED": False,
    "RETRY_HTTP_CODES": [429, 401, 403, 408, 414, 500, 502, 503, 504],
    "DEFAULT_REQUEST_HEADERS": {
        # ......
        "cookie": "...",
    },
}
```
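Both `download_delay` and `custom_settings` are attributes of the spider class. A minimal sketch of where they live (the spider name and URL are placeholders, not from the original notes):

```python
import scrapy


class ExampleSpider(scrapy.Spider):  # hypothetical spider
    name = "example"
    download_delay = 20        # per-spider delay attribute
    custom_settings = {        # overrides project settings for this spider only
        "DOWNLOAD_DELAY": 5,
        "COOKIES_ENABLED": True,
    }

    def start_requests(self):
        yield scrapy.Request("https://example.com", callback=self.parse)

    def parse(self, response):
        pass
```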
Debugging
```python
from scrapy import spiderloader
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

# Run a spider class directly
process = CrawlerProcess(get_project_settings())
process.crawl(TmallSpider)
process.start()

# Or load a spider by name
settings = get_project_settings()
spider_loader = spiderloader.SpiderLoader.from_settings(settings)
Spider = spider_loader.load('spider_name')
process.crawl(Spider)

# List all spider names in the project
spider_names = spider_loader.list()
for name in spider_names:
    ...


# Run several crawls in parallel with multiprocessing
def main():
    process = CrawlerProcess(get_project_settings())
    process.crawl(TmallSpider)
    process.start()


if __name__ == "__main__":
    import multiprocessing

    for _ in range(5):
        p = multiprocessing.Process(target=main)
        p.start()
```
Passing arguments
```python
class DouyinSpider(BaseSpider):
    name = "t_spider_douyin_strategy_market_brand"

    def __init__(self, crawl_type="day", start_day=None, end_day=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_day = start_day
        self.end_day = end_day
        self.crawl_type = crawl_type


# Pass the arguments when starting the spider from code
process.crawl(
    DouyinSpider,
    crawl_type="month",
    start_day="2024-05-01",
    end_day="2024-06-30",
)
```
Running
Run a spider from inside a project:

```bash
scrapy crawl spider_name
```

Run a standalone spider file:

```bash
scrapy runspider spider.py
```

Pass arguments with `-a`:

```bash
scrapy crawl t_spider_douyin_strategy_market_brand -a crawl_type="day" -a start_day="2024-07-07" -a end_day="2024-07-09"
```
params
```python
import scrapy
from w3lib.url import add_or_replace_parameters

# params is a dict of query-string parameters to add to or replace on the URL
search_url = add_or_replace_parameters(self.web_api, params)
yield scrapy.Request(search_url)
```
cookiejar: used to keep multiple cookie sessions separate
```python
from uuid import uuid4

# A distinct cookiejar value gives each request chain its own cookie session
yield scrapy.Request(url, meta={"cookiejar": uuid4()})
```
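To keep using the same session, follow-up requests have to carry the same `cookiejar` value. A minimal sketch (the URL and callback name are placeholders):

```python
import scrapy


def parse(self, response):
    # Follow-up request in the same cookie session
    yield scrapy.Request(
        "https://example.com/next",  # placeholder URL
        meta={"cookiejar": response.meta["cookiejar"]},
        callback=self.parse_next,    # hypothetical callback
    )
```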
JsonRequest
```python
from scrapy.http import JsonRequest

# JsonRequest serializes `data` to a JSON body and sets the JSON Content-Type
yield JsonRequest(self.page_url, data=self.data)
```
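For context, a minimal sketch of the round trip (the URL, payload and callback are placeholders; `response.json()` requires Scrapy >= 2.2):

```python
from scrapy.http import JsonRequest


def start_requests(self):
    yield JsonRequest(
        "https://example.com/api/list",  # placeholder URL
        data={"page": 1, "size": 20},    # placeholder payload
        callback=self.parse_page,
    )


def parse_page(self, response):
    payload = response.json()  # parse the JSON response body (Scrapy >= 2.2)
    ...
```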
Cookie
Setting cookies

- Option 1: set the cookie in `custom_settings` (see `DEFAULT_REQUEST_HEADERS` in the initial configuration above)
- Option 2: pass a dict via the `cookies` argument of the request:
```python
ck_jar = {}
for d in cookie.split(";"):
    if d:
        name, value = d.split("=", maxsplit=1)
        ck_jar[name] = value
yield scrapy.Request(url, cookies=ck_jar)
```
Converting the cookie list to a dict

`response.headers.get(key)` and `response.headers.getlist(key)` can also be used to read other header values (see the short example after the snippet).
```python
# All Set-Cookie values from the response headers (a list of bytes)
ck_lst = response.headers.getlist('Set-Cookie')

# Parse a "name=value; name2=value2" cookie string into a dict
ck_dct = {}
for ck in cookie.split(";"):
    if ck:
        name, value = ck.split("=", 1)
        ck_dct[name] = value


# Convert the Set-Cookie list into a dict
def get_cookie_dict(ck_lst):
    cookies = {}
    for ck in ck_lst:
        a = ck.decode()
        v = a.split(";")[0]   # keep only the leading "name=value" part
        k = v.split("=", 1)   # split on the first "=" only
        cookies[k[0]] = k[1]
    return cookies
```
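For example (the header names here are only illustrations):

```python
content_type = response.headers.get("Content-Type")    # single value, as bytes
set_cookies = response.headers.getlist("Set-Cookie")   # every value, as a list of bytes
```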
Issuing a request from a middleware

This approach no longer works; issue the request directly instead (a sketch of the current way follows the snippet).
```python
from scrapy import FormRequest

req = FormRequest(
    self.api,
    dont_filter=True,
    formdata=data,
    callback=spider.cb_fun,
)
yield spider.crawler.engine.crawl(req, spider=spider)
```
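A minimal sketch of the current approach: a downloader middleware can simply return a new `Request` from `process_response` and let Scrapy schedule it. The middleware name, endpoint, condition and form data below are assumptions, and `cb_fun` is taken from the snippet above:

```python
from scrapy import FormRequest


class ReissueRequestMiddleware:  # hypothetical middleware
    api = "https://example.com/api"  # placeholder endpoint

    def process_response(self, request, response, spider):
        if response.status == 403:  # hypothetical condition for re-requesting
            return FormRequest(
                self.api,
                formdata={"key": "value"},  # placeholder form data
                callback=spider.cb_fun,     # assumes the spider defines cb_fun
                dont_filter=True,
            )
        return response
```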
File upload
```python
import scrapy
from requests.models import RequestEncodingMixin

data = {
    "token": "4l10x2T4B/4=",
    "bucket": "common",
    "filekey": pdf_name,
    "mime_type": "application/pdf",
}
files = {"file": (pdf_name, pdf_body, "application/pdf")}
# Reuse the multipart encoder from requests to build the body and Content-Type
body, content_type = RequestEncodingMixin._encode_files(data=data, files=files)
req = scrapy.Request(
    url=oss_api,
    method="POST",
    body=body,
    headers={"Content-Type": content_type},
    meta={
        "dy_proxy_type": False,
    },
)
```
Middlewares

Setting an IP proxy
```python
import base64


class DblProxyMiddleware:
    """Dobel cloud proxy."""

    def process_request(self, request, spider):
        request.meta["proxy"] = "http://http-proxy-t1.dobel.cn:9180"
        proxy_user_pass = "user:pass"
        encoded_user_pass = "Basic " + base64.urlsafe_b64encode(
            bytes(proxy_user_pass, "ascii")
        ).decode("utf8")
        request.headers["Proxy-Authorization"] = encoded_user_pass
```
Fiddler proxy
```python
class FiddlerProxyMiddleware:
    """
    Forward requests through Fiddler for local debugging.
    The smaller a middleware's order number, the earlier it runs.
    """

    def process_request(self, request, spider):
        request.meta["proxy"] = "http://127.0.0.1:8888"
```
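Neither middleware takes effect until it is registered in the settings; a minimal sketch (the module path and priority numbers are placeholders):

```python
DOWNLOADER_MIDDLEWARES = {
    "myproject.middlewares.DblProxyMiddleware": 350,      # hypothetical path and priority
    "myproject.middlewares.FiddlerProxyMiddleware": 300,  # smaller number runs earlier
}
```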
Random JA3 fingerprint

Reference: https://mp.weixin.qq.com/s/Zi26P1bAO85jOlEmSAZRgg
```python
import random

from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
from scrapy.core.downloader.handlers.http import HTTPDownloadHandler


def shuffle_ciphers():
    ORIGIN_CIPHERS = (
        "TLS13-AES-256-GCM-SHA384:TLS13-CHACHA20-POLY1305-SHA256:"
        "TLS13-AES-128-GCM-SHA256:ECDH+AESGCM:ECDH+CHACHA20:DH+AESGCM:"
        "DH+CHACHA20:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES"
    )
    ciphers = ORIGIN_CIPHERS.split(":")
    random.shuffle(ciphers)
    ciphers = ":".join(ciphers)
    return ciphers + ":!aNULL:!MD5:!DSS"


class MyHTTPDownloadHandler(HTTPDownloadHandler):
    def download_request(self, request, spider):
        tls_ciphers = shuffle_ciphers()
        self._contextFactory = ScrapyClientContextFactory(tls_ciphers=tls_ciphers)
        return super().download_request(request, spider)
```
Then, in the initial configuration:
```python
"DOWNLOAD_HANDLERS": {
    "http": "utils.handler.MyHTTPDownloadHandler",
    "https": "utils.handler.MyHTTPDownloadHandler",
}
```
Curl fingerprint requests

Reference: https://github.com/jxlil/scrapy-impersonate?tab=readme-ov-file

Install: `pip install scrapy-impersonate`

Usage:
```python
DOWNLOAD_HANDLERS = {
    "http": "scrapy_impersonate.ImpersonateDownloadHandler",
    "https": "scrapy_impersonate.ImpersonateDownloadHandler",
}
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
```
```python
meta={
    "impersonate": random.choice(["chrome124", "chrome123", "edge101", "safari15_5"]),
    "impersonate_args": {
        "verify": False,
        "timeout": 10,
    },
}
```
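The `meta` fragment above goes on the request itself; a minimal sketch (the URL and callback are placeholders):

```python
import random

import scrapy


def start_requests(self):
    yield scrapy.Request(
        "https://example.com",  # placeholder URL
        meta={
            "impersonate": random.choice(["chrome124", "edge101", "safari15_5"]),
        },
        callback=self.parse,
    )
```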
Signals

The function connected to a signal must not be a generator.
```python
from scrapy import signals


# Inside the spider class (here named QimaiDetailSpider):
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
    spider = super(QimaiDetailSpider, cls).from_crawler(crawler, *args, **kwargs)
    crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
    return spider


def spider_closed(self):
    pass
```
pipelines
Synchronous MySQL writes
```python
import MySQLdb


class MysqlPipeline(object):
    def __init__(self):
        self.conn = MySQLdb.connect(
            'xxx.xxx.0.106', 'root', 'root', 'article_spider',
            charset="utf8", use_unicode=True,
        )
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        self.cursor.execute(
            insert_sql,
            (item["title"], item["url"], item["create_date"], item["fav_nums"]),
        )
        self.conn.commit()
        return item
```
Asynchronous MySQL writes with Twisted
```python
import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi


class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # Run the insert on a thread from the connection pool
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)
        return item

    def handle_error(self, failure, item, spider):
        print(failure)

    def do_insert(self, cursor, item):
        insert_sql, params = item.get_insert_sql()
        print(insert_sql, params)
        cursor.execute(insert_sql, params)
```
ImagesPipeline
```python
from scrapy.pipelines.images import ImagesPipeline


class ArticleImagePipeline(ImagesPipeline):
    def item_completed(self, results, item, info):
        if "front_image_url" in item:
            for ok, value in results:
                image_file_path = value["path"]
            item["front_image_path"] = image_file_path

        return item
```