Data Engineering with Arrow

Data Engineering



https://en.wikipedia.org/wiki/Data_engineering

.NORM Files


https://xkcd.com/2116/

Poll: Formats


Which file formats do you use most often?

  • 1️⃣ CSV (.csv)
  • 2️⃣ MS Excel (.xls and .xlsx)
  • 3️⃣ Parquet (.parquet)
  • 4️⃣ Something else

Arrow & File Formats

Seattle Checkouts Big CSV

https://data.seattle.gov/Community/Checkouts-by-Title/tmmm-ytt6

arrow::open_dataset() with a CSV

library(arrow)
library(dplyr)

seattle_csv <- open_dataset(here::here("data/seattle-library-checkouts.csv"),
                            format = "csv")

seattle_csv
FileSystemDataset with 1 csv file
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: null
Creator: string
Subjects: string
Publisher: string
PublicationYear: string

arrow::schema()

Create a schema or extract one from an object.


Let’s extract the schema:

schema(seattle_csv)
Schema
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: null
Creator: string
Subjects: string
Publisher: string
PublicationYear: string

Arrow Data Types

Arrow has a rich data type system, including direct analogs of many R data types

  • <dbl> == <double>
  • <chr> == <string> or <utf8>
  • <int> == <int32>


https://arrow.apache.org/docs/r/articles/data_types.html
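
To check the mapping yourself, here is a minimal sketch (assuming a recent arrow release that exports infer_type(); the values are throwaway examples) reporting the Arrow type an R vector maps to:

library(arrow)

infer_type(1.5)      # <dbl> -> double
infer_type("hello")  # <chr> -> string (utf8)
infer_type(1L)       # <int> -> int32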

Parsing the Metadata


Arrow scans 👀 a few thousand rows of the file(s) to infer (“guess”) each column’s data type

📚 arrow vs readr blog post: https://thisisnic.github.io/2022/11/21/type-inference-in-readr-and-arrow/

Parsers Are Not Always Right

schema(seattle_csv)
Schema
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: null
Creator: string
Subjects: string
Publisher: string
PublicationYear: string

Let’s Control the Schema


Creating a schema manually:

schema(
  UsageClass = utf8(),
  CheckoutType = utf8(),
  MaterialType = utf8(),
  ...
)


This will take a lot of typing with 12 columns 😢

Let’s Control the Schema


seattle_csv$schema$code() 
schema(UsageClass = utf8(), CheckoutType = utf8(), MaterialType = utf8(), 
    CheckoutYear = int64(), CheckoutMonth = int64(), Checkouts = int64(), 
    Title = utf8(), ISBN = null(), Creator = utf8(), Subjects = utf8(), 
    Publisher = utf8(), PublicationYear = utf8())


🤩

Let’s Control the Schema

seattle_csv <- open_dataset(
  sources = here::here("data/seattle-library-checkouts.csv"),
  format = "csv",
  skip = 1,  # skip the CSV header row; the schema supplies the column names
  schema = schema(
    UsageClass = utf8(),
    CheckoutType = utf8(),
    MaterialType = utf8(),
    CheckoutYear = int64(),
    CheckoutMonth = int64(),
    Checkouts = int64(),
    Title = utf8(),
    ISBN = string(), # string() is an alias for utf8()
    Creator = utf8(),
    Subjects = utf8(),
    Publisher = utf8(),
    PublicationYear = utf8()
  )
)
seattle_csv
FileSystemDataset with 1 csv file
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: string
Creator: string
Subjects: string
Publisher: string
PublicationYear: string

Your Turn

  1. The first few thousand rows of ISBN are blank in the Seattle Checkouts CSV file. Read in the Seattle Checkouts CSV file with open_dataset() and ensure the correct data type for ISBN is <string> instead of the <null> interpreted by Arrow.

  2. Once you have a Dataset object with the metadata you are after, count the number of Checkouts by CheckoutYear and arrange the result by CheckoutYear.

➡️ Data Storage Engineering Exercises Page

9GB CSV file + arrow + dplyr

seattle_csv |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect()
# A tibble: 18 × 2
   CheckoutYear `sum(Checkouts)`
          <int>            <int>
 1         2005          3798685
 2         2006          6599318
 3         2007          7126627
 4         2008          8438486
 5         2009          9135167
 6         2010          8608966
 7         2011          8321732
 8         2012          8163046
 9         2013          9057096
10         2014          9136081
11         2015          9084179
12         2016          9021051
13         2017          9231648
14         2018          9149176
15         2019          9199083
16         2020          6053717
17         2021          7361031
18         2022          7001989

9GB CSV file + arrow + dplyr

seattle_csv |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |>
  system.time()
   user  system elapsed 
 11.218   1.291  11.001 

42 million rows – not bad, but could be faster….

File Format: Apache Parquet

https://parquet.apache.org/

Parquet

  • usually smaller than the equivalent CSV file
  • rich type system & stores the data type along with the data (see the sketch below)
  • “column-oriented” == better performance than CSV’s row-by-row layout
  • “row-chunked” == work on different parts of the file at the same time, or skip some chunks altogether
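
As a quick illustration of the rich-type-system point, a minimal sketch (a throwaway temp file, not the Seattle data) showing that column types are written into the file and survive a round trip with no re-guessing on read:

library(arrow)

df <- data.frame(
  id    = 1:3,               # integer
  price = c(9.99, 5, 12.5),  # double
  label = c("a", "b", "c")   # character
)

tmp <- tempfile(fileext = ".parquet")
write_parquet(df, tmp)            # types travel with the data
sapply(read_parquet(tmp), class)  # integer, numeric, character come back unchanged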

Parquet Files: “row-chunked”

Parquet Files: “row-chunked & column-oriented”

Writing to Parquet

seattle_parquet <- here::here("data/seattle-library-checkouts-parquet")

seattle_csv |>
  write_dataset(path = seattle_parquet,
                format = "parquet")

Storage: Parquet vs CSV

file <- list.files(seattle_parquet)
file.size(file.path(seattle_parquet, file)) / 10**9
[1] 4.424267


Parquet is about half the size of the CSV file on disk 💾
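
To make the comparison explicit, a short sketch (assuming the original CSV is still at data/seattle-library-checkouts.csv) that prints both on-disk sizes side by side:

csv_gb     <- file.size(here::here("data/seattle-library-checkouts.csv")) / 10**9
parquet_gb <- sum(file.size(file.path(seattle_parquet, list.files(seattle_parquet)))) / 10**9

round(c(csv = csv_gb, parquet = parquet_gb), 2)  # roughly 9 vs 4.4 GB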

Your Turn

  1. Re-run the query counting the number of Checkouts by CheckoutYear and arranging the result by CheckoutYear, this time using the Seattle Checkout data saved to disk as a single Parquet file. Did you notice a difference in compute time?

➡️ Data Storage Engineering Exercises Page

4.5GB Parquet file + arrow + dplyr

open_dataset(seattle_parquet, 
             format = "parquet") |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |>
  system.time()
   user  system elapsed 
  2.034   0.408   0.654 

42 million rows – much better! But could be even faster….

File Storage: Partitioning


Dividing data into smaller pieces, making it more easily accessible and manageable
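
For a concrete picture, a minimal sketch (using mtcars as a stand-in dataset and a temporary directory, not the Seattle data) showing how write_dataset() creates one directory per value of the partitioning variable:

library(arrow)
library(dplyr)

tmp <- tempfile()

mtcars |>
  group_by(cyl) |>  # the grouping variable becomes the partition
  write_dataset(path = tmp, format = "parquet")

list.files(tmp, recursive = TRUE)
# expected layout: "cyl=4/part-0.parquet" "cyl=6/part-0.parquet" "cyl=8/part-0.parquet"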

Poll: Partitioning?

Have you partitioned your data or used partitioned data before today?

  • 1️⃣ Yes
  • 2️⃣ No
  • 3️⃣ Not sure, the data engineers sort that out!

Art & Science of Partitioning


  • avoid files smaller than 20 MB or larger than 2 GB
  • avoid > 10,000 files (🤯)
  • partition on variables used in filter()
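
Once a partitioned dataset exists on disk (the directory written on the next slide is used here as an example), a quick sketch for auditing it against these rules of thumb:

dataset_dir <- here::here("data/seattle-library-checkouts")  # partitioned dataset directory

files <- list.files(dataset_dir, recursive = TRUE, full.names = TRUE)
length(files)                    # aim for well under 10,000 files
range(file.size(files)) / 10**6  # flag parts below ~20 MB or above ~2,000 MB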

Rewriting the Data Again

seattle_parquet_part <- here::here("data/seattle-library-checkouts")

seattle_csv |>
  group_by(CheckoutYear) |>
  write_dataset(path = seattle_parquet_part,
                format = "parquet")
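
As an aside, the same partitioned layout can be requested without group_by() by passing write_dataset()’s partitioning argument (a sketch equivalent to the call above):

seattle_csv |>
  write_dataset(path = seattle_parquet_part,
                format = "parquet",
                partitioning = "CheckoutYear")  # produces the same CheckoutYear=... directories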

What Did We “Engineer”?

seattle_parquet_part <- here::here("data/seattle-library-checkouts")

sizes <- tibble(
  files = list.files(seattle_parquet_part, recursive = TRUE),
  size_GB = file.size(file.path(seattle_parquet_part, files)) / 10**9
)

sizes
# A tibble: 18 × 2
   files                            size_GB
   <chr>                              <dbl>
 1 CheckoutYear=2005/part-0.parquet   0.115
 2 CheckoutYear=2006/part-0.parquet   0.172
 3 CheckoutYear=2007/part-0.parquet   0.186
 4 CheckoutYear=2008/part-0.parquet   0.204
 5 CheckoutYear=2009/part-0.parquet   0.224
 6 CheckoutYear=2010/part-0.parquet   0.233
 7 CheckoutYear=2011/part-0.parquet   0.250
 8 CheckoutYear=2012/part-0.parquet   0.261
 9 CheckoutYear=2013/part-0.parquet   0.282
10 CheckoutYear=2014/part-0.parquet   0.296
11 CheckoutYear=2015/part-0.parquet   0.308
12 CheckoutYear=2016/part-0.parquet   0.315
13 CheckoutYear=2017/part-0.parquet   0.319
14 CheckoutYear=2018/part-0.parquet   0.306
15 CheckoutYear=2019/part-0.parquet   0.303
16 CheckoutYear=2020/part-0.parquet   0.158
17 CheckoutYear=2021/part-0.parquet   0.240
18 CheckoutYear=2022/part-0.parquet   0.252

4.5GB partitioned Parquet files + arrow + dplyr

seattle_parquet_part <- here::here("data/seattle-library-checkouts")

open_dataset(seattle_parquet_part,
             format = "parquet") |>
  group_by(CheckoutYear) |>
  summarise(sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |>
  system.time()
   user  system elapsed 
  1.928   0.405   0.391 


42 million rows – not too shabby!

Your Turn

  1. Let’s write the Seattle Checkout CSV data to a multi-file dataset just one more time! This time, write the data partitioned by CheckoutType as Parquet files.

  2. Now compare the compute time between our Parquet data partitioned by CheckoutYear and our Parquet data partitioned by CheckoutType with a query of the total number of checkouts in September of 2019. Did you find a difference in compute time?

➡️ Data Storage Engineering Exercises Page

Partition Design

  • Partitioning on variables commonly used in filter() is often faster
  • The number of partitions also matters (Arrow reads the metadata of each file)

Performance Review: Single CSV

How long does it take to calculate the number of books checked out in each month of 2021?


open_dataset(
  sources = here::here("data/seattle-library-checkouts.csv"), 
  format = "csv"
) |> 
  filter(CheckoutYear == 2021, MaterialType == "BOOK") |>
  group_by(CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutMonth)) |>
  collect() |>
  system.time()
   user  system elapsed 
 12.414   1.286  11.848 

Performance Review: Partitioned Parquet

How long does it take to calculate the number of books checked out in each month of 2021?


open_dataset(here::here("data/seattle-library-checkouts"),
             format = "parquet") |> 
  filter(CheckoutYear == 2021, MaterialType == "BOOK") |>
  group_by(CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutMonth)) |>
  collect() |> 
  system.time()
   user  system elapsed 
  0.272   0.040   0.077 

Engineering Data Tips for Improved Storage & Performance


  • consider “column-oriented” file formats like Parquet
  • consider partitioning, experiment to get an appropriate partition design 🗂️
  • watch your schemas 👀

R for Data Science (2e)