Data Manipulation—Part 2

What if a function binding doesn’t exist - revisited!

  • Option 1 - find a workaround
  • Option 2 - user-defined functions (UDFs)

Why use a UDF?

Implement your own custom functions!

time_diff_minutes <- function(pickup, dropoff) {
  difftime(dropoff, pickup, units = "mins") |>
    round() |>
    as.integer()
}

nyc_taxi |>
  mutate(
    duration_minutes = time_diff_minutes(pickup_datetime, dropoff_datetime)
  ) |> 
  select(pickup_datetime, dropoff_datetime, duration_minutes) |>
  head() |>
  collect()
Error: Expression time_diff_minutes(pickup_datetime, dropoff_datetime) not supported in Arrow
Call collect() first to pull data into R.

We get an error because arrow has no binding for our custom R function, so it can’t translate the call into an Arrow expression.

User-defined functions (aka UDFs)

  • Define your own functions
  • Scalar functions - 1 row input and 1 row output
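
In practice, "1 row input and 1 row output" means the function returns one value per input value. A minimal base R sketch of the distinction, where the helper to_minutes() and the toy durations are assumptions made up for illustration:

```r
# Hypothetical helper: converts seconds to whole minutes, element by element.
to_minutes <- function(seconds) {
  as.integer(round(seconds / 60))
}

# Toy input, not taken from the taxi data.
durations <- c(100, 360, 1740)

# Scalar: three values in, three values out.
minutes <- to_minutes(durations)

# Not scalar: three values in, one value out,
# so a function shaped like this cannot be a scalar UDF.
average <- mean(durations)
```

The first function has the shape a scalar UDF needs; the second collapses its input and does not.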

User-defined functions - definition

register_scalar_function(
  name = "time_diff_minutes",
  function(context, pickup, dropoff) {
    difftime(dropoff, pickup, units = "mins") |>
      round() |>
      as.integer()
  },
  in_type = schema(
    pickup = timestamp(unit = "ms"),
    dropoff = timestamp(unit = "ms")
  ),
  out_type = int32(),
  auto_convert = TRUE
)

This looks complicated, so let’s look at it one part at a time!

Step 1. Give the function a name via the name argument

Step 2. Define the body of the function - first argument must be context

Step 3. Set the schema of the input arguments with in_type

Step 4. Set the data type of the output with out_type

Step 5. Set auto_convert = TRUE if you are using the function in a dplyr pipeline, so that inputs arrive as R vectors and the result is converted back to Arrow

User-defined functions - usage

nyc_taxi |>
  mutate(
    duration_minutes = time_diff_minutes(pickup_datetime, dropoff_datetime)
  ) |>
  select(pickup_datetime, dropoff_datetime, duration_minutes) |>
  head() |>
  collect()
# A tibble: 6 × 3
  pickup_datetime     dropoff_datetime    duration_minutes
  <dttm>              <dttm>                         <int>
1 2012-11-02 19:47:00 2012-11-02 20:16:00               29
2 2012-11-02 19:47:07 2012-11-02 19:53:32                6
3 2012-11-02 19:47:13 2012-11-02 19:53:31                6
4 2012-11-02 19:47:35 2012-11-02 19:52:40                5
5 2012-11-02 19:47:51 2012-11-02 20:00:19               12
6 2012-11-02 19:48:00 2012-11-02 19:51:00                3
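
To experiment with the same registration pattern without the full dataset, here is a minimal sketch on a two-row in-memory table. The UDF name reverse_string, the column names and the toy words are all assumptions chosen for illustration.

```r
library(arrow)
library(dplyr)

# Hypothetical UDF: reverses each string. Register it once per R session.
register_scalar_function(
  name = "reverse_string",
  function(context, x) {
    # With auto_convert = TRUE, x arrives as an R character vector.
    vapply(
      strsplit(x, split = ""),
      function(chars) paste(rev(chars), collapse = ""),
      character(1)
    )
  },
  in_type = schema(x = utf8()),
  out_type = utf8(),
  auto_convert = TRUE
)

# Toy Arrow Table standing in for nyc_taxi.
words <- arrow_table(word = c("taxi", "cab"))

reversed <- words |>
  mutate(backwards = reverse_string(word)) |>
  collect()
```

A small table like this makes it quick to check that in_type and out_type match what the function really receives and returns before running it on the full data.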

Your Turn

  1. Write a user-defined function which wraps the stringr function str_replace_na(), and use it to replace any NA values in the vendor_name column with the string “No vendor” instead. (Test it on the data from 2019 so you’re not pulling everything into memory)

➡️ Data Manipulation Part II Exercises Page

Summary

  • You can use UDFs to create your own bindings when they don’t exist
  • UDFs must be scalar (1 row in -> 1 row out) and stateless (no knowledge of other rows of data)
  • Calculations are done by R, not Arrow, so UDFs are slower than built-in bindings but still pretty fast

Joins

Joining a reference table

vendors <- tibble::tibble(
  code = c("VTS", "CMT", "DDS"),
  full_name = c(
    "Verifone Transportation Systems",
    "Creative Mobile Technologies",
    "Digital Dispatch Systems"
  )
)

nyc_taxi |>
  left_join(vendors, by = c("vendor_name" = "code")) |>
  select(vendor_name, full_name, pickup_datetime) |>
  head(3) |>
  collect()
# A tibble: 3 × 3
  vendor_name full_name                    pickup_datetime    
  <chr>       <chr>                        <dttm>             
1 CMT         Creative Mobile Technologies 2012-01-20 08:09:36
2 CMT         Creative Mobile Technologies 2012-01-20 08:54:10
3 CMT         Creative Mobile Technologies 2012-01-20 02:08:01

Traps for the unwary

Question: which are the most common borough-to-borough journeys in the dataset?

nyc_taxi_zones <- 
  read_csv_arrow(here::here("data/taxi_zone_lookup.csv")) |>
  select(location_id = LocationID,
         borough = Borough)

nyc_taxi_zones
# A tibble: 265 × 2
   location_id borough      
         <int> <chr>        
 1           1 EWR          
 2           2 Queens       
 3           3 Bronx        
 4           4 Manhattan    
 5           5 Staten Island
 6           6 Staten Island
 7           7 Queens       
 8           8 Queens       
 9           9 Queens       
10          10 Queens       
# ℹ 255 more rows

Why didn’t this work?

nyc_taxi |>
  left_join(nyc_taxi_zones, by = c("pickup_location_id" = "location_id")) |>
  collect()
Error in `compute.arrow_dplyr_query()`:
! Invalid: Incompatible data types for corresponding join field keys: FieldRef.Name(pickup_location_id) of type int64 and FieldRef.Name(location_id) of type int32

Schema for the nyc_taxi Dataset

schema(nyc_taxi)
Schema
vendor_name: string
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
rate_code: string
store_and_fwd: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: string
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
total_amount: double
improvement_surcharge: double
congestion_surcharge: double
pickup_location_id: int64
dropoff_location_id: int64
year: int32
month: int32

Schema for the nyc_taxi_zones Table

nyc_taxi_zones_arrow <- arrow_table(nyc_taxi_zones)
schema(nyc_taxi_zones_arrow)
Schema
location_id: int32
borough: string

  • pickup_location_id is int64 in the nyc_taxi table
  • location_id is int32 in the nyc_taxi_zones table

Take control of the schema

nyc_taxi_zones_arrow <- arrow_table(
  nyc_taxi_zones, 
  schema = schema(location_id = int64(), borough = utf8())
)

  • schema() takes name = type pairs, one for each column
  • arrow has various “type” functions: int64(), utf8(), boolean(), date32() etc

schema(nyc_taxi_zones_arrow)
Schema
location_id: int64
borough: string
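
An alternative to rebuilding the table with an explicit schema is to cast the key column inside the query. The sketch below uses toy stand-ins for nyc_taxi and the zone lookup; the data and the object names (trips, zones, zones_int64) are assumptions for illustration.

```r
library(arrow)
library(dplyr)

# Toy stand-in for nyc_taxi: the key is stored as int64.
trips <- arrow_table(
  data.frame(pickup_location_id = c(1L, 4L, 4L)),
  schema = schema(pickup_location_id = int64())
)

# Toy stand-in for the zone lookup: R integers become int32 by default.
zones <- arrow_table(
  location_id = c(1L, 4L),
  borough = c("EWR", "Manhattan")
)

# Inspect the key types before joining.
trips_key_type <- trips$pickup_location_id$type$ToString()
zones_key_type <- zones$location_id$type$ToString()

# Cast the key inside the query, then materialise the result as a Table.
zones_int64 <- zones |>
  mutate(location_id = cast(location_id, int64())) |>
  compute()

joined <- trips |>
  left_join(zones_int64, by = c("pickup_location_id" = "location_id")) |>
  collect()
```

Checking the key types first shows which side of the join needs to change.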

Prepare the auxiliary tables

pickup <- nyc_taxi_zones_arrow |>
  select(pickup_location_id = location_id,
         pickup_borough = borough)

dropoff <- nyc_taxi_zones_arrow |>
  select(dropoff_location_id = location_id,
         dropoff_borough = borough)

  • Join separately for the pickup and dropoff zones
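
The two-lookup pattern can be tried on a handful of rows first. This is a sketch under assumed toy data (four trips, two zones), with the join keys named explicitly through by:

```r
library(arrow)
library(dplyr)

# Toy zone lookup with an int64 key (assumed data).
zones <- arrow_table(
  data.frame(location_id = c(1L, 2L), borough = c("Manhattan", "Queens")),
  schema = schema(location_id = int64(), borough = utf8())
)

# Toy trips: pickup and dropoff zone ids, also int64.
trips <- arrow_table(
  data.frame(
    pickup_location_id = c(1L, 1L, 2L, 1L),
    dropoff_location_id = c(1L, 1L, 1L, 2L)
  ),
  schema = schema(pickup_location_id = int64(), dropoff_location_id = int64())
)

# One renamed copy of the lookup per join.
pickup <- zones |>
  select(pickup_location_id = location_id, pickup_borough = borough)
dropoff <- zones |>
  select(dropoff_location_id = location_id, dropoff_borough = borough)

toy_counts <- trips |>
  left_join(pickup, by = "pickup_location_id") |>
  left_join(dropoff, by = "dropoff_location_id") |>
  count(pickup_borough, dropoff_borough) |>
  arrange(desc(n)) |>
  collect()
```

Renaming the lookup columns up front means each join adds exactly one new borough column and the key names line up on both sides.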

Join and cross-tabulate

library(tictoc)

tic()
borough_counts <- nyc_taxi |> 
  left_join(pickup) |>
  left_join(dropoff) |>
  count(pickup_borough, dropoff_borough) |>
  arrange(desc(n)) |>
  collect()
toc()
131.403 sec elapsed


2-3 minutes to join twice and cross-tabulate on non-partition variables, with 1.15 billion rows of data 🙂

The results

borough_counts
# A tibble: 50 × 3
   pickup_borough dropoff_borough         n
   <chr>          <chr>               <int>
 1 <NA>           <NA>            732357953
 2 Manhattan      Manhattan       351198872
 3 Queens         Manhattan        14440705
 4 Manhattan      Queens           13052517
 5 Manhattan      Brooklyn         11180867
 6 Queens         Queens            7440356
 7 Unknown        Unknown           4491811
 8 Queens         Brooklyn          3662324
 9 Brooklyn       Brooklyn          3550480
10 Manhattan      Bronx             2071830
# ℹ 40 more rows

Your Turn

  1. How many taxi pickups were recorded in 2019 from the three major airports covered by the NYC Taxis data set (JFK, LaGuardia, Newark)? (Hint: you can use stringr::str_detect() to help you find pickup zones with the word “Airport” in them)

➡️ Data Manipulation Part II Exercises Page

Summary

  • You can join Arrow Tables and Datasets to R data frames and Arrow Tables
  • The Arrow data type of join keys must always match

Window functions

What are window functions?

  • calculations across a “window” of multiple rows which relate to the current row
  • e.g. row_number(), ntile(), or calling mutate() after group_by()
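
The difference between a grouped summary and a window function is easiest to see in plain dplyr on an ordinary data frame. The three-row fares table here is an assumption for illustration:

```r
library(dplyr)

# Toy fares, not taken from the taxi data.
fares <- tibble(
  year = c(2020L, 2020L, 2021L),
  fare_amount = c(10, 20, 30)
)

# Grouped summary: one row per group.
summarised <- fares |>
  group_by(year) |>
  summarise(mean_fare = mean(fare_amount))

# Window function: every row is kept and gains its group's mean.
windowed <- fares |>
  group_by(year) |>
  mutate(mean_fare = mean(fare_amount)) |>
  ungroup()
```

summarised has two rows, while windowed keeps all three rows with the group mean repeated within each year.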

Grouped summaries

fare_by_year <- nyc_taxi |>
  filter(year > 2019) |>
  select(year, fare_amount)

fare_by_year |>
  group_by(year) |>
  summarise(mean_fare = mean(fare_amount)) |> 
  collect()
# A tibble: 2 × 2
   year mean_fare
  <int>     <dbl>
1  2020      12.7
2  2021      13.5

Window functions

fare_by_year |>
  group_by(year) |>
  mutate(mean_fare = mean(fare_amount)) |> 
  head() |> 
  collect()
Error: window functions not currently supported in Arrow
Call collect() first to pull data into R.

Window functions - via joins

fare_by_year |>
  left_join(
    fare_by_year |>
      group_by(year) |>
      summarise(mean_fare = mean(fare_amount))
  ) |> 
  arrange(desc(fare_amount)) |>
  head() |> 
  collect()
# A tibble: 6 × 3
   year fare_amount mean_fare
  <int>       <dbl>     <dbl>
1  2020     998310.      12.7
2  2021     818283.      13.5
3  2020     671100.      12.7
4  2020     429497.      12.7
5  2021     398466.      13.5
6  2020     398465.      12.7
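
One way to gain confidence in the join approach is to compare it with ordinary dplyr on a tiny table. A sketch, with toy data and object names (fares_df, via_join, via_dplyr) assumed for illustration:

```r
library(arrow)
library(dplyr)

# Toy fares as an R data frame and as an Arrow Table.
fares_df <- tibble(
  year = c(2020L, 2020L, 2021L),
  fare_amount = c(10, 20, 30)
)
fares_arrow <- arrow_table(fares_df)

# Arrow: summarise per group, then join the summary back on.
via_join <- fares_arrow |>
  left_join(
    fares_arrow |>
      group_by(year) |>
      summarise(mean_fare = mean(fare_amount)),
    by = "year"
  ) |>
  arrange(fare_amount) |>
  collect()

# Plain dplyr: the grouped mutate() that Arrow rejected above.
via_dplyr <- fares_df |>
  group_by(year) |>
  mutate(mean_fare = mean(fare_amount)) |>
  ungroup() |>
  arrange(fare_amount)

same_result <- isTRUE(all.equal(via_join$mean_fare, via_dplyr$mean_fare))
```

Both results are sorted by fare_amount before comparing, because the row order of a collected join should not be relied on.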

Window functions - via duckdb

fare_by_year |>
  group_by(year) |>
  to_duckdb() |>
  mutate(mean_fare = mean(fare_amount)) |> 
  to_arrow() |>
  arrange(desc(fare_amount)) |>
  head() |>
  collect()
# A tibble: 6 × 3
   year fare_amount mean_fare
  <int>       <dbl>     <dbl>
1  2020     998310.      12.7
2  2021     818283.      13.5
3  2020     671100.      12.7
4  2020     429497.      12.7
5  2021     398466.      13.5
6  2020     398465.      12.7

Your Turn

  1. How many trips in September 2019 had a longer than average distance for that month?

➡️ Data Manipulation Part II Exercises Page

Summary

  • Window functions in arrow can be achieved via joins or by passing data to and from duckdb