Data Engineering with Arrow Exercises

library(arrow)
library(dplyr)

Schemas

seattle_csv <- open_dataset(here::here("data/seattle-library-checkouts.csv"),
  format = "csv"
)

- The first few thousand rows of ISBN are blank in the Seattle Checkouts CSV file. Read in the Seattle Checkouts CSV file with open_dataset() and ensure the correct data type for ISBN is <string> instead of the <null> interpreted by Arrow.
- Once you have a Dataset object with the metadata you are after, count the number of Checkouts by CheckoutYear and arrange the result by CheckoutYear.
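
Before supplying a schema by hand, it can help to look at the one Arrow inferred. Printing the schema of the dataset opened above (a quick check; seattle_csv here is the object created without an explicit schema) shows why the fix is needed:

seattle_csv$schema
# ISBN shows up as <null> because the rows Arrow samples at the start
# of the file are all blank; the other columns get usable types

Because the solution below supplies a complete schema, skip = 1 is also needed so that open_dataset() skips the CSV's header row instead of reading it as data:
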
seattle_csv <- open_dataset(here::here("data/seattle-library-checkouts.csv"),
  format = "csv",
  skip = 1,
  schema(
    UsageClass = utf8(),
    CheckoutType = utf8(),
    MaterialType = utf8(),
    CheckoutYear = int64(),
    CheckoutMonth = int64(),
    Checkouts = int64(),
    Title = utf8(),
    ISBN = string(),
    Creator = utf8(),
    Subjects = utf8(),
    Publisher = utf8(),
    PublicationYear = utf8()
  )
)

or

seattle_csv <- open_dataset(here::here("data/seattle-library-checkouts.csv"),
  format = "csv",
  skip = 1,
  schema(
    UsageClass = utf8(),
    CheckoutType = utf8(),
    MaterialType = utf8(),
    CheckoutYear = int64(),
    CheckoutMonth = int64(),
    Checkouts = int64(),
    Title = utf8(),
    ISBN = utf8(),
    Creator = utf8(),
    Subjects = utf8(),
    Publisher = utf8(),
    PublicationYear = utf8()
  )
)

(string() is an alias for utf8() in arrow, so the two versions are equivalent.)

The number of Checkouts by CheckoutYear, arranged by CheckoutYear:

seattle_csv |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |>
  collect()

# A tibble: 18 × 2
CheckoutYear `sum(Checkouts)`
<int> <int>
1 2005 3798685
2 2006 6599318
3 2007 7126627
4 2008 8438486
5 2009 9135167
6 2010 8608966
7 2011 8321732
8 2012 8163046
9 2013 9057096
10 2014 9136081
11 2015 9084179
12 2016 9021051
13 2017 9231648
14 2018 9149176
15 2019 9199083
16 2020 6053717
17 2021 7361031
18 2022 7001989
or

seattle_csv |>
  count(CheckoutYear, wt = Checkouts) |>
  arrange(CheckoutYear) |>
  collect()

# A tibble: 18 × 2
CheckoutYear n
<int> <int>
1 2005 3798685
2 2006 6599318
3 2007 7126627
4 2008 8438486
5 2009 9135167
6 2010 8608966
7 2011 8321732
8 2012 8163046
9 2013 9057096
10 2014 9136081
11 2015 9084179
12 2016 9021051
13 2017 9231648
14 2018 9149176
15 2019 9199083
16 2020 6053717
17 2021 7361031
18 2022 7001989
Timing the query:

seattle_csv |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |>
  collect() |>
  system.time()

   user  system elapsed
 10.853   1.198  10.561

Querying 42 million rows of data stored in a CSV on disk in about 10 seconds is not too bad.
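
To check that row count, nrow() works directly on an Arrow Dataset without pulling the CSV into memory (a quick sketch, using the seattle_csv object from above):

seattle_csv |>
  nrow()
# about 42 million rows, computed by scanning the file on disk
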
Parquet

seattle_parquet <- here::here("data/seattle-library-checkouts-parquet")

seattle_csv |>
  write_dataset(path = seattle_parquet,
                format = "parquet")

- Re-run the query counting the number of Checkouts by CheckoutYear and arranging the result by CheckoutYear, this time using the Seattle Checkout data saved to disk as a single Parquet file. Did you notice a difference in compute time?

seattle_parquet <- here::here("data/seattle-library-checkouts-parquet")

open_dataset(seattle_parquet,
             format = "parquet") |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |>
  collect() |>
  system.time()

   user  system elapsed
  2.238   0.452   0.696

The query runs much faster when the on-disk data is stored in the Parquet format.
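
Parquet's columnar layout and compression also shrink the data's footprint on disk. A rough comparison with base R (a sketch; it assumes the file paths used above):

# Size of the original CSV, in GB
file.size(here::here("data/seattle-library-checkouts.csv")) / 1e9

# write_dataset() wrote a directory of Parquet files; sum their sizes
sum(file.size(list.files(seattle_parquet, full.names = TRUE))) / 1e9
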
Partitioning

seattle_parquet_part <- here::here("data/seattle-library-checkouts")

seattle_csv |>
  group_by(CheckoutYear) |>
  write_dataset(path = seattle_parquet_part,
                format = "parquet")

- Let's write the Seattle Checkout CSV data to a multi-file dataset just one more time! This time, write the data partitioned by CheckoutType as Parquet files.
- Now compare the compute time between our Parquet data partitioned by CheckoutYear and our Parquet data partitioned by CheckoutType with a query of the total number of checkouts in September of 2019. Did you find a difference in compute time?

Writing the data:

seattle_checkouttype <- here::here("data/seattle-library-checkouts-type")

seattle_csv |>
  group_by(CheckoutType) |>
  write_dataset(path = seattle_checkouttype,
                format = "parquet")

Total number of Checkouts in September of 2019, using the Parquet data partitioned by CheckoutType:

open_dataset(here::here("data/seattle-library-checkouts-type")) |>
  filter(CheckoutYear == 2019, CheckoutMonth == 9) |>
  summarise(TotalCheckouts = sum(Checkouts)) |>
  collect() |>
  system.time()

   user  system elapsed
  0.907   0.087   0.333

Total number of Checkouts in September of 2019, using the Parquet data partitioned by CheckoutYear:

open_dataset(here::here("data/seattle-library-checkouts")) |>
  filter(CheckoutYear == 2019, CheckoutMonth == 9) |>
  summarise(TotalCheckouts = sum(Checkouts)) |>
  collect() |>
  system.time()

   user  system elapsed
  0.039   0.006   0.032

The second query is faster because the filter() call matches the partition variable: with the data partitioned by CheckoutYear, Arrow only has to read the files under the CheckoutYear=2019 directory and can skip the rest, whereas the CheckoutType partitioning gives the year/month filter no partitions to rule out.
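
That pruning is visible in the directory layout: write_dataset() on a grouped Dataset writes Hive-style subdirectories named after the partition values (a quick check, assuming the paths written above):

# Partition directories of the year-partitioned dataset
list.files(here::here("data/seattle-library-checkouts"))
# "CheckoutYear=2005" "CheckoutYear=2006" ... one directory per year

# ...and of the type-partitioned dataset
list.files(here::here("data/seattle-library-checkouts-type"))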