Skip to content

Use case code Scenario 4 Big data analytics

1. Import relevant libraries#

In this step, all the Python libraries that are needed to process the data are imported.

Code Step 1:#

import sys
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    MapType,
    StringType,
    StructField,
    StructType,
)

2. Define MinIO config parameters#

To connect to the MinIO bucket which contains the data, it is necessary to input the followings in the next line of code:

  • url: this is the private address of the MinIO instance. This Jupyter notebook will try to connect to MinIO through the private endpoint, since both the Jupyter and MinIO instance are deployed in the same private network.
  • accessKey: this is the accessKey to connect to MinIO, as defined in the service account.
  • secretKey: this is the secretKey to connect to MinIO, as defined in the service account.

To retrieve these information, please refer to Step 3.3 of the use case guide.

Code Step 2:#

accessKey = "<input_your_accessKey_here>" 
secretKey = "<input_your_secretKey_here>"
url = "<input_your_url_here" 
connectionTimeOut = "30000"

3. Define Spark Session#

To connect to the Spark instance, which will process our dataset, we will need to make some configurations to make sure that the SparkSession is set up correctly. These configurations are meant to: - Connect the Jupyter Notebook to the Spark instance - Connect the Spark instance to MinIO

Here below a brief explanation of the relevant settings:

spark = SparkSession.builder \
                    .master("<input_your_spark_address_here") \ #1
                    .config("spark.submit.deployMode", "client") \ 
                    .config("spark.driver.host","<input_your_jupyter_hostname>") \ #2
                    .config("spark.driver.port", "9000")\
                    .config("spark.executor.memory","4GB") \ 
                    .config("spark.hadoop.fs.s3a.endpoint", url) \ #3
                    .config("spark.hadoop.fs.s3a.access.key", accessKey) \ #4
                    .config("spark.hadoop.fs.s3a.secret.key", secretKey)\ #5
                    .config("spark.hadoop.fs.s3a.connection.timeout", connectionTimeOut) \ 
                    .config("spark.hadoop.fs.s3a.path.style.access", "true")\
                    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
                    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")\ 
                    .config("spark.sql.debug.maxToStringFields", "100000") \ 
                    .getOrCreate()
  1. Here the Spark master node address should be inputed. To retrieve this information, please refer to Step 4.3 of the use case script.
  2. This configuration will allow to make sure that Spark connects with Jupyter. To this extent, the Jupyter hostname should be inputed. To retrieve this information, please refer to Step 5.5 of the use case script.
  3. This configuration provides Spark the MinIO address (defined in the previous step) to read the data.
  4. This configuration provides Spark the MinIO the accessKey (defined in the previous step) to have access to the data.
  5. This configuration provides Spark the MinIO the secretKey (defined in the previous step) to have access to the data.

Code Step 3:#

spark = SparkSession.builder \
                    .master("<input_your_spark_address_here") \ 
                    .config("spark.submit.deployMode", "client") \ 
                    .config("spark.driver.host","<input_your_jupyter_pod_endpoint_here>.<input_your_dsl_here>.pod.cluster.local") \ 
                    .config("spark.driver.port", "9000")\
                    .config("spark.executor.memory","4GB") \ 
                    .config("spark.hadoop.fs.s3a.endpoint", url) \ 
                    .config("spark.hadoop.fs.s3a.access.key", accessKey) \ 
                    .config("spark.hadoop.fs.s3a.secret.key", secretKey)\ 
                    .config("spark.hadoop.fs.s3a.connection.timeout", connectionTimeOut) \ 
                    .config("spark.hadoop.fs.s3a.path.style.access", "true")\
                    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
                    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")\ 
                    .config("spark.sql.debug.maxToStringFields", "100000") \ 
                    .getOrCreate() 

4. Pre-define the schema for the source data#

In this step, we define a function which generates the data schema forour dataset. This data schema will accomodate all the information of each document/paper present in the dataset. A view of the data schema is here reported:

