如何使用RabbitMQ控制Scrapy爬虫

如何使用RabbitMQ控制Scrapy爬虫

本文将介绍如何使用RabbitMQ整合Scrapy来控制Scrapy爬虫进行目标网页内容爬取。我们假设你已经使用过Scrapy和RabbitMQ或者对其有一定了解。

为什么要使用RabbitMQ来控制爬虫?

Scrapy爬虫其实有自己的一套生产环境部署控制系统Scrapyd,这是一个开源的项目,他给Scrapy提供了服务器端的HTTP API,使其拥有运行与监控Scrapy爬虫的能力,使用Scrapyd需要将我们的爬虫部署至Scrapyd服务器。

相比Scrapyd,RabbitMQ则多了一个队列的特性,同时也能监控与控制爬虫的,并且不需要将爬虫部署到特定的服务器,随时运行,同时与队列与我们整个项目的整合也更加平滑自如。目前淘秀网爬虫端就是使用整合RabbitMQ这种解决方案来自动化控制与监控Scrapy爬虫。

如何整合RabbitMQ与Scrapy爬虫?

RabbitMQ在Python端的客户端实现是叫做Pika,这个RabbitMQ客户端提供了与别的RabbitMQ客户端大致相同的功能,如连接服务端,服务端连接管理,交换器管理,队列管理等等。我们将在我们的Scrapy中使用Pika来对RabbitMQ进行整合。

首先我们在scrapy.cfg同级目录下创建python运行文件begin.py,这个文件用于写我们的运行爬虫已经连接RabbitMQ相应的代码。

连接RabbitMQ服务端部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
while(True):
try:
# 创建连接认证条件
pika_credentials = pika.credentials.PlainCredentials(rabbit_user, rabbit_pass)
# 创建一个Blocking的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
rabbit_host, port=rabbit_port, credentials=pika_credentials, connection_attempts=10, socket_timeout=20,
heartbeat=360))
channel = connection.channel()
# 声明交换器,这里应该和你消息生产者端保持一致设置
channel.exchange_declare(exchange=rabbit_exchange, exchange_type='topic', durable=True, auto_delete=False)
# 声明队列,这里应该和你消息生产者端保持一致设置
channel.queue_declare(queue=rabbit_queue, durable=True, exclusive=False, auto_delete=False)
# 绑定操作
channel.queue_bind(exchange=rabbit_exchange,
queue=rabbit_queue,
routing_key=rabbit_queue)
# 类似权重,按能力分发,如果有一个消息,就不在给你发,控制单个蜘蛛消费数量
channel.basic_qos(prefetch_count=1)
channel.basic_consume( # 消费消息
callback, # 如果收到消息,就调用Callback
# 这里的Callback函数我们下文将会提到,其实就是运行我们的Scrapy蜘蛛语句
queue=rabbit_queue, # 队列
# no_ack=True # 一般不写,处理完接收处理结果。宕机则发给其他消费者
)

logger.info(' [*] Waiting for messages. To exit press CTRL+C')
# 开启RabbitMQ接收模式,这代码会阻塞运行,直到爬虫完成任务,才会继续
channel.start_consuming()
# 对一些连接错误进行处理,将继续执行循环,从而完成重连
except pika.exceptions.ConnectionClosed:
# Uncomment this to make the example not attempt recovery
# from server-initiated connection closure, including
# when the node is stopped cleanly
#
# break
continue
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
print("Connection was closed, retrying...")
continue

首先我们需要创建一个连接至RabbitMQ服务端的连接

1
2
3
4
 # 创建一个Blocking的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
rabbit_host, port=rabbit_port, credentials=pika_credentials, connection_attempts=10, socket_timeout=20,
heartbeat=360))

这句话支持一些RabbitMQ队列连接相关的参数设置,如验证信息,重试次数,超时时间,心跳间隔等等,具体参数列表可以查看官方文档关于参数的介绍

然后就是相关的队列设置,交换器设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
channel = connection.channel()
# 声明交换器,这里应该和你消息生产者端保持一致设置
channel.exchange_declare(exchange=rabbit_exchange, exchange_type='topic', durable=True, auto_delete=False)
# 声明队列,这里应该和你消息生产者端保持一致设置
channel.queue_declare(queue=rabbit_queue, durable=True, exclusive=False, auto_delete=False)
# 绑定操作
channel.queue_bind(exchange=rabbit_exchange,
queue=rabbit_queue,
routing_key=rabbit_queue)
# 类似权重,按能力分发,如果有一个消息,就不在给你发,控制单个蜘蛛消费数量
channel.basic_qos(prefetch_count=1)
channel.basic_consume( # 消费消息
callback, # 如果收到消息,就调用Callback
# 这里的Callback函数我们下文将会提到,其实就是运行我们的Scrapy蜘蛛语句
queue=rabbit_queue, # 队列
# no_ack=True # 一般不写,处理完接收处理结果。宕机则发给其他消费者
)

使用下面这句话开启我们的RabbitMQ Python客户端,之后便可以连接上RabbitMQ服务端

1
2
# 开启RabbitMQ接收模式,这代码会阻塞运行,直到爬虫完成任务,才会继续
channel.start_consuming()

最后我们使用while语句,处理可能出现的连接异常,使我们的代码可以在出现连接异常的情况下,自动重新连接运行。

如何运行我们的Scrapy爬虫?

上文我们提到了当有消息分配给我们这个客户端的时候,代码会触发callback函数,很明显我们需要在callback函数中运行我们的Scrapy蜘蛛。接下来我们看看如何写这个callback函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/python
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import logging

