Introduction
In a previous blog post we started gathering the building blocks for creating a data lake for an energy company. It showed how to import information about the current and forecasted weather. Today's post will use this data and join it with the power plant production data in preparation for Hadoop, Spark and machine learning jobs. This is a very technical post, so bear with me.
Since both sources of input data are in JSON format, I will spend most of this post demonstrating different ways to read JSON files using Hive. As was shown in the previous blog post, Python has an easier way of extracting data from JSON files, so pySpark should be considered as an alternative if you are already running a Spark cluster.
Reading flat JSON files with Hive
The production data has a simple structure. The files contain a couple of thousand JSON documents, with one document per row, and there are no nested documents. As such, this data could easily have been stored in a regular table format, such as an SQL table, CSV or Excel, which would have made the Hive queries on the data a lot easier, but as data scientists we rarely get to choose our own input format. Below is an example of two documents.
{"MeteringPointId":42, "MeteringTimeStamp":"2016-08-05T00:50:00Z","MeasuredValue":"79.498291","Unit":"A"}
{"MeteringPointId":1337,"MeteringTimeStamp":"2016-08-05T00:50:00Z","MeasuredValue":"62.5","Unit":"kW"}
The data is being streamed to a blob storage on the Azure cloud, where the folder names are given by the current date, e.g., "/[year]/[month]/[day]/[filename]_[time].json". To read in all the files it is sufficient to give the address of the root folder of the blob. First, we create an external table which stores each document as a string.
DROP TABLE IF EXISTS MyJSONTable;
CREATE EXTERNAL TABLE MyJSONTable (textcol string) STORED AS TEXTFILE LOCATION "wasb://myhdinsight@mydatastorage.blob.core.windows.net/2016/";
Next we use LATERAL VIEW and JSON_TUPLE to convert the string into columns.
DROP TABLE IF EXISTS SENSORS;
CREATE TABLE SENSORS(
meteringpointid int,
meteringtimestamp TimeStamp,
measuredvalue float,
unit string);
INSERT OVERWRITE TABLE SENSORS
SELECT
q1.meteringpointid,
CAST(
CONCAT(
SPLIT(q1.meteringtimestamp, "T")[0],
' ',
SUBSTR(SPLIT(q1.meteringtimestamp,"T")[1],0,8)
) AS TimeStamp
),
q1.measuredvalue,
q1.unit
FROM myjsontable jt LATERAL VIEW JSON_TUPLE(jt.textcol,
'meteringpointid',
'meteringtimestamp',
'measuredvalue',
'unit'
) q1 AS
meteringpointid,
meteringtimestamp,
measuredvalue,
unit;
The metering time stamp has a format that was somewhat problematic to convert into Hive's TimeStamp type. I am sure that there is a cleaner way to accomplish it, so if you have a suggestion please let me know in the comment section.
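For what it is worth, something along these lines might do the conversion in a single step (an untested sketch; note that the trailing "Z" is treated as a literal, so the time zone is ignored just as in the query above):
-- alternative conversion of e.g. "2016-08-05T00:50:00Z" into a Hive TimeStamp
CAST(FROM_UNIXTIME(UNIX_TIMESTAMP(q1.meteringtimestamp, "yyyy-MM-dd'T'HH:mm:ss'Z'")) AS TimeStamp)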
The result of this query is a Hive table called "sensors" that is stored on HDFS in a sub folder of /hive.
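If you want to check exactly where the table ended up, Hive can tell you:
-- prints, among other things, the Location of the table's files on HDFS
DESCRIBE FORMATTED sensors;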
The advantage of this method is that you can restrict the query to only the fields that are of interest, making it suitable for grabbing a handful of fields from a data set with many columns. The downside is that each field needs to be written four times, making it cumbersome if you want to import all the fields of the JSON file into a Hive table.
Reading nested JSON files with Hive
The output of the weather API call contains nested documents, so the method above will not work. Here is a shortened example of what the API returns:
{
"response": {
"version":"0.1",
"termsofService":"http://www.wunderground.com/weather/api/d/terms.html",
...
}
...
, "current_observation": {
...
"station_id":"IBERNE6",
"observation_time":"Last Updated on August 30, 7:50 AM CEST",
"observation_time_rfc822":"Tue, 30 Aug 2016 07:50:49 +0200",
"observation_epoch":"1472536249",
"local_time_rfc822":"Tue, 30 Aug 2016 07:58:08 +0200",
"local_epoch":"1472536688",
"local_tz_short":"CEST",
"local_tz_long":"Europe/Zurich",
"local_tz_offset":"+0200",
"weather":"Mostly Cloudy",
"temperature_string":"67.3 F (19.6 C)",
"temp_f":67.3,
"temp_c":19.6,
"relative_humidity":"68%",
"wind_string":"Calm",
"wind_dir":"SSE",
"wind_degrees":161,
"wind_mph":0.6,
"wind_gust_mph":"1.9",
"wind_kph":1.0,
"wind_gust_kph":"3.1",
"pressure_mb":"1022",
"pressure_in":"30.18",
"pressure_trend":"+",
"dewpoint_string":"56 F (14 C)",
"dewpoint_f":56,
"dewpoint_c":14,
"heat_index_string":"NA",
"heat_index_f":"NA",
"heat_index_c":"NA",
"windchill_string":"NA",
"windchill_f":"NA",
"windchill_c":"NA",
"feelslike_string":"67.3 F (19.6 C)",
"feelslike_f":"67.3",
"feelslike_c":"19.6",
"visibility_mi":"6.2",
"visibility_km":"10.0",
"solarradiation":"--",
"UV":"0","precip_1hr_string":"-999.00 in ( 0 mm)",
"precip_1hr_in":"-999.00",
"precip_1hr_metric":" 0",
"precip_today_string":"0.00 in (0 mm)",
"precip_today_in":"0.00",
"precip_today_metric":"0",
"icon":"mostlycloudy",
"icon_url":"http://icons.wxug.com/i/c/k/mostlycloudy.gif",
"forecast_url":"http://www.wunderground.com/global/stations/06631.html",
"history_url":"http://www.wunderground.com/weatherstation/WXDailyHistory.asp?ID=IBERNE6",
"ob_url":"http://www.wunderground.com/cgi-bin/findweather/getForecast?query=46.943199,7.441810",
"nowcast":""
}
...
}
Hence, if we want to access the wind speed from this document, we need to navigate into the nested document "current_observation".
The beginning of the query is similar to the flat-file case: we make an external table from the content of the cloud storage. Since the API response is pretty-printed across many lines, we then collapse each file into a single row, giving us a table in which each row contains the body of one JSON document.
DROP TABLE IF EXISTS RAW_weather;
CREATE EXTERNAL TABLE RAW_weather(textcol STRING)
STORED AS TEXTFILE LOCATION 'wasb://meteo@yourdatastorage.blob.core.windows.net/';
DROP TABLE IF EXISTS WeatherOneLine;
CREATE EXTERNAL TABLE WeatherOneLine
(
json_body string
)
STORED AS TEXTFILE LOCATION '/json/weather';
INSERT OVERWRITE TABLE WeatherOneLine
SELECT CONCAT_WS(' ',COLLECT_LIST(textcol)) AS singlelineJSON
FROM (SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, textcol
FROM RAW_weather
DISTRIBUTE BY INPUT__FILE__NAME
SORT BY BLOCK__OFFSET__INSIDE__FILE) x
GROUP BY INPUT__FILE__NAME;
Again, we create a Hive table in HDFS, at the default location /hive/warehouse/. It needs to include the json_body that was extracted in the previous step. This time we make use of GET_JSON_OBJECT to extract the fields from the json_body.
DROP TABLE IF EXISTS weather;
CREATE TABLE weather(
station_id string,
wind_kph float,
local_epoch timestamp,
json_body STRING);
INSERT OVERWRITE TABLE weather
SELECT
GET_JSON_OBJECT(json_body, '$.current_observation.station_id'),
GET_JSON_OBJECT(json_body, '$.current_observation.wind_kph'),
from_unixtime(cast(GET_JSON_OBJECT(json_body, '$.current_observation.local_epoch') as int)),
json_body
FROM WeatherOneLine;
While this is a rather clean way of accessing fields in nested documents, there is one disadvantage. The json_body, i.e., the entire content returned by the API call, is added as a column to the Hive table. Since that is probably not desired we have to drop this column. In Hive, dropping a column is not as easy as in SQL, so instead of dropping it we redefine the columns of the table and leave out the one we want to remove.
--drop the long json_body field
ALTER TABLE weather REPLACE COLUMNS(
station_id string,
wind_kph float,
local_epoch timestamp
);
Creating and storing an enriched table in the lake
Now that we have a table called "sensors", which contains the energy production data, and another table called "weather", which contains the information about the weather, we can combine these two data sources to create an enriched table that is more valuable than the sum of its parts.
Suppose that we want to know how weather conditions influence the energy production at a wind farm. If we already know which weather station is closest to the wind farm we can select its weather data from the weather table.
We also need to ensure that the time stamps match up when we compare weather to energy production. I have set up the weather information retrieval to run every 30 minutes, while the sensors at the power plants are streaming thousands of measurements per minute. The easiest approach is to add the start and end of each weather observation's validity as calculated fields, and use them to left join the weather to the sensor data, as sketched below.
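The join below assumes that the weather data already carries two fields called epoch_start and epoch_end. As a sketch of how such calculated fields could be produced (the view name weather_validity is my own, and the 30-minute window simply mirrors the retrieval interval), one could write:
-- sketch: each weather observation is taken to be valid from its own
-- local_epoch until 30 minutes later, when the next observation arrives
DROP VIEW IF EXISTS weather_validity;
CREATE VIEW weather_validity AS
SELECT
station_id,
wind_kph,
local_epoch AS epoch_start,
CAST(FROM_UNIXTIME(UNIX_TIMESTAMP(local_epoch) + 30 * 60) AS TimeStamp) AS epoch_end
FROM weather;
If you go down this route, the view (or a weather table rebuilt in the same way) takes the place of the plain weather table in the join.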
DROP TABLE IF EXISTS sensors_weather;
CREATE TABLE sensors_weather AS
SELECT *
FROM (SELECT * FROM sensors
WHERE meteringpointid = 42
OR meteringpointid = 1337
) AS s
LEFT JOIN (SELECT * from weather WHERE station_id = "IBERNE6") AS w
WHERE s.meteringtimestamp >= w.epoch_start and s.meteringtimestamp < w.epoch_end
ORDER BY s.meteringtimestamp ASC;
The result is an enriched Hive table on HDFS which is filtered to only contain information that we care about.
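To get a first feeling for the joined data, a quick peek at a few rows is enough (just an example query; it assumes the column names carry over unchanged from the two source tables):
-- production values next to the wind speed measured at the same time
SELECT meteringtimestamp, measuredvalue, unit, wind_kph
FROM sensors_weather
LIMIT 10;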
Summary and next steps
This blog post showed
- two ways of querying JSON files on the Azure cloud using Hive and
- how to create enriched Hive tables by joining different data sets.
The enriched table can then be used in Azure Machine Learning, Spark or R to understand how weather conditions affect energy production, and to create a model for predicting shortages in energy production based on weather forecasts.