Use case code: Scenario 4 - Big data analytics
1. Import relevant libraries#
In this step, all the Python libraries that are needed to process the data are imported.
Code Step 1:#
import sys
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql import functions as F
from pyspark.sql.types import (
ArrayType,
IntegerType,
MapType,
StringType,
StructField,
StructType,
)
2. Define MinIO config parameters#
To connect to the MinIO bucket that contains the data, the following values must be provided in the next code cell:
- url: the private address of the MinIO instance. This Jupyter notebook connects to MinIO through the private endpoint, since both the Jupyter and MinIO instances are deployed in the same private network.
- accessKey: the access key used to connect to MinIO, as defined in the service account.
- secretKey: the secret key used to connect to MinIO, as defined in the service account.
To retrieve this information, please refer to Step 3.3 of the use case guide.
Code Step 2:#
accessKey = "<input_your_accessKey_here>"
secretKey = "<input_your_secretKey_here>"
url = "<input_your_url_here"
connectionTimeOut = "30000"
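Before building the Spark session, it can be useful to verify that the credentials above actually work. The snippet below is an optional, minimal sketch that assumes the minio Python client is installed in the notebook environment and that the bucket is named source (the bucket used later in Step 5); it is not part of the original use case code.
from minio import Minio

# The MinIO client expects host:port without a scheme
endpoint = url.replace("http://", "").replace("https://", "")
client = Minio(endpoint, access_key=accessKey, secret_key=secretKey, secure=False)
print(client.bucket_exists("source"))  # True if the bucket is reachable with these credentials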
3. Define Spark Session#
To connect to the Spark instance, which will process our dataset, the SparkSession must be configured correctly. These configurations are meant to:
- Connect the Jupyter Notebook to the Spark instance
- Connect the Spark instance to MinIO
Below is a brief explanation of the relevant settings:
spark = SparkSession.builder \
.master("<input_your_spark_address_here") \ #1
.config("spark.submit.deployMode", "client") \
.config("spark.driver.host","<input_your_jupyter_hostname>") \ #2
.config("spark.driver.port", "9000")\
.config("spark.executor.memory","4GB") \
.config("spark.hadoop.fs.s3a.endpoint", url) \ #3
.config("spark.hadoop.fs.s3a.access.key", accessKey) \ #4
.config("spark.hadoop.fs.s3a.secret.key", secretKey)\ #5
.config("spark.hadoop.fs.s3a.connection.timeout", connectionTimeOut) \
.config("spark.hadoop.fs.s3a.path.style.access", "true")\
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")\
.config("spark.sql.debug.maxToStringFields", "100000") \
.getOrCreate()
1. Here the Spark master node address should be entered. To retrieve this information, please refer to Step 4.3 of the use case script.
2. This setting ensures that Spark can connect back to Jupyter; to this end, the Jupyter hostname should be entered. To retrieve this information, please refer to Step 5.5 of the use case script.
3. This setting provides Spark with the MinIO address (defined in the previous step) used to read the data.
4. This setting provides Spark with the MinIO accessKey (defined in the previous step) needed to access the data.
5. This setting provides Spark with the MinIO secretKey (defined in the previous step) needed to access the data.
Code Step 3:#
spark = SparkSession.builder \
.master("<input_your_spark_address_here") \
.config("spark.submit.deployMode", "client") \
.config("spark.driver.host","<input_your_jupyter_pod_endpoint_here>.<input_your_dsl_here>.pod.cluster.local") \
.config("spark.driver.port", "9000")\
.config("spark.executor.memory","4GB") \
.config("spark.hadoop.fs.s3a.endpoint", url) \
.config("spark.hadoop.fs.s3a.access.key", accessKey) \
.config("spark.hadoop.fs.s3a.secret.key", secretKey)\
.config("spark.hadoop.fs.s3a.connection.timeout", connectionTimeOut) \
.config("spark.hadoop.fs.s3a.path.style.access", "true")\
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")\
.config("spark.sql.debug.maxToStringFields", "100000") \
.getOrCreate()
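As an optional sanity check (not part of the original use case code), a trivial job can be submitted to confirm that the notebook can reach the Spark master and its executors:
print(spark.version)            # Spark version reported by the session
print(spark.range(10).count())  # should print 10 if the executors are reachable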
4. Pre-define the schema for the source data#
In this step, we define a function which generates the data schema for our dataset. This data schema will accommodate all the information of each document/paper present in the dataset. A view of the data schema is reported here:
root
|-- paper_id: string (nullable = true)
|-- metadata: struct (nullable = true)
| |-- title: string (nullable = true)
| |-- authors: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- first: string (nullable = true)
| | | |-- middle: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- last: string (nullable = true)
| | | |-- suffix: string (nullable = true)
| | | |-- affiliation: struct (nullable = true)
| | | | |-- laboratory: string (nullable = true)
| | | | |-- institution: string (nullable = true)
| | | | |-- location: struct (nullable = true)
| | | | | |-- addrLine: string (nullable = true)
| | | | | |-- country: string (nullable = true)
| | | | | |-- postBox: string (nullable = true)
| | | | | |-- postCode: string (nullable = true)
| | | | | |-- region: string (nullable = true)
| | | | | |-- settlement: string (nullable = true)
| | | |-- email: string (nullable = true)
|-- abstract: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = true)
| | |-- cite_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- ref_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- eq_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- section: string (nullable = true)
|-- body_text: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = true)
| | |-- cite_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- ref_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- eq_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- section: string (nullable = true)
|-- bib_entries: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- ref_id: string (nullable = true)
| | |-- title: string (nullable = true)
| | |-- authors: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- first: string (nullable = true)
| | | | |-- middle: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- last: string (nullable = true)
| | | | |-- suffix: string (nullable = true)
| | |-- year: integer (nullable = true)
| | |-- venue: string (nullable = true)
| | |-- volume: string (nullable = true)
| | |-- issn: string (nullable = true)
| | |-- pages: string (nullable = true)
| | |-- other_ids: struct (nullable = true)
| | | |-- DOI: array (nullable = true)
| | | | |-- element: string (containsNull = true)
|-- ref_entries: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- text: string (nullable = true)
| | |-- latex: string (nullable = true)
| | |-- type: string (nullable = true)
|-- back_matter: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = true)
| | |-- cite_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- ref_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- eq_spans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start: integer (nullable = true)
| | | | |-- end: integer (nullable = true)
| | | | |-- text: string (nullable = true)
| | | | |-- ref_id: string (nullable = true)
| | |-- section: string (nullable = true)
|-- source: string (nullable = false)
Code Step 4:#
def generate_cord19_schema():
author_fields = [
StructField("first", StringType()),
StructField("middle", ArrayType(StringType())),
StructField("last", StringType()),
StructField("suffix", StringType()),
]
authors_schema = ArrayType(
StructType(
author_fields
+ [
StructField(
"affiliation",
StructType(
[
StructField("laboratory", StringType()),
StructField("institution", StringType()),
StructField(
"location",
StructType(
[
StructField("addrLine", StringType()),
StructField("country", StringType()),
StructField("postBox", StringType()),
StructField("postCode", StringType()),
StructField("region", StringType()),
StructField("settlement", StringType()),
]
),
),
]
),
),
StructField("email", StringType()),
]
)
)
spans_schema = ArrayType(
StructType(
[
StructField("start", IntegerType()),
StructField("end", IntegerType()),
StructField("text", StringType()),
StructField("ref_id", StringType()),
]
)
)
section_schema = ArrayType(
StructType(
[
StructField("text", StringType()),
StructField("cite_spans", spans_schema),
StructField("ref_spans", spans_schema),
StructField("eq_spans", spans_schema),
StructField("section", StringType()),
]
)
)
bib_schema = MapType(
StringType(),
StructType(
[
StructField("ref_id", StringType()),
StructField("title", StringType()),
StructField("authors", ArrayType(StructType(author_fields))),
StructField("year", IntegerType()),
StructField("venue", StringType()),
StructField("volume", StringType()),
StructField("issn", StringType()),
StructField("pages", StringType()),
StructField(
"other_ids",
StructType([StructField("DOI", ArrayType(StringType()))]),
),
]
),
True,
)
ref_schema = MapType(
StringType(),
StructType(
[
StructField("text", StringType()),
StructField("latex", StringType()),
StructField("type", StringType()),
]
),
)
return StructType(
[
StructField("paper_id", StringType()),
StructField(
"metadata",
StructType(
[
StructField("title", StringType()),
StructField("authors", authors_schema),
]
),
True,
),
StructField("abstract", section_schema),
StructField("body_text", section_schema),
StructField("bib_entries", bib_schema),
StructField("ref_entries", ref_schema),
StructField("back_matter", section_schema),
]
)
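As an optional check (not part of the original use case code), the generated schema can be printed and compared with the tree reported above. Note that the source column shown in the tree is only added in Step 5 via withColumn, so it does not appear here yet:
# Create an empty DataFrame with the generated schema and print its structure
spark.createDataFrame([], schema=generate_cord19_schema()).printSchema()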
5. Extract the data into a single DataFrame by accommodating the information in the pre-defined schema#
In this step, the function extract_dataframe(spark) is defined. This function reads each JSON document of the dataset and accommodates the information in the previously defined schema. The information is then joined into a single DataFrame object.
Code Step 5:#
def extract_dataframe(spark):
base = "s3a://source/data"
sources = [
"document_parses"
]
sub_sources = [
"pmc_json",
"pdf_json",
]
dataframe = None
for source in sources:
for sub_source in sub_sources:
path = f"{base}/{source}/{sub_source}"
df = (
spark.read.json(path, schema=generate_cord19_schema(), multiLine=True)
.withColumn("source", lit(source))
)
if not dataframe:
dataframe = df
else:
dataframe = dataframe.union(df)
return dataframe
The DataFrame df is defined by calling the function extract_dataframe. Please note that spark is passed as an argument to the function: this means that the processing of the data into a single DataFrame is performed by the Spark instance.
Code Step 5:#
df = extract_dataframe(spark)
A local temporary view of the DataFrame is created. This view will be used to query the data through the PySpark SQL library.
Code Step 5:#
df.createOrReplaceTempView("cord19")
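As an optional inspection step (not part of the original use case code), the number of loaded papers can be counted and the registration of the temporary view confirmed:
print(df.count())                  # number of documents read from MinIO
print(spark.catalog.listTables())  # the "cord19" temporary view should be listed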
6. Query the data#
In this step, three queries are performed through the PySpark SQL library:
- How many papers for each source?
- Which author has written the most papers?
- Which are the abstracts for the reported papers?
The results of each query are displayed directly on screen.
6.1 How many papers for each source?
Code Step 6.1:#
query = """
SELECT
source,
COUNT(DISTINCT paper_id)
FROM
cord19
GROUP BY
source
"""
spark.sql(query).show()
Output:
+---------------+------------------------+
| source|count(DISTINCT paper_id)|
+---------------+------------------------+
|document_parses| 10|
+---------------+------------------------+
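For reference, the same aggregation can also be expressed with the DataFrame API, using the F alias imported in Step 1. This is an equivalent sketch, not part of the original use case code:
# Count distinct papers per source, aliasing the count column for readability
df.groupBy("source").agg(F.countDistinct("paper_id").alias("n_papers")).show()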
6.2 Which author has written the most papers?
Code Step 6.2:#
query = """
WITH authors AS (
SELECT
paper_id,
author.*
FROM
cord19
LATERAL VIEW
explode(metadata.authors) AS author
)
SELECT
first,
last,
COUNT(DISTINCT paper_id) as n_papers
FROM
authors
GROUP BY
first,
last
ORDER BY
n_papers DESC
"""
spark.sql(query).show(n=5)
Output:
+------+---------+--------+
| first| last|n_papers|
+------+---------+--------+
|Xavier| Rossello| 1|
| Nora| Watson| 1|
| José| Ramó| 1|
| H| Nauwynck| 1|
|Rafael|Romaguera| 1|
+------+---------+--------+
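The same result can be obtained with the DataFrame API by exploding the authors array; the sketch below (not part of the original use case code) uses the F alias imported in Step 1:
(
    df.select("paper_id", F.explode("metadata.authors").alias("author"))
    .groupBy("author.first", "author.last")
    .agg(F.countDistinct("paper_id").alias("n_papers"))
    .orderBy(F.desc("n_papers"))
    .show(n=5)
)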
6.3 Which are the abstracts for the reported papers?
Code Step 6.3:#
query = """
WITH abstract AS (
SELECT
paper_id,
pos,
value.text as text
FROM
cord19
LATERAL VIEW
posexplode(abstract) AS pos, value
),
collected AS (
SELECT
paper_id,
collect_list(text) OVER (PARTITION BY paper_id ORDER BY pos) as sentences
FROM
abstract
),
sentences AS (
SELECT
paper_id,
max(sentences) as sentences
FROM
collected
GROUP BY
paper_id
)
SELECT
paper_id,
array_join(sentences, " ") as abstract,
-- make sure the regex is being escaped properly
size(split(array_join(sentences, " "), "\\\s+")) as words
FROM
sentences
"""
spark.sql(query).show(n=5)
Output:
+--------------------+--------------------+-----+
| paper_id| abstract|words|
+--------------------+--------------------+-----+
|0000b6da665726420...|Objective: An at ...| 263|
|0000fcce604204b1b...|Contribución de l...| 406|
|000122a9a774ec76f...|Introduction and ...| 286|
|00013062c83cef3b8...|Systems serology ...| 173|
|00013694fb8095bb8...|Prolonged Covid-1...| 200|
+--------------------+--------------------+-----+
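A DataFrame-API variant of this query is sketched below (not part of the original use case code). Note that collect_list after a plain groupBy does not guarantee the paragraph order the way the windowed SQL version above does, so the reconstructed abstract text may be ordered differently, although the word count is unaffected:
(
    df.select("paper_id", F.explode("abstract").alias("a"))
    .groupBy("paper_id")
    # Concatenate the abstract paragraphs and count whitespace-separated words
    .agg(F.concat_ws(" ", F.collect_list("a.text")).alias("abstract"))
    .withColumn("words", F.size(F.split("abstract", r"\s+")))
    .show(n=5)
)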