import pika
from crochet import setup
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging
from scrapy.utils.project import get_project_settings
# 导入我们自己写的蜘蛛
from mars.spiders.myspider import Spider

setup()

settings = get_project_settings()

def callback(used_channel, basic_deliver, properties, body):
# 获取到消息队列中的消息
decodebody = bytes.decode(body)
logger.info(" [x] Received %r" % decodebody)
try:
run_spider(key_word_arg=decodebody, used_channel=used_channel, delivery_tag=basic_deliver.delivery_tag)
except Exception as error:
logger.error(error)
# 告诉生产者,消息未处理完成
channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)


def run_spider(key_word_arg, used_channel, delivery_tag):
# 使用CrawlerRunner运行蜘蛛
crawler = CrawlerRunner(settings)
# 运行我们自己写的蜘蛛
crawler.crawl(Spider, key_word_arg=key_word_arg, channel=used_channel, delivery_tag=delivery_tag)

官方文档中,运行Scrapy蜘蛛有两种方式,一种叫做CrawlerProcess,另外一种叫做CrawlerRunner,我们在这里无法使用CrawlerProcess运行蜘蛛,因为当蜘蛛完成一次爬行后,接收到第二个消息,准备再次爬行时,使用CrawlerProcess会报twisted.internet.error.ReactorNotRestartable错误,导致蜘蛛无法再运行。而使用CrawlerRunner将完美解决这个问题。

这个解决方案中关键的一步是使用from crochet import setup导入setup(),并将setup()置顶放置。解决方法详细内容参见这里

最后运行蜘蛛,这里可以通过传参,传入一些自定义参数,像下面我代码中channel=used_channel,传入channel的目的是为了让我们有能力在蜘蛛中向RabbitMQ客户端实现消息确认。这些都不是强制的,关键看你自己的蜘蛛如何实现。

1
2
# 运行我们自己写的蜘蛛
crawler.crawl(Spider, key_word_arg=key_word_arg, channel=used_channel, delivery_tag=delivery_tag)

如何运行整个文件

我们只需要像执行普通python文件一样执行这个begin.py文件即可,程序会自动连接至RabbitMQ服务端,自动获取消息,自动执行,当消费完一个消息后,自动获取下个消息进行消费。

完整版代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/python
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import logging

import pika
from crochet import setup
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging
from scrapy.utils.project import get_project_settings
# 导入我们自己写的蜘蛛
from mars.spiders.myspider import Spider

setup()
# 获取setting.py文件
settings = get_project_settings()

logger = logging.getLogger('begin.py')

configure_logging(settings)
logger.info(' [*] Starting begin.py...')
rabbit_host = settings.get("RABBITMQ_HOST")
rabbit_port = settings.get("RABBITMQ_PORT")
rabbit_user = settings.get("RABBITMQ_USERNAME")
rabbit_pass = settings.get("RABBITMQ_PASSWORD")
rabbit_exchange = settings.get("MY_EXCHANGE")
rabbit_queue = settings.get("MY_SCRAPY_QUEUE")


def callback(used_channel, basic_deliver, properties, body):
# 获取到消息队列中的消息
decodebody = bytes.decode(body)
logger.info(" [x] Received %r" % decodebody)
try:
run_spider(key_word_arg=decodebody, used_channel=used_channel, delivery_tag=basic_deliver.delivery_tag)
except Exception as error:
logger.error(error)
# 告诉生产者,消息未处理完成
channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)


def run_spider(key_word_arg, used_channel, delivery_tag):
# 使用CrawlerRunner运行蜘蛛
crawler = CrawlerRunner(settings)
# 运行我们自己写的蜘蛛
crawler.crawl(Spider, key_word_arg=key_word_arg, channel=used_channel, delivery_tag=delivery_tag)

while(True):
try:
# 创建连接认证条件
pika_credentials = pika.credentials.PlainCredentials(rabbit_user, rabbit_pass)
# 创建一个Blocking的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
rabbit_host, port=rabbit_port, credentials=pika_credentials, connection_attempts=10, socket_timeout=20,
heartbeat=360))
channel = connection.channel()
# 声明交换器,这里应该和你消息生产者端保持一致设置
channel.exchange_declare(exchange=rabbit_exchange, exchange_type='topic', durable=True, auto_delete=False)
# 声明队列,这里应该和你消息生产者端保持一致设置
channel.queue_declare(queue=rabbit_queue, durable=True, exclusive=False, auto_delete=False)
# 绑定操作
channel.queue_bind(exchange=rabbit_exchange,
queue=rabbit_queue,
routing_key=rabbit_queue)
# 类似权重,按能力分发,如果有一个消息,就不在给你发,控制单个蜘蛛消费数量
channel.basic_qos(prefetch_count=1)
channel.basic_consume( # 消费消息
callback, # 如果收到消息,就调用Callback
# 这里的Callback函数我们下文将会提到,其实就是运行我们的Scrapy蜘蛛语句
queue=rabbit_queue, # 队列
# no_ack=True # 一般不写,处理完接收处理结果。宕机则发给其他消费者
)

logger.info(' [*] Waiting for messages. To exit press CTRL+C')
# 开启RabbitMQ接收模式,这代码会阻塞运行,直到爬虫完成任务,才会继续
channel.start_consuming()
# 对一些连接错误进行处理,将继续执行循环,从而完成重连
except pika.exceptions.ConnectionClosed:
# Uncomment this to make the example not attempt recovery
# from server-initiated connection closure, including
# when the node is stopped cleanly
#
# break
continue
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError as err:
print("Caught a channel error: {}, stopping...".format(err))
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
print("Connection was closed, retrying...")
continue