Stream data from Twitter using Python

By Tony Sui, Lewis Fogden, Mon 27 February 2017, in category Data science

Python, Twitter

  

Twitter’s data can often provide valuable insight into your company's products, brand, clients, or competition. You can extract sentiment, volume, what's trending, and much more. Enough said, let’s stream some tweets!

Twitter Application

First, a Twitter Application is required. You can create one here. You will need to sign in with your Twitter account. Once you’ve signed in, click on Create New App in the top right corner. You will then see the following:

[Image: the Create New App form]

Give your application a Name and a Description. The Website is not so relevant at this stage, so you can put down any valid URL as a placeholder. You can also ignore Callback URL for now.

Inside your application, go to the Keys and Access Tokens tab, where you can find the values for your Consumer Key (API Key) and Consumer Secret (API Secret). We will use these later.

Preparation

Before we can start coding happily, we need to have the following two files ready:

search_terms.txt, where you have the term(s) that you want to search for. If you have multiple terms, put each one on a new line.

config.json, which looks like this:

{
    "TERMS_FILE": "search_terms.txt",
    "APP_KEY": "Your Consumer Key (API Key)",
    "APP_SECRET": "Your Consumer Secret (API Secret)",
    "STORAGE_PATH": "./tweets/"
}
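Before wiring the config into the scraper, it can be worth sanity-checking it loads cleanly. The sketch below (using a temp directory and placeholder values, not real credentials) writes a sample config with the same keys as above and reads it back:

```python
import json
import os
import tempfile

# Placeholder values mirroring the structure of config.json above.
sample = {
    "TERMS_FILE": "search_terms.txt",
    "APP_KEY": "your-consumer-key",
    "APP_SECRET": "your-consumer-secret",
    "STORAGE_PATH": "./tweets/",
}

path = os.path.join(tempfile.mkdtemp(), "config.json")
with open(path, "w") as f:
    json.dump(sample, f, indent=4)

with open(path) as f:
    config = json.load(f)

# Fail fast if a key is missing or empty.
missing = [k for k in sample if not config.get(k)]
print(missing)                # []
print(config["TERMS_FILE"])   # search_terms.txt
```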

Next, create a folder and name it tweets. This is where the streamed tweets will be stored.

Lastly, run pip install twython in your environment; twython is the Python library that we use to connect to Twitter. In your script, import the following libraries:

import time
import json
import os
import logging
import requests
from threading import Thread
from http.client import IncompleteRead
from twython import TwythonStreamer
from twython import Twython

The Authentication class

Create an Authentication class; within it, we define the following methods.

The first method reads the configuration file and returns the parameters that we will use later:

def read_config_file(self, filename):
    with open(filename, "r") as f:
        s = f.read()
    d = json.loads(s)
    APP_KEY = d["APP_KEY"]
    APP_SECRET = d["APP_SECRET"]
    TERMS_FILE = d["TERMS_FILE"]
    STORAGE_PATH = d["STORAGE_PATH"]
    return APP_KEY, APP_SECRET, TERMS_FILE, STORAGE_PATH

To get the OAuth token and token secret, we first need to get a pin code from Twitter. This method will output a link on the command line:

def get_oauth_link(self, APP_KEY, APP_SECRET):
    twitter = Twython(APP_KEY, APP_SECRET)
    auth = twitter.get_authentication_tokens()
    OAUTH_TOKEN = auth['oauth_token']
    OAUTH_TOKEN_SECRET = auth['oauth_token_secret']
    url = auth['auth_url']
    logging.info(
        "Go to the URL below, log in, and copy-paste the PIN you get to "
        "'code.txt':"
    )
    logging.info(url)
    return url, OAUTH_TOKEN, OAUTH_TOKEN_SECRET

Copy and paste the link into a web browser and you will see the pin code. Create a file called code.txt, paste the pin code inside, and save it. (Do not try to scrape the pin code programmatically, as you risk your IP address being blacklisted by Twitter.) The following line is a quick way of creating the pincode file:

echo "123_my_pincode_456" > code.txt

The next method will sit and wait for the pin code in code.txt. Once it detects it, it will read and return it:

def wait_for_pin_code(self):
    while True:
        if not os.path.exists("code.txt"):
            time.sleep(5)
            logging.debug(
                "'code.txt' doesn't exist yet; waiting until it is created "
                "before listening to Twitter"
            )
        else:
            pincode = 0
            with open("code.txt") as f:
                pincode = int(f.read().strip())
                logging.info("Pincode read successfully: " + str(pincode))
            return str(pincode)
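The polling loop can be exercised without touching Twitter at all. Below is a simplified sketch of the same logic, with the file path taken as a parameter so it can be tried standalone (here the file is created up front, so the loop returns immediately):

```python
import os
import tempfile
import time

def wait_for_pin_code(path, poll_seconds=0.1):
    # Same shape as the method above: poll until the file exists,
    # then read and return the pincode as a string.
    while not os.path.exists(path):
        time.sleep(poll_seconds)
    with open(path) as f:
        return str(int(f.read().strip()))

code_file = os.path.join(tempfile.mkdtemp(), "code.txt")
with open(code_file, "w") as f:
    f.write("1234567\n")

pincode = wait_for_pin_code(code_file)
print(pincode)  # 1234567
```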

The pin code is only valid for one connection, so every time we re-run the Twitter scraper we first need to remove code.txt if it exists, hence the following method:

def remove_old_code_file(self):
    if os.path.exists("code.txt"):
        os.remove("code.txt")

Lastly, the following method takes the pincode along with the other authentication parameters, and returns the OAUTH_TOKEN and OAUTH_TOKEN_SECRET, which we will use later for creating a Twython Streamer instance:

def auth_with_pin(self, APP_KEY, APP_SECRET, OAUTH_TOKEN, 
                  OAUTH_TOKEN_SECRET, pincode):
    twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
    final_step = twitter.get_authorized_tokens(pincode)
    logging.debug("Old OAUTH_TOKEN: " + str(OAUTH_TOKEN))
    logging.debug("Old OAUTH_TOKEN_SECRET: " + str(OAUTH_TOKEN_SECRET))
    OAUTH_TOKEN = final_step['oauth_token']
    OAUTH_TOKEN_SECRET = final_step['oauth_token_secret']
    logging.debug("New OAUTH_TOKEN: " + str(OAUTH_TOKEN))
    logging.debug("New OAUTH_TOKEN_SECRET: " + str(OAUTH_TOKEN_SECRET))
    return OAUTH_TOKEN, OAUTH_TOKEN_SECRET

The StreamListener class

This is the class that will do the actual heavy lifting – streaming data from Twitter. Before that, we need to have a TooLongTermException class defined:

class TooLongTermException(Exception):
    def __init__(self, index):
        self.index = index

    def get_too_long_index(self):
        return self.index

This exception will be raised if the search term you specified is too long.
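Catching the exception recovers the index of the offending term, so the caller can drop it and reconnect. A small self-contained example (with made-up terms, and the exception class repeated here so it runs on its own):

```python
class TooLongTermException(Exception):
    def __init__(self, index):
        self.index = index

    def get_too_long_index(self):
        return self.index

terms = ["python", "a" * 500, "twitter"]
try:
    # Pretend Twitter rejected the term at index 1 as too long.
    raise TooLongTermException(1)
except TooLongTermException as e:
    terms.pop(e.get_too_long_index())

print(terms)  # ['python', 'twitter']
```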


Next is the StreamListener class:

class StreamListener(TwythonStreamer):
    def __init__(self, APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
                 comm_list):
        super().__init__(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
        self.tweet_list = comm_list

    def on_success(self, data):
        self.tweet_list.append(data)
        logging.info("tweet captured")

    def on_error(self, status_code, data):
        logging.error(status_code)
        logging.error(data)
        if int(status_code) == 406:
            try:
                index = int(str(data).strip().split()[4])
            except (ValueError, IndexError):
                logging.debug("Could not extract a term index from the error")
                return
            logging.error("Term index to remove: " + str(index))
            raise TooLongTermException(index)
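The index extraction in on_error assumes the fifth whitespace-separated token of the 406 error message is the index of the over-long term. With a hypothetical message of that shape (the exact wording from Twitter may differ), the parsing works like this:

```python
import logging

def extract_too_long_index(data):
    # Same parsing as on_error above: take the fifth whitespace-separated
    # token and interpret it as the index of the over-long term.
    try:
        return int(str(data).strip().split()[4])
    except (ValueError, IndexError):
        logging.debug("could not extract an index from: %s", data)
        return None

# Hypothetical 406 message; the real wording from Twitter may differ.
msg = "Parameter track item index 2 too long"
print(extract_too_long_index(msg))                    # 2
print(extract_too_long_index("unexpected message"))   # None
```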

Getting the authentication

This function will instantiate the Authentication class, and return all parameters we need:

def get_authentication():
    auth = Authentication()
    logging.basicConfig(
        format='%(levelname)s: %(asctime)s - %(message)s',
        datefmt='%m/%d/%Y %I:%M:%S %p',
        level=logging.INFO
    )

    logging.info("Removing old pincode file")
    auth.remove_old_code_file()

    logging.info("Loading config file")
    APP_KEY, APP_SECRET, TERMS_FILE, STORAGE_PATH = auth.read_config_file("config.json")

    logging.info("Getting OAuth data")
    url, OAUTH_TOKEN, OAUTH_TOKEN_SECRET = auth.get_oauth_link(APP_KEY, APP_SECRET)

    logging.info("Waiting for pin code")
    pincode = auth.wait_for_pin_code()

    logging.info("Authorizing with pin code")
    OAUTH_TOKEN, OAUTH_TOKEN_SECRET = auth.auth_with_pin(
        APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET, pincode
    )

    logging.info("Start listening....")

    filter_terms = []
    with open(TERMS_FILE) as f:
        for term in f:
            filter_terms.append(term.strip())
    logging.info("List of terms to filter: " + str(filter_terms))

    return (APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
            filter_terms, STORAGE_PATH)

Listening and writing twitter’s data

The following two functions listen for streamed tweets and write them to file. Tweets are currently written out in batches of roughly 100.

def twitter_listener(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
                     comm_list):
    streamer = StreamListener(APP_KEY, APP_SECRET, OAUTH_TOKEN,
                              OAUTH_TOKEN_SECRET, comm_list)
    # filter_terms is unpacked at module level in the __main__ block below
    while True:
        try:
            streamer.statuses.filter(track=','.join(filter_terms),
                                     language='en')
        except requests.exceptions.ChunkedEncodingError:
            logging.warning('ChunkedEncodingError, but under control')
        except IncompleteRead:
            logging.warning('IncompleteRead error, but under control')
        except TooLongTermException as e:
            index_to_remove = e.get_too_long_index()
            filter_terms.pop(index_to_remove)

def twitter_writer(comm_list):
    internal_list = []
    time_start = time.time()
    while True:
        if len(internal_list) > 100:
            file_name = STORAGE_PATH + str(round(time.time())) + ".json"
            with open(file_name, 'w+', encoding='utf-8') as output_file:
                json.dump(internal_list, output_file, indent=4)
                internal_list = []
                logging.info('------- Data dumped -------')
                time_stop = time.time()
                logging.info('Time taken for 100 tweets: {0:.2f}s'.format(
                    time_stop - time_start
                ))
                time_start = time.time()
        else:
            for i in range(len(comm_list)):
                internal_list.append(comm_list.pop())
            time.sleep(1)
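The drain-and-dump logic in twitter_writer can be tried on its own with a fake tweet list and a single pass, no threads involved (the storage path here is a temp directory standing in for ./tweets/):

```python
import json
import os
import tempfile
import time

STORAGE_PATH = tempfile.mkdtemp() + os.sep  # stand-in for ./tweets/

# Fake shared list, as comm_list would look after ~100 tweets arrived.
comm_list = [{"id": i, "text": "tweet %d" % i} for i in range(105)]

# Drain the shared list into the internal buffer, as the else branch does.
internal_list = []
for _ in range(len(comm_list)):
    internal_list.append(comm_list.pop())

# Dump the buffer once it exceeds the batch threshold, as the if branch does.
if len(internal_list) > 100:
    file_name = STORAGE_PATH + str(round(time.time())) + ".json"
    with open(file_name, 'w+', encoding='utf-8') as output_file:
        json.dump(internal_list, output_file, indent=4)
    internal_list = []

with open(file_name, encoding='utf-8') as f:
    dumped = json.load(f)
print(len(dumped))     # 105
print(len(comm_list))  # 0
```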

Execute

Finally, we run the Twitter scraper inside an if __name__ == '__main__' block. This ensures that the scraper is only launched when the script is run with the interpreter directly; if it is imported into another script as a module, only the defined classes and functions are made available. The listener and writer run in separate threads.

if __name__ == '__main__':
    # Get the authentication
    (APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET, filter_terms,
     STORAGE_PATH) = get_authentication()
    comm_list = []

    # Start the threads
    listener = Thread(target=twitter_listener, args=(
        APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET, comm_list))
    listener.start()
    writer = Thread(target=twitter_writer, args=(comm_list,))
    writer.start()
    writer.join()
    listener.join()

If you have followed along to here, great job! Now you can run the script, sit back and relax, and watch the tweets stream in. This should feel awesome!