Data Engineering with Arrow Exercises

Schemas

library(arrow)
library(dplyr)
seattle_csv <- open_dataset(here::here("data/seattle-library-checkouts.csv"),
  format = "csv"
)
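
A quick way to see what Arrow inferred is to print the dataset's schema. Type guessing is based on the first chunk of the file, which is why ISBN comes back as null here (a small sanity check; output not shown):

seattle_csv$schema
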
Data Types & Controlling the Schema
  1. The first few thousand rows of ISBN are blank in the Seattle Checkouts CSV file. Read in the Seattle Checkouts CSV file with open_dataset() and ensure the correct data type for ISBN is <string> instead of the <null> interpreted by Arrow.

  2. Once you have a Dataset object with the metadata you are after, count the number of Checkouts by CheckoutYear and arrange the result by CheckoutYear.

seattle_csv <- open_dataset(here::here("data/seattle-library-checkouts.csv"),
  format = "csv",
  skip = 1,
  schema(
    UsageClass = utf8(),
    CheckoutType = utf8(),
    MaterialType = utf8(),
    CheckoutYear = int64(),
    CheckoutMonth = int64(),
    Checkouts = int64(),
    Title = utf8(),
    ISBN = string(),
    Creator = utf8(),
    Subjects = utf8(),
    Publisher = utf8(),
    PublicationYear = utf8()
  )
)

or equivalently, since string() is an alias for utf8():

seattle_csv <- open_dataset(here::here("data/seattle-library-checkouts.csv"),
  format = "csv",
  skip = 1,
  schema(
    UsageClass = utf8(),
    CheckoutType = utf8(),
    MaterialType = utf8(),
    CheckoutYear = int64(),
    CheckoutMonth = int64(),
    Checkouts = int64(),
    Title = utf8(),
    ISBN = utf8(),
    Creator = utf8(),
    Subjects = utf8(),
    Publisher = utf8(),
    PublicationYear = utf8()
  )
)
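
Rather than typing the whole schema out by hand, recent versions of arrow can also print R code for the inferred schema, which you can copy, edit (change ISBN from null() to string()), and paste back into open_dataset(). A rough sketch:

# Print schema() code for the inferred schema; edit the ISBN line before reusing it
open_dataset(here::here("data/seattle-library-checkouts.csv"),
             format = "csv")$schema$code()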

The number of Checkouts by CheckoutYear arranged by CheckoutYear:

seattle_csv |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect()
# A tibble: 18 × 2
   CheckoutYear `sum(Checkouts)`
          <int>            <int>
 1         2005          3798685
 2         2006          6599318
 3         2007          7126627
 4         2008          8438486
 5         2009          9135167
 6         2010          8608966
 7         2011          8321732
 8         2012          8163046
 9         2013          9057096
10         2014          9136081
11         2015          9084179
12         2016          9021051
13         2017          9231648
14         2018          9149176
15         2019          9199083
16         2020          6053717
17         2021          7361031
18         2022          7001989

or

seattle_csv |> 
  count(CheckoutYear, wt = Checkouts) |> 
  arrange(CheckoutYear) |> 
  collect()
# A tibble: 18 × 2
   CheckoutYear       n
          <int>   <int>
 1         2005 3798685
 2         2006 6599318
 3         2007 7126627
 4         2008 8438486
 5         2009 9135167
 6         2010 8608966
 7         2011 8321732
 8         2012 8163046
 9         2013 9057096
10         2014 9136081
11         2015 9084179
12         2016 9021051
13         2017 9231648
14         2018 9149176
15         2019 9199083
16         2020 6053717
17         2021 7361031
18         2022 7001989

Timing the query:

seattle_csv |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |> 
  system.time()
   user  system elapsed 
 10.853   1.198  10.561 

Querying 42 million rows of data stored in a CSV on disk takes about 10 seconds, which is not too bad.
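
For reference, nrow() works directly on the Dataset object, so the row count can be confirmed without reading the CSV into memory (a quick check; the exact count depends on your copy of the file):

# Count rows of the on-disk dataset without collecting it into R
nrow(seattle_csv)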

Parquet

seattle_parquet <- here::here("data/seattle-library-checkouts-parquet")

seattle_csv |>
  write_dataset(path = seattle_parquet,
                format = "parquet")
Parquet
  1. Re-run the query counting the number of Checkouts by CheckoutYear and arranging the result by CheckoutYear, this time using the Seattle Checkout data saved to disk as a single Parquet file. Did you notice a difference in compute time?
seattle_parquet <- here::here("data/seattle-library-checkouts-parquet")

open_dataset(seattle_parquet, 
             format = "parquet") |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |> 
  system.time()
   user  system elapsed 
  2.238   0.452   0.696 

The query runs much faster when the on-disk data is stored in the Parquet format.
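
Part of the speed-up is simply that Parquet is a compressed, column-oriented format, so there are far fewer bytes to read. A rough way to see this is to compare on-disk sizes (assuming the files were written to the paths used above; exact numbers will vary):

# Size of the single CSV file versus the Parquet file(s) written by write_dataset()
file.size(here::here("data/seattle-library-checkouts.csv"))
sum(file.size(list.files(seattle_parquet, full.names = TRUE)))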

Partitioning

seattle_parquet_part <- here::here("data/seattle-library-checkouts")

seattle_csv |>
  group_by(CheckoutYear) |>
  write_dataset(path = seattle_parquet_part,
                format = "parquet")
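
The group_by(CheckoutYear) before write_dataset() is what defines the partitioning: each year is written into its own Hive-style directory. You can see the resulting layout with a plain directory listing (assuming the write above completed):

# Partition directories look like CheckoutYear=2005/part-0.parquet, and so on
list.files(seattle_parquet_part, recursive = TRUE)
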
Partitioning
  1. Let’s write the Seattle Checkout CSV data to a multi-file dataset just one more time! This time, write the data partitioned by CheckoutType as Parquet files.

  2. Now compare the compute time between our Parquet data partitioned by CheckoutYear and our Parquet data partitioned by CheckoutType with a query of the total number of checkouts in September of 2019. Did you find a difference in compute time?

Writing the data:

seattle_checkouttype <- here::here("data/seattle-library-checkouts-type")

seattle_csv |>
  group_by(CheckoutType) |>
  write_dataset(path = seattle_checkouttype,
                format = "parquet")

Total number of Checkouts in September of 2019 using the Parquet data partitioned by CheckoutType:

open_dataset(here::here("data/seattle-library-checkouts-type")) |> 
  filter(CheckoutYear == 2019, CheckoutMonth == 9) |> 
  summarise(TotalCheckouts = sum(Checkouts)) |>
  collect() |> 
  system.time()
   user  system elapsed 
  0.907   0.087   0.333 

Total number of Checkouts in September of 2019 using the Parquet data partitioned by CheckoutYear:

open_dataset(here::here("data/seattle-library-checkouts")) |> 
  filter(CheckoutYear == 2019, CheckoutMonth == 9) |> 
  summarise(TotalCheckouts = sum(Checkouts)) |>
  collect() |> 
  system.time()
   user  system elapsed 
  0.039   0.006   0.032 

Compute time is faster for the CheckoutYear-partitioned data because the filter() call uses the partitioning variable: Arrow only needs to read the files under the CheckoutYear=2019 directory, while the CheckoutType partitions give no shortcut for a year-and-month filter.
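
As a rough check on that explanation, you can list the only files Arrow needs for this query in the CheckoutYear-partitioned dataset (a sketch; the path assumes the layout written above):

# Files under the matching partition; everything else is skipped for this query
list.files(file.path(seattle_parquet_part, "CheckoutYear=2019"),
           full.names = TRUE)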