Combining Pig and Python to explore raw datasets via Pydoop

This post takes the raw data processing capabilities of Hadoop and integrates them into a Python notebook, so I can start doing exploratory data analysis directly from my raw electricity dataset.

Loading the libraries

In [1]:
import pandas as pd
import sys
import numpy as np
import math

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

Write the Pig script out to the server (this gets saved in the working directory of the IPython notebook). The purpose of this script is to take the raw JSON files generated by my logger (one for each day) and turn them into something that can be read into Python. I also convert the date into something useful and create columns to represent the hour and day of week to help the analysis. It then writes this all out to CSV file(s). This will create a directory called electricty.csv and, in that directory, a number of CSV files, one for each split in the MapReduce job.

In [2]:
%%writefile ElectricityReadings.pig
REGISTER '/edge/elephant-bird-pig-4.3.jar';
REGISTER '/edge/elephant-bird-hadoop-compat-4.3.jar';
REGISTER '/edge/json-simple-1.1.jar';

A = LOAD '/user/hive/warehouse/sarafand.db/stg_raw_loop/*.json' 
   USING com.twitter.elephantbird.pig.load.JsonLoader() as (json:map[]);
                                                     
                                                     
B = FOREACH A GENERATE ToDate((long)ROUND_TO((double)$0#'Current Time'*1000,0)) as TimeStamp,
                       $0#'Type' AS Type,
                       $0#'Electricity Reading' AS ElectricityReading;                     
                       
C = FILTER B By ((chararray)Type == 'Electricity');
                
D = FOREACH C GENERATE 
   TimeStamp,
   GetHour(TimeStamp) as hour,
   ToString( TimeStamp, 'EEE' ) as DayofWeek,
   ElectricityReading;

store D INTO '/user/olloyd/Pig Learning/electricty.csv' 
    USING PigStorage(',');
Overwriting ElectricityReadings.pig
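
For readers less familiar with Pig, the per-record logic of the script above can be sketched in plain Python. This is only an illustration, not what runs on the cluster: the transform function, the sample record and the choice of UTC are my assumptions (Pig uses the cluster's default time zone), while the three field names come from the script.

```python
from datetime import datetime, timezone

DAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

def transform(record, tz=timezone.utc):
    """Mimic relations B, C and D of the Pig script for one JSON record."""
    # Relation C: keep only electricity readings.
    if record.get("Type") != "Electricity":
        return None
    # Relation B: epoch seconds -> whole milliseconds -> timestamp.
    millis = int(round(float(record["Current Time"]) * 1000))
    stamp = datetime.fromtimestamp(millis / 1000.0, tz=tz)
    # Relation D: add the hour and the abbreviated day of the week.
    return (stamp.isoformat(), stamp.hour, DAYS[stamp.weekday()],
            record["Electricity Reading"])

# Hypothetical record; the values are made up for illustration.
sample = {"Current Time": 1493852410.216, "Type": "Electricity",
          "Electricity Reading": 376}
print(transform(sample))
```

Any record whose Type is not 'Electricity' comes back as None, which corresponds to the rows the FILTER step drops.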

The job will fail if the output directory from a previous run still exists, so we need to delete it first. This just runs a command on the analysis server to delete the electricty.csv directory from the Hadoop file system.

In [3]:
%%bash
hdfs dfs -rm -r '/user/olloyd/Pig Learning/electricty.csv'
Deleted /user/olloyd/Pig Learning/electricty.csv
17/07/02 09:59:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/02 10:00:03 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
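
The same clean-up can be done from Python rather than a bash cell. The sketch below is an assumption on my part, not part of the original notebook: the two helper names are hypothetical, and it relies on the hdfs client being on the PATH. The extra -f flag should stop the command from complaining when the directory does not exist yet, which is handy on the very first run.

```python
import subprocess

def hdfs_rm_command(path):
    """Build the `hdfs dfs` command that removes a directory tree."""
    # -r removes recursively; -f suppresses the error for a missing path.
    return ["hdfs", "dfs", "-rm", "-r", "-f", path]

def remove_previous_output(path):
    """Run the removal and return the exit code of the hdfs client."""
    return subprocess.call(hdfs_rm_command(path))

# Assumes the `hdfs` client is installed on the notebook server:
# remove_previous_output('/user/olloyd/Pig Learning/electricty.csv')
```

Passing the path as a list element rather than a shell string also means the space in 'Pig Learning' needs no quoting.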

Now run the ElectricityReadings Pig script created above. The second block of code is optional and allows you to see the progress of the job as it runs.

In [4]:
%%bash --err pig_out --bg 
pig -f ElectricityReadings.pig
Starting job # 0 in a separate thread.
In [5]:
while True:
    line = pig_out.readline()
    if not line: 
        break
    sys.stdout.write("%s" % line)
    sys.stdout.flush()  
17/07/02 10:00:09 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
17/07/02 10:00:09 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
17/07/02 10:00:09 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2017-07-02 10:00:09,733 [main] INFO  org.apache.pig.Main - Apache Pig version 0.15.0-SNAPSHOT (r: unknown) compiled May 15 2017, 21:33:30
2017-07-02 10:00:09,738 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/olloyd/Untitled Folder/pig_1498986009706.log
2017-07-02 10:00:12,258 [main] WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-07-02 10:00:13,548 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/olloyd/.pigbootup not found
2017-07-02 10:00:14,495 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2017-07-02 10:00:14,510 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2017-07-02 10:00:14,516 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://master01:54310
2017-07-02 10:00:17,892 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2017-07-02 10:00:17,902 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2017-07-02 10:00:18,210 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2017-07-02 10:00:18,228 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
SNIP!!
2017-07-02 10:03:44,114 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2017-07-02 10:03:44,732 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2017-07-02 10:03:44,760 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics: 

HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
2.6.4	0.15.0-SNAPSHOT	olloyd	2017-07-02 10:00:27	2017-07-02 10:03:44	FILTER

Success!

Job Stats (time in seconds):
JobId	Maps	Reduces	MaxMapTime	MinMapTime	AvgMapTime	MedianMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	MedianReducetime	Alias	Feature	Outputs
job_1498419277295_0036	16	0	78	34	59	59	0	0	0	0	A,B,C,D	MAP_ONLY	/user/olloyd/Pig Learning/electricty.csv,

Input(s):
Successfully read 871076 records (79444243 bytes) from: "/user/hive/warehouse/sarafand.db/stg_raw_loop/*.json"

Output(s):
Successfully stored 860330 records (36173906 bytes) in: "/user/olloyd/Pig Learning/electricty.csv"

Counters:
Total records written : 860330
Total bytes written : 36173906
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1498419277295_0036
2017-07-02 10:03:45,438 [main] INFO  org.apache.pig.Main - Pig script completed in 3 minutes, 37 seconds and 56 milliseconds (217056 ms)

Pydoop has a built-in function that allows us to read data directly from the Hadoop file system. As the MapReduce job outputs a number of files rather than a single CSV, we need to loop through them, read each one and append them into a single pandas dataframe ready for analysis in Python.

In [6]:
import pydoop.hdfs as hdfs
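def read_csv_from_hdfs(path, cols, col_types=None):
  # List every file in the output directory (one per map task)
  files = hdfs.ls(path)
  pieces = []
  for f in files:
    # Read each part file into its own dataframe
    fhandle = hdfs.open(f)
    pieces.append(pd.read_csv(fhandle, names=cols, dtype=col_types))
    fhandle.close()
  # Stitch the pieces together into a single dataframe
  return pd.concat(pieces, ignore_index=True)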
In [7]:
ElecReadings = read_csv_from_hdfs('/user/olloyd/Pig Learning/electricty.csv',["TimeStamp","Hour","DayofWeek","ElectricityReading"])
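
One thing to watch for when looping over an output directory: MapReduce jobs often leave marker files such as _SUCCESS alongside the part files, and by Hadoop convention names starting with an underscore or a dot are not data. The sketch below shows one way such entries could be filtered out before reading. The is_data_file helper and the example listing are hypothetical, and whether your cluster writes these markers depends on its configuration.

```python
import posixpath

def is_data_file(path):
    """True for part files, False for markers such as _SUCCESS or .crc files."""
    name = posixpath.basename(path.rstrip("/"))
    return not name.startswith(("_", "."))

# Hypothetical listing, shaped like what a directory listing might return.
listing = [
    "/user/olloyd/Pig Learning/electricty.csv/_SUCCESS",
    "/user/olloyd/Pig Learning/electricty.csv/part-m-00000",
    "/user/olloyd/Pig Learning/electricty.csv/part-m-00001",
]
data_files = [p for p in listing if is_data_file(p)]
print(data_files)
```

The same test could be applied to the list of files inside the loop of read_csv_from_hdfs.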

As this step can take a few minutes and I might want to pick up this analysis again later, I write the data out to the pickle jar, followed by another statement to read it back in. OK, I know it's not really called the pickle jar but seriously.. it should be.

In [8]:
ElecReadings.to_pickle('ElecReadings')
In [9]:
ElecReadings = pd.read_pickle('ElecReadings')

Now we can take a quick look and see what we are dealing with. My pandas dataframe isn't too large - just over 860K records with just 4 columns.

In [10]:
ElecReadings.shape
Out[10]:
(860330, 4)

To take a look at the column names we can either list them out or use the .columns attribute in pandas.

In [11]:
list(ElecReadings)
ElecReadings.columns
Out[11]:
['TimeStamp', 'Hour', 'DayofWeek', 'ElectricityReading']
Out[11]:
Index([u'TimeStamp', u'Hour', u'DayofWeek', u'ElectricityReading'], dtype='object')

Now we want to start taking a quick look at the data itself. Using the .values attribute we can get a NumPy representation of the dataframe.

In [12]:
ElecReadings.values
Out[12]:
array([['2017-05-04T00:00:10.216+01:00', 0.0, 'Thu', 376.0],
       ['2017-05-04T00:00:21.218+01:00', 0.0, 'Thu', 381.0],
       ['2017-05-04T00:00:32.218+01:00', 0.0, 'Thu', 381.0],
       ..., 
       ['2017-04-07T23:59:30.660+01:00', 23.0, 'Fri', 343.0],
       ['2017-04-07T23:59:41.659+01:00', 23.0, 'Fri', 319.0],
       ['2017-04-07T23:59:52.658+01:00', 23.0, 'Fri', 336.0]], dtype=object)

I personally don't find that very useful when trying to get a feel for the data. Instead I use the commands below. head and tail show the top and bottom n records, whereas sample shows a random selection of n records taken from across the whole dataframe.

In [13]:
ElecReadings.head(5)
ElecReadings.tail(10)
ElecReadings.sample(10)
Out[13]:
TimeStamp Hour DayofWeek ElectricityReading
0 2017-05-04T00:00:10.216+01:00 0.0 Thu 376.0
1 2017-05-04T00:00:21.218+01:00 0.0 Thu 381.0
2 2017-05-04T00:00:32.218+01:00 0.0 Thu 381.0
3 2017-05-04T00:00:43.266+01:00 0.0 Thu 372.0
4 2017-05-04T00:00:58.194+01:00 0.0 Thu 396.0
Out[13]:
TimeStamp Hour DayofWeek ElectricityReading
860320 2017-04-07T23:58:13.667+01:00 23.0 Fri 331.0
860321 2017-04-07T23:58:24.666+01:00 23.0 Fri 343.0
860322 2017-04-07T23:58:35.665+01:00 23.0 Fri 345.0
860323 2017-04-07T23:58:46.663+01:00 23.0 Fri 338.0
860324 2017-04-07T23:58:57.662+01:00 23.0 Fri 345.0
860325 2017-04-07T23:59:08.661+01:00 23.0 Fri 345.0
860326 2017-04-07T23:59:19.661+01:00 23.0 Fri 343.0
860327 2017-04-07T23:59:30.660+01:00 23.0 Fri 343.0
860328 2017-04-07T23:59:41.659+01:00 23.0 Fri 319.0
860329 2017-04-07T23:59:52.658+01:00 23.0 Fri 336.0
Out[13]:
TimeStamp Hour DayofWeek ElectricityReading
785929 2017-04-13T04:25:49.576+01:00 4.0 Thu 220.0
549490 2017-04-12T17:23:05.721+01:00 17.0 Wed 465.0
622591 2017-03-18T02:20:33.387Z 2.0 Sat 376.0
40702 2017-06-26T05:21:41.521+01:00 5.0 Mon 273.0
407637 2017-04-21T13:38:54.473+01:00 13.0 Fri 463.0
850733 2017-03-31T17:01:47.119+01:00 17.0 Fri 408.0
747063 2017-04-06T03:03:32.425+01:00 3.0 Thu 223.0
414218 2017-06-27T09:51:17.412+01:00 9.0 Tue 453.0
584891 2017-04-11T06:14:02.394+01:00 6.0 Tue 232.0
741433 2017-06-09T09:44:43.085+01:00 9.0 Fri 556.0

We can also get some basic statistics.

In [14]:
ElecReadings.describe()
Out[14]:
Hour ElectricityReading
count 860330.000000 860330.000000
mean 11.517785 573.221439
std 6.934797 667.531266
min 0.000000 134.000000
25% 5.000000 278.000000
50% 12.000000 381.000000
75% 18.000000 501.000000
max 23.000000 8054.000000

These figures are power readings in watts, so the maximum draw in my house so far has been just over 8 kW. We must have had every appliance on at the time! We can filter the records to take a look.

In [15]:
ElecReadings.query('ElectricityReading > 8e+3')
Out[15]:
TimeStamp Hour DayofWeek ElectricityReading
697694 2017-03-22T17:58:16.617Z 17.0 Wed 8054.0
697695 2017-03-22T17:58:24.611Z 17.0 Wed 8030.0
697696 2017-03-22T17:58:32.607Z 17.0 Wed 8006.0

We could equally have run the below. The results are exactly the same.

In [16]:
ElecReadings[ElecReadings.ElectricityReading > 8e+3]
Out[16]:
TimeStamp Hour DayofWeek ElectricityReading
697694 2017-03-22T17:58:16.617Z 17.0 Wed 8054.0
697695 2017-03-22T17:58:24.611Z 17.0 Wed 8030.0
697696 2017-03-22T17:58:32.607Z 17.0 Wed 8006.0
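
To convince yourself that the two filtering styles really are interchangeable, here is a small self-contained check. The demo dataframe is made up for illustration; only the column name and the threshold come from the cells above.

```python
import pandas as pd

# Hypothetical readings, just enough rows to exercise the filter.
demo = pd.DataFrame({"ElectricityReading": [376.0, 8054.0, 8030.0, 501.0]})

# Style 1: a query string evaluated against the column names.
by_query = demo.query("ElectricityReading > 8e+3")

# Style 2: a boolean mask used to index the dataframe.
by_mask = demo[demo.ElectricityReading > 8e+3]

print(by_query.equals(by_mask))
```

Both keep the original row index, which is why the index values in the two outputs above match.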

Null values can cause a lot of issues so I just want to check to see if my dataset has any that I need to deal with.

In [17]:
ElecReadings[ElecReadings.ElectricityReading.isnull()]
Out[17]:
TimeStamp Hour DayofWeek ElectricityReading

Luckily the data I have collected so far doesn't have any.
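
The check above only covers one column. A quick way to cover every column at once is to count the nulls per column, sketched here on a made-up dataframe (the demo data is hypothetical, the column names are from my dataset).

```python
import numpy as np
import pandas as pd

# Hypothetical frame with a few deliberately missing values.
demo = pd.DataFrame({
    "Hour": [0.0, 1.0, np.nan],
    "ElectricityReading": [376.0, np.nan, np.nan],
})

# isnull() marks each missing cell; sum() counts them per column.
null_counts = demo.isnull().sum()
print(null_counts)
```

A column with no missing values simply shows a count of zero.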

From the describe output a few steps above, it looks like 75% of my readings are at or below about 500 W, but there is a large gap between this and my maximum reading of 8 kW. I will create a subset of the data containing only readings above the 500 W mark to get some flavour of the higher readings.

In [18]:
HighElecReadings = ElecReadings.query('ElectricityReading > 5e+2')

HighElecReadings.sample(10)
HighElecReadings.describe()
Out[18]:
TimeStamp Hour DayofWeek ElectricityReading
315770 2017-03-25T19:37:31.862Z 19.0 Sat 571.0
38420 2017-06-10T22:15:34.187+01:00 22.0 Sat 578.0
298849 2017-05-22T15:45:27.896+01:00 15.0 Mon 3242.0
720029 2017-04-09T15:07:43.569+01:00 15.0 Sun 506.0
503158 2017-05-24T19:01:13.408+01:00 19.0 Wed 1922.0
472400 2017-04-22T20:31:35.205+01:00 20.0 Sat 573.0
826910 2017-05-05T12:55:22.340+01:00 12.0 Fri 1711.0
54733 2017-03-11T17:44:54.524Z 17.0 Sat 568.0
759267 2017-04-10T17:03:52.800+01:00 17.0 Mon 501.0
113495 2017-06-18T00:13:03.871+01:00 0.0 Sun 1466.0
Out[18]:
Hour ElectricityReading
count 217846.000000 217846.000000
mean 14.510810 1277.234799
std 5.163628 1036.561291
min 0.000000 501.000000
25% 11.000000 552.000000
50% 15.000000 648.000000
75% 19.000000 1809.000000
max 23.000000 8054.000000

So it looks like the higher readings are quite rare, with the distribution definitely skewed towards the lower end. Not surprising if you think about it - how many times a day do you actually boil a kettle?
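
To put a number on that, the two counts from the describe outputs above give the share of readings that sit above the 500 mark. This is just arithmetic on figures already shown; the variable names are mine.

```python
total_readings = 860330   # count from the first describe() output
high_readings = 217846    # count from describe() on readings above 500

# float() keeps the division exact on both Python 2 and Python 3.
share = high_readings / float(total_readings)
print("%.1f%% of readings are above 500" % (share * 100))
```

So roughly one reading in four is in the higher subset, and the median of that subset (648) shows most of those are still not far above the threshold.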

Basic statistics can help a little bit, but really we need to plot some of this out to get a real understanding of the data. matplotlib is a good library for doing 2D plots and, as I also use ggplot within R, I'm really happy that it is able to mirror the style.

In [19]:
import matplotlib.pyplot as plt
plt.style.use('ggplot')
%matplotlib inline
In [20]:
fig1 = plt.hist((ElecReadings['ElectricityReading']), bins= np.arange(min(ElecReadings['ElectricityReading'])
   ,max(ElecReadings['ElectricityReading']) + 100,100))
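
The bins argument above is easier to follow in isolation. np.arange builds edges from the minimum reading upwards in steps of the bin width, and adding one extra width to the maximum makes sure the last edge lands at or beyond the largest reading. A minimal sketch, where the bin_edges helper and the short readings array are hypothetical (only the minimum of 134 and maximum of 8054 come from the describe output):

```python
import numpy as np

def bin_edges(values, width):
    """Edges from min(values) in steps of `width`, covering max(values)."""
    lo, hi = min(values), max(values)
    return np.arange(lo, hi + width, width)

# Made-up readings that share the real minimum and maximum.
readings = np.array([134.0, 381.0, 501.0, 8054.0])
edges = bin_edges(readings, 100)

# np.histogram does the same counting as plt.hist, without drawing anything.
counts, _ = np.histogram(readings, bins=edges)
print(len(edges), edges[0], edges[-1])
```

Shrinking the width to 10 gives the finer bins used for the lower readings later on.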

Yup, definitely skewed! We need to split this into 2 charts. The first looks in more detail at the lower readings by limiting the x axis to values between 0 and 1 kW (1000 W) and shrinking the bin size to 10. The other looks at values over 1 kW.

In [21]:
fig2 = plt.hist((ElecReadings['ElectricityReading']), bins= np.arange(min(ElecReadings['ElectricityReading'])
   ,max(ElecReadings['ElectricityReading']) + 10,10));plt.xlim(0,1000)

plt.show()

fig3= plt.hist((ElecReadings['ElectricityReading']), bins= np.arange(min(ElecReadings['ElectricityReading'])
   ,max(ElecReadings['ElectricityReading']) + 100,100));plt.xlim(1000);plt.ylim(0,8000)

plt.show()
Out[21]:
(0, 1000)
Out[21]:
(1000, 8534.0)
Out[21]:
(0, 8000)

In the second chart we start to see a series of peaks every 1.5 kW or so. We could maybe get a little more insight by changing the y axis to a logarithmic scale.

In [22]:
fig4 = plt.hist((ElecReadings['ElectricityReading']), bins= np.arange(min(ElecReadings['ElectricityReading'])
   ,max(ElecReadings['ElectricityReading']) + 100,100));plt.yscale('log')

There is definitely a pattern of peaks every 1.5 kW: 1.5, 3, 4.5, 6 and just below 7.5 kW. It just so happens that our kettle is a 1.5 kW one.. And Anita does like her tea. I suspect if I look around we might find a number of our appliances are around the 1.5 kW mark.

Summary

What we have done here is take raw data directly from my utility sensor log files, process it on the Hadoop cluster using Pig and do some basic exploratory data analysis using Python and the matplotlib library. We have identified a pattern in the data with peaks every 1.5 kW that I blame on Anita's tea habit. In another post I will look in a little more detail, as well as use machine learning to understand which variables have an impact on our electricity usage. Hopefully that will settle an age-old argument in the house about who uses the most energy.. or most likely reignite it!