{ "cells": [
{ "cell_type": "markdown", "metadata": {}, "source": [
"To show off a recent command-line tool for sketching, [dsrs](https://github.com/vlad17/datasketches-rs), let's plot the rolling 28-day count of distinct active reviewers on Amazon.\n",
"\n",
"The raw data here is `item,user,rating,timestamp`, so in SQL this would map to a sophisticated `GROUP BY` with a `COUNT DISTINCT` over 28-day windows. But since the data's only available as CSV, how can we get to the same answer? If we're just interested in an approximate solution, can we do this without using a bunch of memory or a custom (shuffle-inducing...) sliding-window implementation?\n",
"\n",
"All timings below were measured on a machine with 16 physical CPU cores (AWS r4.8xlarge)."
] },
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [
"# https://nijianmo.github.io/amazon/index.html\n",
"# 6.7gb\n",
"# May 1996 - Oct 2018, e.g.:\n",
"# 0449819906,A3U4E9PIZ8OWH1,5.0,1383696000\n",
"# the timestamp is Unix time in seconds.\n",
"prefix = 'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/'\n",
"review_data = {\n",
" 'Amazon Fashion': 'AMAZON_FASHION.csv',\n",
" 'All Beauty': 'All_Beauty.csv',\n",
" 'Appliances': 'Appliances.csv',\n",
" 'Arts, Crafts and Sewing': 'Arts_Crafts_and_Sewing.csv',\n",
" 'Automotive': 'Automotive.csv',\n",
" 'Books': 'Books.csv',\n",
" 'CDs and Vinyl': 'CDs_and_Vinyl.csv',\n",
" 'Cell Phones and Accessories': 'Cell_Phones_and_Accessories.csv',\n",
" 'Clothing, Shoes and Jewelry': 'Clothing_Shoes_and_Jewelry.csv',\n",
" 'Digital Music': 'Digital_Music.csv',\n",
" 'Electronics': 'Electronics.csv',\n",
" 'Gift Cards': 'Gift_Cards.csv',\n",
" 'Grocery and Gourmet Food': 'Grocery_and_Gourmet_Food.csv',\n",
" 'Home and Kitchen': 'Home_and_Kitchen.csv',\n",
" 'Industrial and Scientific': 'Industrial_and_Scientific.csv',\n",
" 'Kindle Store': 'Kindle_Store.csv',\n",
" 'Luxury Beauty': 'Luxury_Beauty.csv',\n",
" 'Magazine Subscriptions': 'Magazine_Subscriptions.csv',\n",
" 'Movies and TV': 'Movies_and_TV.csv',\n",
" 'Musical Instruments': 'Musical_Instruments.csv',\n",
" 'Office Products': 'Office_Products.csv',\n",
" 'Patio, Lawn and Garden': 'Patio_Lawn_and_Garden.csv',\n",
" 'Pet Supplies': 'Pet_Supplies.csv',\n",
" 'Prime Pantry': 'Prime_Pantry.csv',\n",
" 'Software': 'Software.csv',\n",
" 'Sports and Outdoors': 'Sports_and_Outdoors.csv',\n",
" 'Tools and Home Improvement': 'Tools_and_Home_Improvement.csv',\n",
" 'Toys and Games': 'Toys_and_Games.csv',\n",
" 'Video Games': 'Video_Games.csv'\n",
"}\n",
"review_data = {k: prefix + v for k, v in review_data.items()}"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [
"Even with a 28-day window, sliding it forward one day at a time still gives quite a few data points, one per day in the date range:"
] },
{ "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "8188.0" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [
"import pandas as pd\n",
"(pd.Timestamp('Oct 2018') - pd.Timestamp('May 1996')) / pd.Timedelta('1d')"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [
"Store all the URLs in one shell-quoted string, to pass as arguments to the next `%%bash` cell."
] },
{ "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [
"from shlex import quote\n",
"urls = ' '.join(map(quote, review_data.values()))"
] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [
"%%bash -s {urls}\n",
"\n",
"# silence GNU parallel's citation notice\n",
"echo 'will cite' | parallel --citation 1> /dev/null 2> /dev/null\n",
"\n",
"# download all files concurrently; {#} is the job's sequence number\n",
"parallel curl -o \"/tmp/amazon{#}.csv\" -s {} ::: \"$@\""
] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "9.0G\ttotal\n", "230139802\n" ] } ], "source": [
"%%bash\n",
"\n",
"# Total data size\n",
"du -hsc /tmp/amazon*.csv | tail -1\n",
"\n",
"# How many reviews?\n",
"parallel --pipepart wc -l :::: /tmp/amazon*.csv \\\n",
" | awk '{s+=$1}END{print s}'"
] },
{ "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "43404924\n" ] } ], "source": [
"%%bash\n",
"\n",
"# How many distinct users (approximately)?\n",
"parallel --pipepart 'cut -d, -f2 | dsrs --raw' :::: /tmp/amazon*.csv \\\n",
" | dsrs --merge"
] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting /tmp/date-user-extract.awk\n" ] } ], "source": [
"%%writefile /tmp/date-user-extract.awk\n",
"#!/usr/bin/awk -f\n",
"# strftime() and mktime() are GNU awk (gawk) extensions, not POSIX awk\n",
"\n",
"BEGIN {\n",
" FS = \",\"\n",
"}\n",
"\n",
"1 {\n",
" user = $2;\n",
" epoch_sec = $4;\n",
" # round down to midnight (local time) of the review's day\n",
" datespec = strftime(\"%Y %m %d 00 00 00\", epoch_sec);\n",
" rounded_epoch_sec = mktime(datespec)\n",
" # emit the review's date and the 27 dates before it, so each date t\n",
" # collects every user active during the 28 days [t, t+27]\n",
" for (i = 0; i < 28; i += 1) {\n",
" dt = strftime(\"%F\", rounded_epoch_sec);\n",
" print dt \" \" user\n",
" # Unix time has no leap seconds, so in UTC every day is exactly 86400 s;\n",
" # in a zone with DST a 23-hour day would make this skip a date, so run\n",
" # with TZ=UTC if the machine isn't already on UTC\n",
" rounded_epoch_sec -= 86400\n",
" }\n",
"}"
] },
{ "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2013-11-06 A3U4E9PIZ8OWH1\n", "2013-11-05 A3U4E9PIZ8OWH1\n", "2013-11-04 A3U4E9PIZ8OWH1\n" ] } ], "source": [
"%%bash\n",
"\n",
"# test the date mapper on the sample row (first 3 of its 28 output lines)\n",
"echo 0449819906,A3U4E9PIZ8OWH1,5.0,1383696000 | awk -f /tmp/date-user-extract.awk | head -3"
] },
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [
"%%bash\n",
"\n",
"# How many distinct users are active in each 28-day window?\n",
"parallel --pipepart 'awk -f /tmp/date-user-extract.awk' :::: /tmp/amazon*.csv \\\n",
" | dsrs --key >/tmp/ts"
] },
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
| date | cnt |\n",
"|---|---|\n",
"| 1996-04-23 | 1 |\n",
"| 1996-04-24 | 1 |\n",
"| 1996-04-25 | 1 |\n",
"| 1996-04-26 | 1 |\n",
"| 1996-04-27 | 1 |\n",
"