1) Import relevant libraries#
In this step, all the relevant Python libraries are imported.
import pandas as pd
import numpy as np
import urllib.request
import urllib.parse
import psycopg2
import json
from datetime import date
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
2) Fetch response data from the Open-meteo API and normalize#
In this step, the request to the Open-Meteo API is performed and the response is stored in a dataframe.
url = "https://api.open-meteo.com/v1/forecast?latitude=50.8371&longitude=4.3676&hourly=temperature_2m,relativehumidity_2m,dewpoint_2m,apparent_temperature,cloudcover,rain,pressure_msl"
response = urllib.request.urlopen(url)
record_list = json.loads(response.read())
df = pd.json_normalize(record_list)
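For orientation, the response is a single JSON object in which the hourly metrics arrive as parallel lists under the "hourly" key; this is why the normalized dataframe has a single row with list-valued cells (handled in steps 5-7). A quick sketch of how to inspect it:
# Scalar fields plus the nested "hourly_units" and "hourly" objects
print(record_list.keys())
# Each metric under "hourly" is a list with one value per forecast hour
print(record_list["hourly"]["time"][:3])
print(record_list["hourly"]["temperature_2m"][:3])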
3) Define the DB engine parameters.#
In this step, the configuration parameters of the PostgreSQL DB are defined. These parameters will be used to correctly establish the connection to the PostgreSQL DB.
drivername = 'postgresql'
host = '<postgresSQLHost>'  # EDIT THIS LINE
port = '5432'
username = 'postgres'
password = '<postgreSQLPassword>'  # EDIT THIS LINE
database = 'postgres'
DATABASE = {
    'drivername': drivername,
    'host': host,
    'port': port,
    'username': username,
    'password': password,
    'database': database
}
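As a quick sanity check (assuming SQLAlchemy 1.4 or later, where URL objects are built with URL.create), the dictionary above renders to a standard PostgreSQL connection string:
# Renders e.g. postgresql://postgres:***@<postgresSQLHost>:5432/postgres (password masked)
print(URL.create(**DATABASE).render_as_string(hide_password=True))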
4) Data pre-processing.#
In this step we replace the character "." with "_" in the column names produced by the JSON nesting, so that the names do not cause syntax errors in SQL statements (for example, "hourly.temperature_2m" becomes "hourly_temperature_2m").
df_columns = df.columns.tolist()
df_columns = [c.replace('.','_') for c in df_columns]
df.columns = df_columns
df_columns
5) Store the metrics.#
In this step we save each weather metric in a dedicated object.
time = df["hourly_time"]
temperature = df["hourly_temperature_2m"]
relativehumidity = df["hourly_relativehumidity_2m"]
dewpoint = df["hourly_dewpoint_2m"]
apparent_temperature = df["hourly_apparent_temperature"]
cloudcover = df["hourly_cloudcover"]
rain = df["hourly_rain"]
pressure_msl = df["hourly_pressure_msl"]
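Note that json_normalize flattens only the scalar fields: the frame has a single row, so each object above is a one-element Series whose only cell holds the full list of hourly values. A quick check of this shape (the count below assumes the API's default 7-day forecast horizon):
print(len(df))             # 1: json_normalize produced a single row
print(type(time.iloc[0]))  # <class 'list'>: the cell holds the whole hourly series
print(len(time.iloc[0]))   # one entry per forecast hour (168 for the default 7-day horizon)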
6) Make the data ready to be stored on the DB.#
In this step we stack each object's series into a NumPy array. In this way we make the data suitable to be hosted on our SQL DB.
time_conc = np.stack(time)
temperature_conc = np.stack(temperature)
relativehumidity_conc = np.stack(relativehumidity)
dewpoint_conc = np.stack(dewpoint)
apparent_temperature_conc = np.stack(apparent_temperature)
cloudcover_conc = np.stack(cloudcover)
rain_conc = np.stack(rain)
pressure_msl_conc = np.stack(pressure_msl)
7) Transpose the stacked arrays into single-column dataframes.#
time_transposed = pd.DataFrame(time_conc).transpose()
temperature_transposed = pd.DataFrame(temperature_conc).transpose()
relativehumidity_transposed = pd.DataFrame(relativehumidity_conc).transpose()
dewpoint_transposed = pd.DataFrame(dewpoint_conc).transpose()
apparent_temperature_transposed = pd.DataFrame(apparent_temperature_conc).transpose()
cloudcover_transposed = pd.DataFrame(cloudcover_conc).transpose()
rain_transposed = pd.DataFrame(rain_conc).transpose()
pressure_msl_transposed = pd.DataFrame(pressure_msl_conc).transpose()
8) Rename columns to have more meaningful names#
time_transposed.columns = ['hourly_time']
temperature_transposed.columns = ['hourly_temperature_2m']
relativehumidity_transposed.columns = ['hourly_relativehumidity']
dewpoint_transposed.columns = ['hourly_dewpoint']
apparent_temperature_transposed.columns = ['hourly_apparent_temperature']
cloudcover_transposed.columns = ['hourly_cloudcover']
rain_transposed.columns = ['hourly_rain']
pressure_msl_transposed.columns = ['hourly_pressure_msl']
9) Merge the pre-processed objects into the final dataframe: in this step we create a single dataframe which contains all the weather metrics.#
hourly_data = pd.concat(
    [time_transposed.reset_index(drop=True),
     temperature_transposed.reset_index(drop=True),
     relativehumidity_transposed.reset_index(drop=True),
     dewpoint_transposed.reset_index(drop=True),
     apparent_temperature_transposed.reset_index(drop=True),
     cloudcover_transposed.reset_index(drop=True),
     rain_transposed.reset_index(drop=True),
     pressure_msl_transposed.reset_index(drop=True)],
    axis=1)
hourly_data
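Incidentally, because every hourly list has the same length, steps 5 through 9 can be collapsed into a single dictionary comprehension. This is an equivalent sketch (using the flattened column names from step 4), not part of the original pipeline:
metric_columns = ['hourly_time', 'hourly_temperature_2m', 'hourly_relativehumidity_2m',
                  'hourly_dewpoint_2m', 'hourly_apparent_temperature', 'hourly_cloudcover',
                  'hourly_rain', 'hourly_pressure_msl']
# Each cell of the single-row frame holds a full list, so this builds one column per metric
hourly_data_alt = pd.DataFrame({c: df.at[0, c] for c in metric_columns})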
10) Function to create the "hourly data" table that will be pushed to our PostgreSQL DB.#
def create_table_hourly_data():
    # a) Connect to the PostgreSQL instance through the psycopg2 library, using the database parameters defined at the beginning of the code.
    conn = psycopg2.connect(database=database, user=username, password=password, host=host, port=port)
    # b) Create a cursor object.
    cur = conn.cursor()
    # c) Execute the SQL statement to create the table with the needed columns and data types.
    cur.execute("CREATE TABLE IF NOT EXISTS hourly_data (hourly_time DATE,hourly_temperature_2m FLOAT,hourly_relativehumidity FLOAT,hourly_dewpoint FLOAT,hourly_apparent_temperature FLOAT,hourly_cloudcover DECIMAL,hourly_rain FLOAT,hourly_pressure_msl FLOAT)")
    # d) Commit to actually apply the executed SQL statement.
    conn.commit()
    # e) Close the connection to the PostgreSQL instance.
    conn.close()
create_table_hourly_data()
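As a sanity check, the information_schema catalog can be queried to confirm that the table now exists; a minimal sketch reusing the connection parameters from step 3:
conn = psycopg2.connect(database=database, user=username, password=password, host=host, port=port)
cur = conn.cursor()
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_name = 'hourly_data'")
print(cur.fetchone())  # ('hourly_data',) if the table was created
conn.close()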
11) Create an SQL engine to feed the data into the newly created PostgreSQL table.#
engine = create_engine(URL.create(**DATABASE))
12) Feed the data into the "hourly_data" table through the "to_sql" method.#
hourly_data.to_sql("hourly_data", engine, index=False, if_exists='replace')
print("Done")
13) Create the dataframe which will contain the metadata related to the response - units of measure.#
metadata = df.iloc[:,0:15]
14) Append the date of the current script's execution. This information will be shown in the dashboard to make sure that the displayed insights are always up to date.#
metadata['execution_date'] = date.today()
metadata
15) Function to create the "metadata" table#
def create_table_metadata():
    # a) Connect to the PostgreSQL instance through the psycopg2 library.
    conn = psycopg2.connect(database=database, user=username, password=password, host=host, port=port)
    # b) Create a cursor object.
    cur = conn.cursor()
    # c) Execute the SQL statement to create the table.
    cur.execute("CREATE TABLE IF NOT EXISTS metadata (latitude FLOAT,longitude FLOAT,generationtime_ms FLOAT,utc_offset_seconds INT,timezone VARCHAR,timezone_abbreviation VARCHAR,elevation FLOAT,hourly_units_time VARCHAR,hourly_units_temperature_2m VARCHAR)")
    # d) Commit to actually apply the executed SQL statement.
    conn.commit()
    # e) Close the connection to the PostgreSQL instance.
    conn.close()
create_table_metadata()
16) Create an SQL engine to feed the data into the newly created PostgreSQL table#
engine = create_engine(URL.create(**DATABASE))
17) Feed the data into the "metadata" table through the "to_sql" method#
metadata.to_sql("metadata", engine, index=False, if_exists='replace')
print("Done")