root
 |-- paper_id: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- title: string (nullable = true)
 |    |-- authors: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- first: string (nullable = true)
 |    |    |    |-- middle: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- last: string (nullable = true)
 |    |    |    |-- suffix: string (nullable = true)
 |    |    |    |-- affiliation: struct (nullable = true)
 |    |    |    |    |-- laboratory: string (nullable = true)
 |    |    |    |    |-- institution: string (nullable = true)
 |    |    |    |    |-- location: struct (nullable = true)
 |    |    |    |    |    |-- addrLine: string (nullable = true)
 |    |    |    |    |    |-- country: string (nullable = true)
 |    |    |    |    |    |-- postBox: string (nullable = true)
 |    |    |    |    |    |-- postCode: string (nullable = true)
 |    |    |    |    |    |-- region: string (nullable = true)
 |    |    |    |    |    |-- settlement: string (nullable = true)
 |    |    |    |-- email: string (nullable = true)
 |-- abstract: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- cite_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- ref_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- eq_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- section: string (nullable = true)
 |-- body_text: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- cite_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- ref_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- eq_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- section: string (nullable = true)
 |-- bib_entries: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- ref_id: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- authors: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- first: string (nullable = true)
 |    |    |    |    |-- middle: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- last: string (nullable = true)
 |    |    |    |    |-- suffix: string (nullable = true)
 |    |    |-- year: integer (nullable = true)
 |    |    |-- venue: string (nullable = true)
 |    |    |-- volume: string (nullable = true)
 |    |    |-- issn: string (nullable = true)
 |    |    |-- pages: string (nullable = true)
 |    |    |-- other_ids: struct (nullable = true)
 |    |    |    |-- DOI: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |-- ref_entries: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- latex: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- back_matter: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- cite_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- ref_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- eq_spans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start: integer (nullable = true)
 |    |    |    |    |-- end: integer (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- ref_id: string (nullable = true)
 |    |    |-- section: string (nullable = true)
 |-- source: string (nullable = false)

Code Step 4:#

def generate_cord19_schema():

    author_fields = [
        StructField("first", StringType()),
        StructField("middle", ArrayType(StringType())),
        StructField("last", StringType()),
        StructField("suffix", StringType()),
    ]

    authors_schema = ArrayType(
        StructType(
            author_fields
            + [               
                StructField(
                    "affiliation",
                    StructType(
                        [
                            StructField("laboratory", StringType()),
                            StructField("institution", StringType()),
                            StructField(
                                "location",
                                StructType(
                                    [
                                        StructField("addrLine", StringType()),
                                        StructField("country", StringType()),
                                        StructField("postBox", StringType()),
                                        StructField("postCode", StringType()),
                                        StructField("region", StringType()),
                                        StructField("settlement", StringType()),
                                    ]
                                ),
                            ),
                        ]
                    ),
                ),
                StructField("email", StringType()),
            ]
        )
    )

    spans_schema = ArrayType(
        StructType(
            [
                StructField("start", IntegerType()),
                StructField("end", IntegerType()),
                StructField("text", StringType()),
                StructField("ref_id", StringType()),
            ]
        )
    )

    section_schema = ArrayType(
        StructType(
            [
                StructField("text", StringType()),
                StructField("cite_spans", spans_schema),  
                StructField("ref_spans", spans_schema),  
                StructField("eq_spans", spans_schema),
                StructField("section", StringType()),
            ]
        )
    )

    bib_schema = MapType(
        StringType(),
        StructType(
            [
                StructField("ref_id", StringType()),
                StructField("title", StringType()),
                StructField("authors", ArrayType(StructType(author_fields))),
                StructField("year", IntegerType()),
                StructField("venue", StringType()),
                StructField("volume", StringType()),
                StructField("issn", StringType()),
                StructField("pages", StringType()),
                StructField(
                    "other_ids",
                    StructType([StructField("DOI", ArrayType(StringType()))]),
                ),
            ]
        ),
        True,
    )

    ref_schema = MapType(
        StringType(),
        StructType(
            [
                StructField("text", StringType()),
                StructField("latex", StringType()),
                StructField("type", StringType()),
            ]
        ),
    )

    return StructType(
        [
            StructField("paper_id", StringType()),
            StructField(
                "metadata",
                StructType(
                    [
                        StructField("title", StringType()),
                        StructField("authors", authors_schema),
                    ]
                ),
                True,
            ),
            StructField("abstract", section_schema),
            StructField("body_text", section_schema),
            StructField("bib_entries", bib_schema),
            StructField("ref_entries", ref_schema),
            StructField("back_matter", section_schema),
        ]
    )

5. Extract the data in a single Dataframe by accomodating the information in the pre-defined schema#

In this step, the function extract_dataframe(spark) is defined. This function reads each single JSON document of the dataset, and accomodate the information in the schema previously defined. The information are then joined in a single dataframe object.

Code Step 5:#

def extract_dataframe(spark):

    base = "s3a://source/data"

    sources = [
        "document_parses"
    ]

    sub_sources = [
        "pmc_json",
        "pdf_json",
    ]

    dataframe = None

    for source in sources:
        for sub_source in sub_sources:
            path = f"{base}/{source}/{sub_source}"
            df = (
                spark.read.json(path, schema=generate_cord19_schema(), multiLine=True)
                .withColumn("source", lit(source))
            )
        if not dataframe:
            dataframe = df
        else:
            dataframe = dataframe.union(df)
    return dataframe

The dataframe df is defined by calling the function extract_dataframe. Please note that spark is passed as argument of the function: this means that the processing of the data into a unique data frame is done by the Spark instance.

Code Step 5:#

df = extract_dataframe(spark)

A local temporary view of the dataframe is created. This view will be used to query the data through the pyspark SQL library.

Code Step 5:#

df.createOrReplaceTempView("cord19")

6. Query the data#

In this step, 3 queries are performed through the pyspark SQL library:

  1. How many papers for each source?
  2. Which author has written the most papers?
  3. Which are the abstracts for the reported papers?

The results of the query are directly showed on the screen.

6.1 How many papers for each source?

Code Step 6.1:#

query = """
SELECT
    source,
    COUNT(DISTINCT paper_id)
FROM
    cord19
GROUP BY
    source
"""
spark.sql(query).show()
Output:
    +---------------+------------------------+
    |         source|count(DISTINCT paper_id)|
    +---------------+------------------------+
    |document_parses|                      10|
    +---------------+------------------------+
6.2 Which author has written the most papers?

Code Step 6.2:#

query = """
WITH authors AS (
    SELECT
        paper_id,
        author.*
    FROM
        cord19
    LATERAL VIEW
        explode(metadata.authors) AS author
)
SELECT
    first,
    last,
    COUNT(DISTINCT paper_id) as n_papers
FROM
    authors
GROUP BY
    first,
    last
ORDER BY
    n_papers DESC
"""

spark.sql(query).show(n=5)
Output:
    +------+---------+--------+
    | first|     last|n_papers|
    +------+---------+--------+
    |Xavier| Rossello|       1|
    |  Nora|   Watson|       1|
    |  José|     Ramó|       1|
    |     H| Nauwynck|       1|
    |Rafael|Romaguera|       1|
    +------+---------+--------+
6.3 Which are the abstracts for the reported papers?

Code Step 6.3:#

query = """
WITH abstract AS (
    SELECT
        paper_id,
        pos,
        value.text as text
    FROM
        cord19
    LATERAL VIEW
        posexplode(abstract) AS pos, value
),
collected AS (
    SELECT
        paper_id,
        collect_list(text) OVER (PARTITION BY paper_id ORDER BY pos) as sentences
    FROM
        abstract
),
sentences AS (
    SELECT
        paper_id,
        max(sentences) as sentences
    FROM
        collected
    GROUP BY
        paper_id
)
SELECT
    paper_id,
    array_join(sentences, " ") as abstract,
    -- make sure the regex is being escaped properly
    size(split(array_join(sentences, " "), "\\\s+")) as words
FROM
    sentences
"""

spark.sql(query).show(n=5)
 Output:
    +--------------------+--------------------+-----+
    |            paper_id|            abstract|words|
    +--------------------+--------------------+-----+
    |0000b6da665726420...|Objective: An at ...|  263|
    |0000fcce604204b1b...|Contribución de l...|  406|
    |000122a9a774ec76f...|Introduction and ...|  286|
    |00013062c83cef3b8...|Systems serology ...|  173|
    |00013694fb8095bb8...|Prolonged Covid-1...|  200|
    +--------------------+--------------------+-----+