In previous articles we used requests and BeautifulSoup to scrape data. Scraping this way is slow (using Selenium is even slower).
Sometimes we need the data quickly. But if we try to speed up scraping with multi-threading or any other technique, we start getting HTTP status 429, i.e. too many requests, and we might even get banned from the site.
The purpose of this article is to scrape a lot of data quickly without getting banned, and we will do this using a Docker cluster of Celery and RabbitMQ, along with Tor.
To achieve this, we will follow the steps below:
- Set up a Docker cluster of Celery workers with RabbitMQ as the message broker.
- Benchmark it with dummy tasks and tune the worker count and concurrency.
- Write the task that scrapes tweets for a list of Twitter handles.
- Route the requests through a rotating Tor proxy with a random user agent on each request.
Let's start.
The Dockerfile used to build the worker image is based on the python:3 Docker image.
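A minimal sketch of such a dockerfile (the exact install and start-up commands, and the concurrency value, are assumptions for illustration):

FROM python:3

WORKDIR /app

# install the Python dependencies and copy the project
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .

# start a Celery worker for the celery_main app
CMD ["celery", "-A", "celery_main", "worker", "--loglevel=info", "--concurrency=4"]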
Directory structure of the code:
.
|--- celery_main
|    |--- celery.py
|    |--- __init__.py
|    |--- task_receiver.py
|    |--- task_submitter.py
|--- docker-compose.yml
|--- dockerfile
|--- README.md
|--- requirements.txt

1 directory, 8 files
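The docker-compose.yml is not reproduced here in full; a minimal sketch consistent with the broker URL used in celery.py could look like this (the rabbit hostname and the myuser/mypassword credentials come from that URL, everything else is an assumption):

version: '3'
services:
  rabbit:
    hostname: rabbit
    image: rabbitmq:3
    environment:
      - RABBITMQ_DEFAULT_USER=myuser
      - RABBITMQ_DEFAULT_PASS=mypassword
  worker:
    build: .
    depends_on:
      - rabbit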
Run the command below to start the Docker cluster:
sudo docker-compose up
This will run one container for each worker and one for RabbitMQ. Once you see something like
worker_1 | [2018-03-01 10:46:30,013: INFO/MainProcess] celery@5af881b83b97 ready.
the workers are ready and you can start submitting tasks. But before going any further, let's try to understand the code while it is simple and small.
celery.py:

from celery import Celery

app = Celery(
    'celery_main',
    broker='amqp://myuser:mypassword@rabbit:5672',
    backend='rpc://',
    include=['celery_main.task_receiver']
)
The first argument to Celery is the name of the current module. This is only needed so that names can be automatically generated when the tasks are defined in the __main__ module.
The second argument is the broker keyword argument, specifying the URL of the message broker you want to use. Here we are using RabbitMQ (which is also the default option).
The third argument is backend. A backend in Celery is used for storing the task results.
task_submitter.py:
from .task_receiver import do_work

if __name__ == '__main__':
    for i in range(10):
        result = do_work.delay(i)
        print('task submitted ' + str(i))
This code submits the tasks to the workers. We need to call the do_work method with delay() so that it is executed asynchronously: the call returns an AsyncResult immediately, without waiting for the task to finish, and if you try to read the result before the task has completed you will get None.
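For example, a minimal sketch (assuming the cluster is running and the same do_work task) of waiting for a result explicitly:

from celery_main.task_receiver import do_work

# delay() returns an AsyncResult immediately
result = do_work.delay(5)

print(result.ready())          # usually False right after submission
print(result.get(timeout=30))  # blocks until the worker returns 10

Calling get() on the AsyncResult is what actually waits for (and fetches) the result from the backend.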
task_receiver.py:
from celery_main.celery import app
import time
import random


@app.task(bind=True, default_retry_delay=10)
def do_work(self, item):
    print('Task received ' + str(item))
    # sleep for random seconds to simulate a really long task
    time.sleep(random.randint(1, 3))
    result = item + item
    return result
We can easily create a task from any callable by using the task() decorator, which is what we are doing here. bind=True means the first argument to the task will always be the task instance (self). Bound tasks are needed for retries and for accessing information about the current task request.
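As an illustration (not part of the article's code), this is where default_retry_delay comes in: a bound task can retry itself on failure via self.retry. The fetch_page task below is hypothetical:

import requests

from celery_main.celery import app


@app.task(bind=True, default_retry_delay=10, max_retries=3)
def fetch_page(self, url):
    # on a failed request, retry after default_retry_delay (10) seconds,
    # giving up after max_retries attempts
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return response.text
    except requests.RequestException as exc:
        raise self.retry(exc=exc)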
Now start the cluster with the command sudo docker-compose up. We will not be running the containers in detached mode (-d) as we need to see the output. By default it will create one worker.
In another terminal, go inside the worker container using the command sudo docker exec -it [container-name] bash. It will start a bash session in the working directory defined by WORKDIR in the dockerfile.
Run the task submitter using the command python -m celery_main.task_submitter. The task submitter will submit the tasks to the workers and exit without waiting for the results.
You can see the output (info, debug and warnings) in the previous terminal. Note how many seconds the cluster took to complete the 10 tasks.
Now stop all the containers, remove them and restart them, but this time with a worker count of 10, using the command sudo docker-compose up --scale worker=10.
Repeat the process and note the time taken to complete the tasks. Keep repeating this step with different worker counts and concurrency values in the dockerfile to find the combination that takes the least time on your machine.
Increasing the concurrency value beyond a limit will no longer improve performance, as the workers will spend their time context switching instead of doing the actual job. Similarly, increasing the worker count beyond a limit will make your machine unresponsive. Keep a tab on the CPU and memory consumed by running the top command in another terminal.
Now let's move on to the actual scraping. All the Twitter handles are in the handles.txt file placed in the root directory of the code.
Update the task_submitter.py file to read the handles and submit them to the task receiver, as sketched below.
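A minimal sketch of the updated task_submitter.py, assuming handles.txt contains one handle per line:

from .task_receiver import do_work

if __name__ == '__main__':
    # read one Twitter handle per line and submit a task for each
    with open('handles.txt') as f:
        handles = [line.strip() for line in f if line.strip()]

    for handle in handles:
        do_work.delay(handle)
        print('task submitted for ' + handle)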
The task receiver will fetch the response from Twitter and parse it to extract the tweets available on the first page. For simplicity we are not going to the second page.
The code to extract the tweets is as below:
import requests
from bs4 import BeautifulSoup

from celery_main.celery import app


@app.task(bind=True, default_retry_delay=10)
def do_work(self, handle):
    print('handle received ' + handle)
    url = "https://twitter.com/" + handle
    session = requests.Session()
    response = session.get(url, timeout=5)
    print("-- STATUS " + str(response.status_code) + " -- " + url)
    if response.status_code == 200:
        parse_tweets(response, handle)


def parse_tweets(response, handle):
    soup = BeautifulSoup(response.text, 'lxml')
    tweets_list = list()
    tweets = soup.find_all("li", {"data-item-type": "tweet"})
    for tweet in tweets:
        tweets_list.append(get_tweet_text(tweet))
    print(str(len(tweets_list)) + " tweets found.")
    # save to DB or flat files.


def get_tweet_text(tweet):
    try:
        tweet_text_box = tweet.find("p", {"class": "TweetTextSize TweetTextSize--normal js-tweet-text tweet-text"})
        images_in_tweet_tag = tweet_text_box.find_all("a", {"class": "twitter-timeline-link u-hidden"})
        tweet_text = tweet_text_box.text
        for image_in_tweet_tag in images_in_tweet_tag:
            tweet_text = tweet_text.replace(image_in_tweet_tag.text, '')
        return tweet_text
    except Exception as e:
        return None
Now if you run this code, Twitter will start returning the too many requests (HTTP status 429) error after a few hits. To avoid this we will use the Tor network to send the requests from different IPs, and we will also use a different user agent in each request.
- Clone the rotating-proxy repository: git clone https://github.com/mattes/rotating-proxy
- Build the image and use the same image name in the docker-compose file:
rproxy:
  hostname: rproxy
  image: anuragrana/rotating-proxy
  environment:
    - tors=25
  ports:
    - "5566:5566"
    - "4444:4444"
- You may skip the above steps, as the Docker image with the tag used in docker-compose is already present on Docker Hub.
- Create a file proxy.py and write the code below in it.
import requests
import user_agents
import random


def get_session():
    session = requests.session()
    session.proxies = {'http': 'rproxy:5566', 'https': 'rproxy:5566'}
    session.headers = get_headers()
    return session


def get_headers():
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "accept-language": "en-GB,en-US;q=0.9,en;q=0.8",
        "User-Agent": random.choice(user_agents.useragents)
    }
    return headers
- Create a new file user_agents.py. This will contain the list of user agents, and we will use one of these, selected randomly, in each request. A sketch is shown below.
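A minimal sketch of user_agents.py; the strings below are just example user agents, but the variable must be called useragents because proxy.py refers to user_agents.useragents:

# example user-agent strings; extend this list as needed
useragents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64; rv:58.0) Gecko/20100101 Firefox/58.0",
]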
If you run the containers now, the IP will change after every few requests and the user agent will change on each hit, resulting in almost zero 429 responses.
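For this to take effect, the task receiver has to build its session through proxy.get_session() instead of creating requests.Session() directly. A sketch of that change, assuming proxy.py sits inside the celery_main package (the rest of the file, parse_tweets and get_tweet_text, stays the same):

from celery_main import proxy
from celery_main.celery import app


@app.task(bind=True, default_retry_delay=10)
def do_work(self, handle):
    print('handle received ' + handle)
    url = "https://twitter.com/" + handle
    # the session now routes through the rotating Tor proxy
    # and carries a randomly chosen user agent
    session = proxy.get_session()
    response = session.get(url, timeout=5)
    print("-- STATUS " + str(response.status_code) + " -- " + url)
    if response.status_code == 200:
        parse_tweets(response, handle)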