Asynchronous Processing of LEGO Model Prediction

Access Instructions

The project used for this particular exercise is hosted on Posit Cloud in this space. The project for this exercise is called Performance Code-along 2.

Requirements

The current version of our Shiny application contains a module for generating predictions of the number of LEGO parts in a set using the number of unique colors and number of unique part categories. The API is executed and processed using the {httr2} package. Here is the function wrapping the API execution:

#' @importFrom httr2 request req_body_json req_perform resp_body_json
run_prediction <- function(df, endpoint_url, back_transform = TRUE, round_result = TRUE) {
  # create request object
  req <- request(endpoint_url)

  # perform request
  resp <- req |>
    req_body_json(df) |>
    req_perform()

  # extract predictions from response
  pred_values <- resp_body_json(resp)$.pred |> unlist()

  # back-transform log10 value of predicted number of parts if requested
  if (back_transform) {
    pred_values <- 10 ^ pred_values
  }

  # round result up to nearest integer if requested
  if (round_result) pred_values <- ceiling(pred_values)

  # append predictions to supplied data frame
  dplyr::mutate(df, predicted_num_parts = pred_values)
}

Unfortunately, the prediction API call takes a bit of time to execute due to some extremely sophisticated processing 😅. As a result, any interactions within the application will not be processed until the prediction call completes. Our goal is to convert the prediction processing from synchronous to asynchronous using {crew}

Plan

  1. Establish reactive values for tracking the status of the prediction calls
  2. Create a new controller to launch new R processes when new prediction tasks are launched
  3. Modify the existing observeEvent to push the prediction task to the controller, ensuring the key objects and required packages are passed on to the controller.
  4. Create a poll that’s invalidated every 100 milliseconds to query the status of the submitted tasks in the controller and update the prediction result reactive value when complete.

Solution

First we create the following reactiveVal objects to keep track of the prediction state:

pred_status <- reactiveVal("No prediction submitted yet.")
pred_poll <- reactiveVal(FALSE)

Next we set up a new controller:

# establish async processing with crew
controller <- crew_controller_local(workers = 4, seconds_idle = 10)
controller$start()

# make sure to terminate the controller on stop #NEW
onStop(function() controller$terminate())

Inside the observeEvent for the user clicking the prediction button, we update the logic to push the prediction task to the controller:

controller$push(
  command = run_prediction(df),
  data = list(
    run_prediction = run_prediction,
    df = pred_data_rv$data
  ),
  packages = c("httr2", "dplyr")
)

pred_poll(TRUE)

Lastly, we create a new observe block that periodically checks whether the running {crew} tasks have completed, ensuring that this is only executed when a prediction has been launched:

observe({
  req(pred_poll())

  invalidateLater(millis = 100)
  result <- controller$pop()$result

  if (!is.null(result)) {
    pred_data_rv$data <- result[[1]]
    print(controller$summary()) 
  }

  if (isFALSE(controller$nonempty())) {
    pred_status("Prediction Complete")
    pred_poll(controller$nonempty())
    removeNotification(id = "pred_message")
  }
})