Data wrangling is an important part of a data scientist's everyday work. Input data must be accessed, retrieved, understood, and transformed before machine learning can be applied to create predictive models. While most of the talk these days is about deep learning, this less sexy topic is arguably more important in real-life situations.
A common example of data wrangling is dealing with time series data and resampling it to custom time periods. The Python library Pandas is well suited to this task, but what if the data volume is in the range of terabytes or larger? This blog post introduces Spark dataframes and shows how to perform the same data manipulation on Spark dataframes as on Pandas dataframes.
from __future__ import print_function
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import dates as dates
from IPython.display import display
import customcolors # private module providing the color map used in the plots below
%matplotlib inline
The data we will use for this exercise is publicly available data from Swissgrid, the national energy grid company in Switzerland. As the owner of Switzerland's extra-high-voltage grid, it is responsible for the safe operation of the grid without discrimination, and for maintaining, modernizing and expanding the grid efficiently and with respect for the environment.
First we import the data into a Pandas dataframe and take a look at what data we were given:
url = 'https://www.swissgrid.ch/dam/dataimport/energy_data/en/EnergieUebersichtCH_2017.xls'
df = pd.read_excel(url, sheetname='Zeitreihen0h15')    # sheet with the 15-minute time series
df.drop(['Zeitstempel'], inplace=True)                 # drop the leftover 'Zeitstempel' header row
df = df.apply(pd.to_numeric, errors='coerce', axis=1)  # force everything to numeric, NaN on failure
print (df.info())
display (df.describe())
df.head()
The above command can take a while to complete since we are reading in a rather large Excel file directly from Swissgrid's website. The resulting Pandas dataframe contains 64 columns with long header names, sometimes including line breaks. We also note that some columns represent data from a specific Swiss canton, while other columns contain data of two or more cantons summed up. All data is numerical with no null values, so there is no immediate data quality concern. The rows represent snapshots at 15-minute intervals.
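Before reshaping anything, it is worth checking which of the 64 columns actually carry per-canton production figures. A quick sanity check, assuming (as the loop further down does) that those headers contain the phrase 'Production Canton':
canton_cols = [c for c in df.columns if 'Production Canton' in c]  # per-canton production columns
print ('Number of canton production columns:', len(canton_cols))
print (canton_cols[:3])  # peek at a few of the long, multi-line headers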
As we saw above, the input data is already quite tidy. Features are represented by individual columns and every row is a set of measurements. This is a good schema for machine learning or data analysis. However, when working with raw data from e.g. IoT devices or log files, things are not served on a silver platter. Let's make the data more "realistic" by creating a long list of energy production measurements in individual cantons, where a measurement consists of
- a time stamp
- a categorical identifier
- a numerical measurement
df_prod = pd.DataFrame(columns=['Cantons', 'Production'])
for c in df.columns:
    if 'Production Canton' in c:
        cantons = c.split('\n')[1].split()[2:]  # canton code(s) from the second header line
        cantons = ''.join(cantons)              # join into a single string, e.g. 'AG' or 'AI,AR'
        df_temp = pd.DataFrame(index=df.index, columns=['Cantons', 'Production'])
        df_temp.Production = df[c]
        df_temp.Cantons = cantons
        df_prod = df_prod.append(df_temp)       # stack this canton's measurements onto the long table
print ('Number of production measurements:', len(df_prod.index))
df_prod.head()
We save this "realistic" dataframe of cantonal energy production as a csv file so that we can use it later with Spark.
df_prod.to_csv('production.csv')
I frequently use Pandas when analyzing data, mainly because of the ease with which one can manipulate and select data. Often the simplest way to perform a complex operation on an Excel spreadsheet is to load it into Pandas, apply the operation, and save the result back to Excel.
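As a throwaway illustration of that round trip (the file names and the added column here are hypothetical, not part of the Swissgrid example):
tmp = pd.read_excel('input.xlsx')                  # hypothetical source spreadsheet
tmp['Total'] = tmp.sum(axis=1, numeric_only=True)  # some arbitrary transformation
tmp.to_excel('output.xlsx', index=False)           # save the result back to Excel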
One of the great features of Pandas dataframes is how they handle time series data. The energy production dataframe we just created uses a DatetimeIndex. This makes it trivial to resample our measurements from 15 minutes to another time window:
df_prod_wide = df_prod.pivot(columns='Cantons').resample('1 d').sum()
df_prod_wide.head()
Resampling from 15 minutes to 1 day makes sense if we want to show the production as a function of time without a ridiculous number of data points along the time axis.
fig, axarr = plt.subplots(1,1, figsize =(12,8))
df_prod_wide.Production[['AG', 'AI,AR', 'BE,JU', 'BL,BS']].plot(ax=axarr, colormap=customcolors.deloitte_colormap())
axarr.set_ylabel('Energy produced per day [kWh/day]')
axarr.set_xlabel('Time');
While Pandas is convenient, the fact that it runs on a single machine limits its ability to handle big data volumes. We need distributed computing to spread the load across a cluster of worker nodes and to avoid loading more data than a single computer can work with. My preferred solution is Spark on HDFS, and this section shows how the time series resampling done above with Pandas can be accomplished with Spark.
Since Spark 2.0 this has become much easier, as we can now rely on dataframes via Spark's SQLContext and the window() function. Spark dataframes were introduced in Spark 1.3 and have matured to the point that they are replacing the older RDD API. Dataframes have the advantage of being faster to process and offer a more familiar API for users coming from SQL or Pandas. Rather than repeating everything about what a dataframe is and how it can be used, I encourage readers to consult the Spark SQL programming guide.
To try this out yourself, you can spin up a Spark cluster on Azure or AWS, run a virtual machine or Docker container, or do a standalone installation on your local machine; a minimal local setup is sketched below.
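If you go the standalone route, a minimal local setup could look like the sketch below (assuming pyspark is installed, e.g. via pip; the application name is arbitrary). It creates the sc SparkContext that the following cells assume already exists:
from pyspark import SparkContext

sc = SparkContext(master='local[*]', appName='swissgrid-resampling')  # use all local cores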
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
print (pyspark.__version__)
First we read in the csv file we created earlier. Using the SQLContext, this can be achieved with a single command. With inferSchema=True the index column is automatically identified as timestamps; alternatively, the column can be cast manually with sdf._c0.cast('timestamp'), as sketched after the output below.
sqlContext = SQLContext(sc)  # 'sc' is the SparkContext created above or provided by the notebook/pyspark shell
sdf = sqlContext.read.csv('production.csv', header=True, inferSchema=True)  # requires Spark 2.0 or later
print ('Number of rows: ' , sdf.count())
sdf.printSchema()
sdf.show()
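As an aside, if schema inference had not recognized the timestamps, the first (unnamed) column could be converted by hand; a minimal sketch:
sdf = sdf.withColumn('_c0', sdf['_c0'].cast('timestamp'))  # manual alternative to inferSchema=True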
Once the old DatetimeIndex column is correctly interpreted as a timestamp, it is possible to effortlessly extract details from it. For example, the line below extracts the time of day and the name of the weekday as separate columns. It also shows how to combine the two into one string containing the weekday number and the time of day.
sdf.select('_c0', date_format('_c0', 'HH:mm').alias('time'), date_format('_c0', 'E').alias('weekday'), date_format('_c0', 'u HH:mm').alias('weekdaytime')).show()
In the past I had to convert the timestamp to a unix_timestamp, round it down to the window boundary, and convert it back to a timestamp. It worked, but it was awkward and cumbersome, as the sketch below illustrates.
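This is only a reconstruction of that old pattern (the day_start helper column is illustrative, not the original code): each timestamp is truncated to the start of its day, the data is grouped on that value, and the result is converted back to a timestamp.
from pyspark.sql.functions import floor, from_unixtime, unix_timestamp, sum

seconds_per_day = 24 * 60 * 60
sdf_old = sdf.withColumn('day_start', from_unixtime(floor(unix_timestamp('_c0') / seconds_per_day) * seconds_per_day))  # round down to the day boundary
sdf_old_daily = sdf_old.groupBy('Cantons', 'day_start').agg(sum('Production').alias('Sum Production'))
sdf_old_daily.show()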
With more recent versions of Spark this can be done far more elegantly, with a syntax that resembles the Pandas resample():
group = sdf.groupBy('Cantons', window("_c0", "1 day")).agg(sum("Production").alias('Sum Production'))
sdf_resampled = group.select(group.window.start.alias("Start"), group.window.end.alias("End"), "Cantons", "Sum Production").orderBy('Start', ascending=True)
sdf_resampled.printSchema()
sdf_resampled.show()
The time columns differ by one hour from the previous ones because local time (CET) is UTC + 1 hour.
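If you would rather keep everything in UTC, Spark 2.2 and later expose a session time zone setting; whether it removes the offset here depends on how the timestamps were parsed in the first place, so treat this as an option to verify rather than a guaranteed fix.
sqlContext.setConf('spark.sql.session.timeZone', 'UTC')  # session time zone setting, available from Spark 2.2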
Just like with Pandas, we can pivot the dataframe to have the daily energy production measurements by canton as individual columns.
sdf_wide = sdf_resampled.groupBy("Start").pivot("Cantons").agg(avg("Sum Production"))
sdf_wide.printSchema()
sdf_wide.take(2)
To visualize the data we first convert the resampled Spark dataframe to a Pandas dataframe, and rank the cantons by their total energy production since the beginning of the year.
pdf_wide = sdf_wide.toPandas().set_index("Start").divide(1e6) # convert kWh to GWh
print ("Total production by canton [GWh]:")
print (pdf_wide.sum().sort_values(ascending=False))
display (pdf_wide.tail())
fig, axarr = plt.subplots(1,1, figsize =(12,8))
pdf_wide[['AG', 'VS', 'SO', 'BE,JU']].plot(ax=axarr, colormap=customcolors.deloitte_colormap())
axarr.set_ylabel('Energy produced per day [GWh/day]')
axarr.set_xlabel('Time');
The energy production of the cantons has very different characteristics.
- Aargau (AG) is clearly dominated by controllable energy sources, which is no surprise since most of Switzerland's nuclear power capacity is located in Aargau.
- Neighbouring Solothurn (SO) also has a nuclear power plant and not much else.
- Valais (VS), on the other hand, is a high alpine region and its energy production is dominated by hydroelectric power, which explains its strong seasonality.
- Bern (BE) and Jura (JU) have one nuclear power plant, but also high mountains that are home to hydroelectric, wind, and photovoltaic energy sources.
This visualization hints at the country's energy strategy: production is based on hydroelectric power, with nuclear compensating for periods when renewable energy does not meet the country's needs.
This post has demonstrated how to pivot and resample time series in Pandas and Spark. The data used for this exercise consists of real measurements of energy production in Switzerland, and the resampled data shows evidence of where nuclear power plants and renewable energy sources are located.