Scraping 10000 tweets in 60 seconds using celery, RabbitMQ and Docker cluster with rotating proxy

In previous articles we used requests and BeautifulSoup to scrape data. Scraping this way is slow, and using Selenium is even slower.

Sometimes we need data quickly. But if we try to speed up scraping with multi-threading or any other technique, we will start getting HTTP status 429, i.e. too many requests. We might get banned from the site as well.

The purpose of this article is to scrape lots of data quickly without getting banned. We will do this using a Docker cluster of Celery workers and RabbitMQ, along with Tor.

To achieve this we will follow the steps below:

  1. Install docker and docker-compose
  2. Download the boilerplate code to set up the Docker cluster in 5 minutes
  3. Understand the code
  4. Experiment with the Docker cluster
  5. Update the code to download tweets
  6. Use Tor to avoid getting banned
  7. Speed up the process by increasing workers and concurrency

Note: This article is for educational purposes only. Do not send too many requests to any server. Respect the robots.txt file. Use the API if possible.

Let's start.


Installing docker and docker-compose:

  • We will be using Docker version 17.12.0-ce, build c97c6d6. Install Docker by following the instructions on Docker's official page.
  • The docker-compose version used is 1.8.0, build unknown. Install docker-compose by following the instructions on its page.


Boilerplate code:

Clone the code from this Github repository. A README file is included to help you get started with the cluster.

The dockerfile used to build the worker image is based on the python:3 Docker image.

Directory structure of code:

.
--- celery_main
|   --- celery.py
|   --- __init__.py
|   --- task_receiver.py
|   --- task_submitter.py
--- docker-compose.yml
--- dockerfile
--- README.md
--- requirements.txt
1 directory, 8 files


Run the below command to start the docker cluster:

sudo docker-compose up


This will run one container for each worker and one for RabbitMQ. Once you see something like

worker_1  | [2018-03-01 10:46:30,013: INFO/MainProcess] celery@5af881b83b97 ready.

on your screen at the end of output, you are good to go.

Now you can submit tasks. But before going any further, let's try to understand the code while it is simple and small.


Understanding the code:

celery.py:

from celery import Celery

app = Celery(
    'celery_main',
    broker='amqp://myuser:mypassword@rabbit:5672',
    backend='rpc://',
    include=['celery_main.task_receiver']
)


The first argument to Celery is the name of the current module. This is only needed so that names can be generated automatically when tasks are defined in the __main__ module.

The second argument is the broker keyword argument, specifying the URL of the message broker you want to use. Here we are using RabbitMQ (also the default option).

The third argument is backend, which Celery uses to store task results. The rpc:// backend sends results back as messages over the broker. The include argument lists the modules to import when the worker starts, so that the worker can find our tasks.
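
The broker URL packs several settings into one string. As a minimal sketch using only the standard library (not something the boilerplate does itself), the URL from celery.py can be split into its parts. The host name rabbit is presumably the RabbitMQ service name defined in docker-compose.yml; that is an assumption here, since the file is not listed in this article.

```python
from urllib.parse import urlsplit

# Broker URL copied from celery.py above.
BROKER_URL = 'amqp://myuser:mypassword@rabbit:5672'

parts = urlsplit(BROKER_URL)

print(parts.scheme)    # protocol used to talk to the broker
print(parts.username)  # RabbitMQ user
print(parts.password)  # RabbitMQ password
print(parts.hostname)  # broker host (assumed to be the compose service name)
print(parts.port)      # port RabbitMQ listens on
```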


task_submitter.py:

from .task_receiver import do_work


if __name__ == '__main__':
    for i in range(10):
        result = do_work.delay(i)
        print('task submitted' + str(i))


This code submits the tasks to the workers. We call do_work through delay() so that it is executed asynchronously.

The call returns immediately without waiting for the result. What comes back is an AsyncResult object, so printing it right away shows the task id rather than the computed value, and its result attribute stays None until the task finishes.
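
The submit-now, collect-later pattern can be illustrated without a broker. The sketch below is only an analogy built on concurrent.futures from the standard library, not Celery code: submit() plays the role of delay(), and the returned Future plays the role of the result object. With Celery, the corresponding calls would be result.ready() and result.get(timeout=...).

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Gate that keeps the task "running" until we release it.
release = threading.Event()


def do_work(item):
    release.wait()          # simulate a long task
    return item + item


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(do_work, 2)     # returns immediately
    finished_early = future.done()       # the task is still blocked on the gate
    release.set()                        # let the task finish
    value = future.result(timeout=5)     # block until the result is available

print(finished_early, value)
```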


task_receiver.py:

from celery_main.celery import app
import time
import random


@app.task(bind=True, default_retry_delay=10)
def do_work(self, item):
    print('Task received ' + str(item))
    # sleep for random seconds to simulate a really long task
    time.sleep(random.randint(1, 3))

    result = item + item
    return result


We can easily create a task from any callable by using the task() decorator. This is what we are doing here.

bind=True means the first argument to the task will always be the task instance (self). Bound tasks are needed for retries and for accessing information about the current task request.
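
Since the task is bound, it can reschedule itself through self.retry(). A common companion to retries is a growing delay between attempts. The helper below is a hypothetical sketch (the name backoff_delay and its parameters are not part of the boilerplate). It starts from the same 10 seconds used in default_retry_delay and doubles the delay on each attempt, up to a cap. Inside a bound task, the returned value could be passed as the countdown argument of self.retry(), with self.request.retries as the attempt number.

```python
def backoff_delay(attempt, base=10, cap=300):
    """Seconds to wait before retry number `attempt` (0-based)."""
    # double the delay on every attempt, but never exceed the cap
    return min(cap, base * (2 ** attempt))


for attempt in range(6):
    print(attempt, backoff_delay(attempt))
```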


Experimenting with Docker cluster:

Run the containers using the command sudo docker-compose up.

We will not run the containers in detached mode (-d) because we need to see the output. By default this creates one worker.

In another terminal, open a shell inside the worker container using the command sudo docker exec -it [container-name] bash. This starts a bash session in the working directory defined by WORKDIR in the dockerfile.

Run the task submitter using the command python -m celery_main.task_submitter. The task submitter will submit the tasks to the workers and exit without waiting for results.

You can see the output (info, debug and warnings) in the previous terminal. Note how many seconds the cluster took to complete the 10 tasks. Now stop all containers, remove them and restart them, but this time set the worker count to 10 using the command sudo docker-compose up --scale worker=10.

Repeat the process and note the time taken to complete the tasks. Repeat the above step, changing the worker count and the concurrency value in the dockerfile, to find the combination that takes the least time on your machine.

Increasing the concurrency value beyond a certain limit no longer improves performance, because workers spend their time switching context instead of doing actual work. Similarly, increasing the worker count beyond a limit will make your machine unresponsive. Keep an eye on CPU and memory consumption by running the top command in another terminal.
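
To reason about the numbers before experimenting, a rough estimate helps. The sketch below assumes every task takes about 2 seconds (the midpoint of the 1 to 3 second sleep in do_work) and ignores messaging and scheduling overhead, so real timings will be higher. The function name estimated_seconds is made up for this illustration.

```python
import math


def estimated_seconds(tasks, workers, concurrency, avg_task_seconds=2.0):
    """Rough wall-clock estimate for a batch of equally long tasks."""
    slots = workers * concurrency          # tasks that can run at the same time
    waves = math.ceil(tasks / slots)       # rounds needed to drain the queue
    return waves * avg_task_seconds


print(estimated_seconds(10, workers=1, concurrency=1))
print(estimated_seconds(10, workers=10, concurrency=1))
print(estimated_seconds(10, workers=2, concurrency=2))
```

Once the number of slots reaches the number of tasks, adding more workers cannot reduce this estimate any further, so extra workers only add load on the machine.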


Let's start downloading tweets:

Now let's start extending the boilerplate code. All the code we are going to write below is available on Github. You can download and run the code to scrape the tweets.

All the Twitter handles are in the handles.txt file placed in the root directory of the code.

Update the task_submitter.py file to read the handles and submit them to the task receiver. The task receiver will fetch the response from Twitter and parse it to extract the tweets available on the first page. For simplicity we are not going to the second page.
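
A minimal sketch of the reading step, assuming handles.txt holds one handle per line (the file format is not shown in this article, and the helper name read_handles is hypothetical). Blank lines and a leading @ are dropped. A temporary file with placeholder handles stands in for handles.txt, and the Celery submission is left as a comment because it needs the running cluster.

```python
import os
import tempfile


def read_handles(path):
    """Return the Twitter handles listed in a text file, one per line."""
    handles = []
    with open(path, encoding='utf-8') as handle_file:
        for line in handle_file:
            handle = line.strip().lstrip('@')
            if handle:                      # skip blank lines
                handles.append(handle)
    return handles


# Demo with a temporary file standing in for handles.txt (placeholder handles).
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, 'handles.txt')
    with open(sample_path, 'w', encoding='utf-8') as sample:
        sample.write('@example_one\n\nexample_two\n')
    handles = read_handles(sample_path)

for handle in handles:
    # in task_submitter.py this would become: do_work.delay(handle)
    print('would submit ' + handle)
```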

Code to extract the tweets is as below:

import requests
from bs4 import BeautifulSoup

from celery_main.celery import app


@app.task(bind=True, default_retry_delay=10)
def do_work(self, handle):
    print('handle received ' + handle)
    url = "https://twitter.com/" + handle
    session = requests.Session()
    response = session.get(url, timeout=5)
    print("-- STATUS " + str(response.status_code) + " -- " + url)
    # only parse successful responses; anything else (e.g. 429) is skipped
    if response.status_code == 200:
        parse_tweets(response, handle)


def parse_tweets(response, handle):
    soup = BeautifulSoup(response.text, 'lxml')
    tweets_list = list()
    # select every <li> element marked with data-item-type="tweet"
    tweets = soup.find_all("li", {"data-item-type": "tweet"})
    for tweet in tweets:
        tweet_text = get_tweet_text(tweet)
        # skip tweets whose text could not be extracted
        if tweet_text is not None:
            tweets_list.append(tweet_text)

    print(str(len(tweets_list)) + " tweets found.")
    # save to DB or flat files.


def get_tweet_text(tweet):
    try:
        tweet_text_box = tweet.find("p", {"class": "TweetTextSize TweetTextSize--normal js-tweet-text tweet-text"})
        images_in_tweet_tag = tweet_text_box.find_all("a", {"class": "twitter-timeline-link u-hidden"})
        tweet_text = tweet_text_box.text
        # strip the text of the hidden image links from the tweet text
        for image_in_tweet_tag in images_in_tweet_tag:
            tweet_text = tweet_text.replace(image_in_tweet_tag.text, '')
        return tweet_text
    except Exception:
        # the markup did not match the expected structure
        return None


Now if you run this code, it will start returning HTTP status 429 (too many requests) after a few hits. To avoid this we will use the Tor network to send requests from different IPs, and we will also use a different user agent in each request.


Adding Rotating Proxy:

- Clone this git repository.

git clone https://github.com/mattes/rotating-proxy

- Change anything in the code as per your requirements.

- Build the image and use the same name in the docker-compose file.

    rproxy:
        hostname: rproxy
        image: anuragrana/rotating-proxy
        environment:
            - tors=25
        ports:
            - "5566:5566"
            - "4444:4444"


- You may skip the above steps, as the Docker image with the tag used in docker-compose is already available on Docker Hub.

- Create a file proxy.py with the code below.

import requests
import user_agents
import random


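def get_session():
    session = requests.Session()
    # route all traffic through the rotating proxy container
    session.proxies = {'http': 'http://rproxy:5566',
                       'https': 'http://rproxy:5566'}
    # add our headers on top of the session defaults
    session.headers.update(get_headers())
    return session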


def get_headers():
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "accept-language": "en-GB,en-US;q=0.9,en;q=0.8",
        "User-Agent": random.choice(user_agents.useragents)
    }

    return headers


- Create a new file user_agents.py. This will contain the list of user agents and we will use one of these, selected randomly, in each request.
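
The contents of user_agents.py are not listed in this article. A minimal sketch, assuming the module only needs to expose the useragents list that proxy.py reads, could look like this. The strings are sample browser identifiers chosen for illustration; extend the list with whatever agents you want to rotate through.

```python
import random

# user_agents.py: list of user agent strings consumed by proxy.py.
# The entries below are illustrative samples, not the author's list.
useragents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64; rv:58.0) Gecko/20100101 Firefox/58.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/604.5.6 (KHTML, like Gecko) Version/11.0.3 Safari/604.5.6",
]

# Same selection that get_headers() performs on every call.
chosen = random.choice(useragents)
print(chosen)
```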


Once do_work creates its session with get_session() from proxy.py instead of requests.Session(), run the containers again. The IP will now change after every few requests and the user agent will change on each hit, resulting in almost zero 429 status responses.

 

Performance:

I was able to download approximately 10000 tweets in 60 seconds using 15 workers with a concurrency of 5 in each worker. Increasing the worker count beyond 15 started making the machine unresponsive. You may try different numbers to find the configuration that scrapes the maximum number of tweets in the minimum time.
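
The reported figures can be turned into per-slot throughput with simple arithmetic. This only restates the numbers above (10000 tweets, 60 seconds, 15 workers, concurrency 5); it is not a new measurement.

```python
tweets = 10000
seconds = 60
workers = 15
concurrency = 5

slots = workers * concurrency                 # tasks running in parallel
tweets_per_second = tweets / seconds
tweets_per_slot_per_second = tweets_per_second / slots

print(slots)
print(round(tweets_per_second, 1))
print(round(tweets_per_slot_per_second, 2))
```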


Source code:

Source code is available on Github.

References:
  • https://docs.docker.com/
  • https://medium.com/@tonywangcn/how-to-build-docker-cluster-with-celery-and-rabbitmq-in-10-minutes-13fc74d21730
  • http://docs.celeryproject.org/en/latest/getting-started/
  • https://github.com/mattes/rotating-proxy


© 2022-2023 Python Circle