Murphy’s API

Data Loader

A data loader that converts json.bz2 files into a ready-to-use Dask DataFrame.

While this module provides a lot of functionality, most of it is exposed through its constructor (__init__).

class murphy.data_loader.DataLoader(file_find_expression: Union[str, pathlib.Path, List[pathlib.Path]], remove_emoji: bool = True, remove_retweets_symbols: bool = True, remove_truncated_tweets: bool = True, add_usernames: bool = True, tokenize: bool = True, filter_stopwords: bool = True, lemmatize: bool = True, language: str = 'english')[source]

Bases: object

This is where you specify how you want to configure the Twitter dataset before loading it (a usage sketch follows the parameter list). Its functionality includes:

  • removing emojis

  • removing retweets symbols

  • lemmatizing the text

  • filtering by language

  • and more!

Parameters
  • file_find_expression – Unix-style glob path (or list of paths) used to list all of the files to load

  • remove_emoji – flag for removing emojis from all of the tweet text

  • remove_retweets_symbols – flag for removing retweet markers (RT @<retweet_username>:) from all of the tweet text

  • remove_truncated_tweets – flag for removing all truncated tweets, since not all of their information can be recovered

  • add_usernames – flag for adding the tweeting user's username as a separate column instead of leaving it to be parsed from the user column

  • tokenize – tokenize tweets to make them easier to process

  • filter_stopwords – remove stopwords from the tweets to make them easier to process

  • lemmatize – lemmatize text to make it easier to process

  • language – select the language that you want to work with
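
A minimal usage sketch, based only on the constructor signature above, is shown below. The file path is hypothetical, and the twitter_dataframe attribute used to access the loaded Dask DataFrame is an assumption rather than a documented attribute; check the class for the actual name.

    from murphy.data_loader import DataLoader

    # Configure the loader; every keyword below comes from the constructor signature.
    loader = DataLoader(
        file_find_expression="data/tweets/*.json.bz2",  # hypothetical path
        remove_emoji=True,
        remove_retweets_symbols=True,
        remove_truncated_tweets=True,
        add_usernames=True,
        tokenize=True,
        filter_stopwords=True,
        lemmatize=True,
        language="english",
    )

    # Assumption: the resulting Dask DataFrame is exposed as an attribute such as
    # loader.twitter_dataframe; the real attribute name may differ.
    tweets = loader.twitter_dataframe
    print(tweets.head())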

static get_files_list(pathname: Union[str, pathlib.Path], recursive: bool = False, suffix: str = '*.json*') → List[str][source]

Function to get files from the given pathname.

If the given pathname leads to a directory, the directory is searched, with the option of supplying a custom suffix pattern.

Parameters
  • pathname – pathname from where we can get the files

  • recursive – Flag for searching recursively

  • suffix – suffix used to match files when the given pathname leads to a directory

Raises

ValueError – When no files are found based on the pathname

Returns

List of paths to the files that were found
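
For example, a short sketch assuming a directory of bz2-compressed JSON files at a hypothetical path:

    from murphy.data_loader import DataLoader

    # Recursively list every *.json* file under data/tweets/ (path is an assumption).
    files = DataLoader.get_files_list("data/tweets/", recursive=True)
    print(f"{len(files)} files found")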

Filters

author: v2thegreat (v2thegreat@gmail.com)

Package to filter out rows that are not wanted for processing

TODO:
  • This package is written with the hope of better understanding what problems would be encountered when processing such a dataset, and hence with the understanding that this and other scripts will be refactored

  • Add tests

class murphy.filters.Filter(remove_emoji: bool = True, remove_retweets: bool = False, remove_truncated_tweets: bool = False)[source]

Bases: object

static filter_emoji(twitter_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
static filter_retweet_text(twitter_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
static mark_truncated_tweets(tweet_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
static remove_truncated_tweets(tweet_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
run_filters(twitter_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
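
A brief sketch of running the filters on a small pandas DataFrame follows. The text column name and frame layout are assumptions for illustration; the real tweet schema produced by the data loader may differ.

    import pandas as pd
    from murphy.filters import Filter

    # Hypothetical frame with a single text column.
    tweets = pd.DataFrame({"text": ["RT @someone: hello 🌍", "a plain tweet"]})

    tweet_filter = Filter(remove_emoji=True, remove_retweets=True, remove_truncated_tweets=False)
    filtered = tweet_filter.run_filters(tweets)
    print(filtered)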

NLP Tools

class murphy.nlp_tools.NLPTools(tokenize: bool = True, filter_stopwords: bool = True, lemmatize: bool = True, language: str = 'english')[source]

Bases: object

filter_stopwords(tweet_dataframe: dask.dataframe.core.DataFrame) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
lemmatize_tweets(tweet_dataframe: dask.dataframe.core.DataFrame) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
nlp = <spacy.lang.en.English object>
run_tools(tweet_dataframe: dask.dataframe.core.DataFrame) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
tokenize_tweets(tweet_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
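
A minimal sketch of running the NLP tools on a small Dask DataFrame; the text column name is a hypothetical stand-in for whatever column the tools actually operate on.

    import dask.dataframe as dd
    import pandas as pd
    from murphy.nlp_tools import NLPTools

    # Hypothetical tweet frame with a single text column.
    tweets = dd.from_pandas(
        pd.DataFrame({"text": ["Cats are running", "Dogs ran home"]}),
        npartitions=1,
    )

    tools = NLPTools(tokenize=True, filter_stopwords=True, lemmatize=True, language="english")
    processed = tools.run_tools(tweets)

    # run_tools may return a Dask or pandas DataFrame; materialize the Dask case.
    print(processed.compute() if hasattr(processed, "compute") else processed)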

Batch Processing

Note

This module is written with the hope of better understanding what problems would be encountered when processing such a dataset, and is hence written in its current flexible manner

Module to process tweets from the data loading step in batches, reducing the workload on the scheduler by applying functions in batches

class murphy.batch_processing.Batches[source]

Bases: object

static process_in_batches(file_paths: Iterable[str], read_func: Callable[[str], Any], func_to_apply: Callable[[Any], Any], verbose: bool = True) → Dict[str, Any][source]

Function to process data in batches to circumvent the Dask scheduler's limitations (for example, its maximum of 100k tasks)

Parameters
  • file_paths – path of files that need to be individually processed

  • read_func – function to read the file. This must return an object (for example: Dask Bag, Dask Array, str)

  • func_to_apply – function to apply to the object returned by read_func

  • verbose – whether to show a progress bar

Returns

a dictionary that has the schema: {file_name: func_to_apply’s return value}
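
For instance, a sketch that counts the records in each file; the file names are hypothetical and the counting logic is only illustrative:

    import dask.bag as db
    from murphy.batch_processing import Batches

    file_paths = ["tweets-0.json.bz2", "tweets-1.json.bz2"]  # hypothetical files

    counts = Batches.process_in_batches(
        file_paths=file_paths,
        read_func=lambda path: db.read_text(path),        # each file becomes a Dask Bag
        func_to_apply=lambda bag: bag.count().compute(),  # record count per file
        verbose=True,
    )
    # counts maps each file name to its record count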

static process_in_batches_generator(file_iterator: Iterable[str], read_func: Callable[[str], Any], func_to_apply: Callable[[Any], Any]) → Iterable[Any][source]

Function to process data in batches to circumvent the Dask scheduler's limit of 100k tasks

Parameters
  • file_iterator – iterator that contains the file names

  • read_func – function to read the file. This must return an object (for example: Dask Bag, Dask Array, str)

  • func_to_apply – function to apply to the object returned by read_func

Returns

a dictionary that has the schema: {file_name: func_to_apply’s return value}
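
The generator variant can be consumed lazily; a sketch under the same assumptions as the example above:

    import dask.bag as db
    from murphy.batch_processing import Batches

    results = Batches.process_in_batches_generator(
        file_iterator=["tweets-0.json.bz2", "tweets-1.json.bz2"],  # hypothetical files
        read_func=lambda path: db.read_text(path),
        func_to_apply=lambda bag: bag.count().compute(),
    )
    for item in results:
        print(item)  # one result per file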

Module contents

class murphy.Batches[source]

Bases: object

static process_in_batches(file_paths: Iterable[str], read_func: Callable[[str], Any], func_to_apply: Callable[[Any], Any], verbose: bool = True) → Dict[str, Any][source]

Function to process data in batches to circumvent the Dask scheduler's limitations (for example, its maximum of 100k tasks)

Parameters
  • file_paths – path of files that need to be individually processed

  • read_func – function to read the file. This must return an object (for example: Dask Bag, Dask Array, str)

  • func_to_apply – function to apply to the object returned by read_func

  • verbose – whether to show a progress bar

Returns

a dictionary that has the schema: {file_name: func_to_apply’s return value}

static process_in_batches_generator(file_iterator: Iterable[str], read_func: Callable[[str], Any], func_to_apply: Callable[[Any], Any]) → Iterable[Any][source]

Function to process data in batches to circumvent the Dask scheduler's limit of 100k tasks

Parameters
  • file_iterator – iterator that contains the file names

  • read_func – function to read the file. This must return an object (for example: Dask Bag, Dask Array, str)

  • func_to_apply – function to apply to the object returned by read_func

Returns

a dictionary that has the schema: {file_name: func_to_apply’s return value}

class murphy.DataLoader(file_find_expression: Union[str, pathlib.Path, List[pathlib.Path]], remove_emoji: bool = True, remove_retweets_symbols: bool = True, remove_truncated_tweets: bool = True, add_usernames: bool = True, tokenize: bool = True, filter_stopwords: bool = True, lemmatize: bool = True, language: str = 'english')[source]

Bases: object

This is where you specify how you want to configure the Twitter dataset before loading it. Its functionality includes:

  • removing emojis

  • removing retweets symbols

  • lemmatizing the text

  • filtering by language

  • and more!

Parameters
  • file_find_expression – Unix-style glob path (or list of paths) used to list all of the files to load

  • remove_emoji – flag for removing emojis from all of the tweet text

  • remove_retweets_symbols – flag for removing retweet markers (RT @<retweet_username>:) from all of the tweet text

  • remove_truncated_tweets – flag for removing all truncated tweets, since not all of their information can be recovered

  • add_usernames – flag for adding the tweeting user's username as a separate column instead of leaving it to be parsed from the user column

  • tokenize – tokenize tweets to make them easier to process

  • filter_stopwords – remove stopwords from the tweets to make them easier to process

  • lemmatize – lemmatize text to make it easier to process

  • language – select the language that you want to work with

static get_files_list(pathname: Union[str, pathlib.Path], recursive: bool = False, suffix: str = '*.json*') → List[str][source]

Function to get files from the given pathname.

If the given pathname leads to a directory, the directory is searched, with the option of supplying a custom suffix pattern.

Parameters
  • pathname – pathname from where we can get the files

  • recursive – Flag for searching recursively

  • suffix – suffix used to match files when the given pathname leads to a directory

Raises

ValueError – When no files are found based on the pathname

Returns

List of paths to the files that were found

class murphy.Filter(remove_emoji: bool = True, remove_retweets: bool = False, remove_truncated_tweets: bool = False)[source]

Bases: object

static filter_emoji(twitter_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
static filter_retweet_text(twitter_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
static mark_truncated_tweets(tweet_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
static remove_truncated_tweets(tweet_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]
run_filters(twitter_dataframe: Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame]) → Union[dask.dataframe.core.DataFrame, pandas.core.frame.DataFrame][source]