
Building a data lake 1: Weather and time

Introduction

I am currently building a data lake that will be used to improve operations at an energy company through machine learning. Among the many interesting topics, the following are prioritized:

  • Can we predict the energy production of hydroelectric, solar and wind power plants?
  • Can we predict the energy consumption using weather reports? After all, home owners need to heat their homes more on a cold winter day than on a sunny day in October.
  • Under what conditions does equipment fail? Can we predict the need for maintenance and optimize scheduled downtime?

Hence, getting access to weather data is necessary. In today's post I want to share what I did yesterday to

  • import weather data from an external service provider
  • deal with time stamps.

I intend to follow up this post with the subsequent steps in building the data lake, the predictive analytics and the delivery of these insights as a service.

Getting weather data

There are multiple ways to get access to weather data, but my preferred method is using an API designed for this purpose. My favourite provider is Weather Underground, https://www.wunderground.com/. They have both free and paid options, but no matter what you opt for, you do need to register to get an API key.
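
Since the key travels with every request, I prefer not to hard-code it in the script. A minimal sketch, assuming the key was exported to the environment beforehand (the variable name WUNDERGROUND_API_KEY is my choice, not something the API requires):

In [1]:
import os
# Assumes the key was exported on the node, e.g. export WUNDERGROUND_API_KEY=...
apikey = os.environ['WUNDERGROUND_API_KEY']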

After importing some libraries that we will use, we query the API with the longitude and latitude of the location of interest.

In [2]:
import urllib2
import json
import time
import os
import pandas as pd

f = urllib2.urlopen('http://api.wunderground.com/api/'+apikey+'/geolookup/conditions/forecast/q/46.94809341,7.44744301.json')
json_string = f.read()

It is also possible to query by city name, for example for Bern, the capital of Switzerland (CH):

In [3]:
f = urllib2.urlopen('http://api.wunderground.com/api/'+apikey+'/geolookup/conditions/q/CH/Bern.json')

Sometimes I also want to write out the raw data I get back from the API:

In [4]:
with open('weather.json', 'w') as outfile:
  outfile.write(json_string)  # the with statement closes the file for us

However, we might want to extract specific fields from the JSON document the API returned. Fortunately, parsing the JSON document is trivial in Python.

In [5]:
parsed_json = json.loads(json_string)
location = parsed_json['location']['city']
temp_c = parsed_json['current_observation']['temp_c']
print "Current temperature in %s is: %s" % (location, temp_c)
Current temperature in Berne is: 19.6

Time information

To blend this weather data with the sensor data from the power plants, we have to make sure we take the data from the correct time window. We can get the time of the last observation just like we got the temperature above.

In [6]:
print parsed_json['current_observation']['observation_time']
Last Updated on August 30, 7:50 AM CEST

While this "pretty print" is human readable, it is harder for something like Hive or SQL to interpret. For this purpose, it is good practice to make use of the epoch timestamp.

In [7]:
obs_time = parsed_json['current_observation']['observation_epoch']
print obs_time
1472536249

This value can be used to extract things like the year, month and day, which makes it easier to determine which user-defined time window an observation belongs to.

In [8]:
print "Observation time", time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime(float(obs_time)))
Observation time 2016-08-30 07:50:49 CEST
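
The individual components are available from the same struct_time, for example:

In [ ]:
t = time.localtime(float(obs_time))
print "Year: %s, month: %s, day: %s" % (time.strftime('%Y', t), time.strftime('%m', t), time.strftime('%d', t))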

Now, since I am running this as a script on an R Server edge node in the Azure cloud, the time zone of the node and the observation point can be different. To ensure that the correct time zone is used, we can set the time zone explicitly before formatting the timestamp:

In [9]:
os.environ['TZ'] = 'Europe/Zurich'
time.tzset()
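
To see the effect, here is a quick sketch that renders the same epoch value under two time zone settings:

In [ ]:
# The same epoch formats differently depending on the process time zone
for tz in ['UTC', 'Europe/Zurich']:
    os.environ['TZ'] = tz
    time.tzset()
    print time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime(float(obs_time)))
# 2016-08-30 05:50:49 UTC
# 2016-08-30 07:50:49 CEST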

CSV output

While nested JSON files provide a high level of flexibility, they add a degree of complication if we want to query them with SQL or Hive. For this reason I decided to select only the fields that I am interested in and store them as flat comma-separated files. (Incidentally, this also saves storage space, although that is not relevant in this project.)

The helper below walks the root of a nested dictionary and copies those keys that appear in a list of keys to keep into a flat target dictionary:

In [10]:
def addToFlatDict(mydict, root, keep_list):
    # Copy the keys listed in keep_list from root into the flat dict mydict
    for k in root.keys():
        if k in keep_list:
            mydict[k] = root[k]

We can then build a flat dictionary of the fields that we want to keep.

In [11]:
mydict = dict()
addToFlatDict(mydict, parsed_json['current_observation'], ['temp_c', 'wind_kph', 'wind_gust_kph', 'local_epoch', 'wind_degrees', 'dewpoint_c', 'precip_1hr_metric', 'precip_today_metric'] )
addToFlatDict(mydict, parsed_json['current_observation']['observation_location'], ['latitude', 'longitude'] )

Then we create a pandas DataFrame from the dictionary we just produced.

In [12]:
df = pd.DataFrame(mydict, index=[0])
df['observation_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(obs_time)))

I want to start a new CSV file once a day, and append all the measurements for that day as new rows of that file. This requires that each row follows the same schema.

Let's use a convenient helper function that checks the schema before appending. (I copied this function from a discussion on Stack Overflow.)

In [13]:
def appendDFToCSV_void(df, csvFilePath, sep=","):
    # Create the file with a header row if it does not exist yet
    if not os.path.isfile(csvFilePath):
        df.to_csv(csvFilePath, mode='a', index=False, sep=sep)
        return
    # Read just the header of the existing file to compare schemas
    csv_columns = pd.read_csv(csvFilePath, nrows=1, sep=sep).columns
    if len(df.columns) != len(csv_columns):
        raise Exception("Columns do not match!! Dataframe has " + str(len(df.columns)) + " columns. CSV file has " + str(len(csv_columns)) + " columns.")
    if not (df.columns == csv_columns).all():
        raise Exception("Columns and column order of dataframe and csv file do not match!!")
    # Schemas match: append without repeating the header
    df.to_csv(csvFilePath, mode='a', index=False, sep=sep, header=False)
        
appendDFToCSV_void(df, 'weather'+time.strftime('%Y-%m-%d', time.localtime(float(obs_time)))+'.csv')
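
As a quick sanity check (the extra column below is made up purely for illustration), a row with a different schema is rejected instead of silently corrupting the file:

In [ ]:
bad_df = df.copy()
bad_df['made_up_column'] = 1  # a column the existing file does not have
try:
    appendDFToCSV_void(bad_df, 'weather'+time.strftime('%Y-%m-%d', time.localtime(float(obs_time)))+'.csv')
except Exception as e:
    print e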

The finished script

Putting all the pieces together, we end up with the script below:

In [16]:
#!/usr/bin/python
# coding: utf-8
import urllib2
import json
import time
import os
import pandas as pd
#os.environ['TZ'] = 'Europe/Zurich'
#time.tzset()

# Given a flat target dictionary, copy over those keys from the root of a nested dictionary that appear in a list of keys to keep
def addToFlatDict(mydict, root, keep_list):
    for k in root.keys():
        if k in keep_list:
            mydict[k] = root[k]

# Write a new csv file with headers from an input pandas DataFrame if the file does not exist. If the file exists append data as new rows.
def appendDFToCSV_void(df, csvFilePath, sep=","):
    # Create the file with a header row if it does not exist yet
    if not os.path.isfile(csvFilePath):
        df.to_csv(csvFilePath, mode='a', index=False, sep=sep)
        return
    # Read just the header of the existing file to compare schemas
    csv_columns = pd.read_csv(csvFilePath, nrows=1, sep=sep).columns
    if len(df.columns) != len(csv_columns):
        raise Exception("Columns do not match!! Dataframe has " + str(len(df.columns)) + " columns. CSV file has " + str(len(csv_columns)) + " columns.")
    if not (df.columns == csv_columns).all():
        raise Exception("Columns and column order of dataframe and csv file do not match!!")
    # Schemas match: append without repeating the header
    df.to_csv(csvFilePath, mode='a', index=False, sep=sep, header=False)

# Get the weather information as a json file
# (the API key must be supplied; reading it from an environment variable is one option)
apikey = os.environ['WUNDERGROUND_API_KEY']
f = urllib2.urlopen('http://api.wunderground.com/api/'+apikey+'/geolookup/conditions/forecast/q/46.94809341,7.44744301.json')

# Write out the full response of the API call as a json file
json_string = f.read()
parsed_json = json.loads(json_string)
obs_time = parsed_json['current_observation']['observation_epoch']
with open('weather'+time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(float(obs_time)))+'.json', 'w') as outfile:
  outfile.write(json_string)  # the with statement closes the file for us

# Define a dictionary of the fields that we want to keep.
mydict = dict()
addToFlatDict(mydict, parsed_json['current_observation'], ['temp_c', 'wind_kph', 'wind_gust_kph', 'local_epoch', 'wind_degrees', 'dewpoint_c', 'precip_1hr_metric', 'precip_today_metric'] )
addToFlatDict(mydict, parsed_json['current_observation']['observation_location'], ['latitude', 'longitude'] )

# Turn the dictionary into a DataFrame and extract time fields for easier joins further downstream
df = pd.DataFrame(mydict, index=[0])
df['observation_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(obs_time)))

# Write the DataFrame to disk as a csv file
appendDFToCSV_void(df, 'weather'+time.strftime('%Y-%m-%d', time.localtime(float(obs_time)))+'.csv')
f.close()
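
To keep the data flowing, I run the script periodically on the edge node. For example, a crontab entry along these lines (the path and interval are placeholders, not my actual setup) fetches a fresh observation every 30 minutes:

*/30 * * * * /usr/bin/python /path/to/weather_to_csv.py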

Conclusion

This mini tutorial has shown how to query and access weather data, and how to deal with the time and time zone of the measurements. With this we are ready to start blending the weather data with other data stored in the data lake. The next steps will be covered in a future blog post.
