07 Sep 2018, 23:36

How to fit 1000 Stan models for $1, fast.

Introduction

I’ve been dabbling with Stan for a school project. One of the difficulties with Bayesian MCMC sampling is how computationally expensive it can be. When following the “Modern Statistical Workflow”, the most time-consuming step is fitting the generative ensemble. For a non-trivial model, a single fit can take minutes, so with 1000 draws from the prior predictive distribution the full ensemble can easily take hours to days on a laptop.

My previous workflow was to write models on a small instance (t3.xlarge), fit a few iterations to verify parameter recovery, switch to a huge instance with lots of CPUs, fit the generative ensemble, save my outputs, switch back to the small instance, and repeat. On a c5.18xlarge, this could take 30 minutes to an hour at $3.06/hour. Between the downtime and the inefficiency of switching back and forth, this started to become very expensive on an academic budget (or maybe I’m just cheap).

Save money

In 2016, Amazon Web Services introduced its AWS Batch service, but in my opinion its adoption has not been very widespread. Fitting your model to draws from the prior predictive distribution to verify that it can recover the parameters that generated the data is exactly the scenario Batch was built for. The major benefit of the service is that instance creation is completely managed: you specify a maximum vCPU limit, and instances are launched at the lowest spot pricing. That means you get the benefits of concurrency while saving money and time. As an example, with a 128 vCPU limit I was able to fit 4000 models for $3.86; each batch of 1000 fits took 20-25 minutes of wall-clock time, with each individual fit taking about 3 minutes. A smaller benefit is that this frees up my local machine for analysis and development while I wait for batch jobs to complete.

This workflow can be considered an add-on to the “Principled Bayesian Workflow”, so it would be good to be familiar with that first.

Summary

  1. Use a smaller EC2 instance as your “local machine” to run RStudio Server

  2. Build a Docker image with RStan and AWS’s fetch_and_run.sh script, and upload it to AWS ECR

  3. Local machine:

    1. Generate simulations (R=1000)
    2. For each iteration, create a zip file with the following:

      • Parameters & data file (.Rdata): simulated parameters and simulated data
      • Compiled Stan model (.rds)
      • stan_utility.R file for checking Stan warnings
      • R script (.R) to run the model fit, calculate outputs (posterior z-score, shrinkage, and simulation-based calibration rank), check Stan warnings, and save the outputs to a local file
      • Bash script (.sh) to call the R script and upload the outputs back to S3 (fetch_and_run.sh handles downloading and unzipping the job)
    3. Upload each zip file to S3; the %dopar% operator can speed this step up.

  4. AWS Lambda:

    • Watches the S3 bucket for new zip files
    • Parses the S3 key to use parts of the path as environment variables for Batch (the assumed key layout is sketched just after this list)
    • Submits jobs to AWS Batch
  5. AWS Batch launches instances as it sees new jobs and kills instances once it no longer needs those resources

  6. Use local machine to download all results from S3 and aggregate them.
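
Throughout the post I assume a simple S3 key convention: inputs go under stan_runs/ and results under results/. A minimal sketch of the layout (the hash and run number here are illustrative; the real values come from steps 6 and 7 in “All the code” below):

    # Hypothetical values, just to show the layout the Lambda function parses
    stan_file <- "fit_data.stan"; hash <- "abc123"; r <- 17
    paste("stan_runs/", stan_file, "/", hash, "_", r, ".zip", sep="")             # uploaded by the local machine
    #> "stan_runs/fit_data.stan/abc123_17.zip"
    paste("results/", stan_file, "/", hash, "/", hash, "_", r, ".Rdata", sep="")  # uploaded by the Batch job
    #> "results/fit_data.stan/abc123/abc123_17.Rdata"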

Useful references

  1. Tutorial on AWS Batch (fetch_and_run)
  2. Install RStudio on Ubuntu
  3. AWS S3 R Library
  4. Principled Bayesian Workflow
  5. Discussion of Stan in Docker

All the code

Pre-requisites:

  1. Working knowledge of Docker
  2. Terminal access to an AWS development machine (I used an Ubuntu 16.04 t3.xlarge)
  3. RStudio setup (See ref #2)
  4. AWS Batch setup (See ref #1)

No gif section :(

This isn’t every step you’ll need, but I hope enough of the magic is here to be useful.

  1. Pull the Docker image ($ docker pull jw887c/rstan_aws_batch) or build it from this Dockerfile ($ docker build -t byu/rstan .)

    # Base image with R and RStan preinstalled
    FROM jrnold/rstan
    # System tools needed by fetch_and_run.sh and the AWS CLI
    RUN apt-get update && apt-get install -y python python-pip curl unzip groff
    RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
    RUN pip install awscli
    RUN Rscript -e "install.packages('devtools')"
    # R package for talking to S3 from R
    RUN Rscript -e "library('devtools');install_github('cloudyr/aws.s3')"
    # AWS's helper script that fetches the zipped job from S3 and runs it
    ADD fetch_and_run.sh /usr/local/bin/fetch_and_run.sh
    ENTRYPOINT ["/usr/local/bin/fetch_and_run.sh"]
    
  2. Push the Docker image to ECR

    docker tag byu/rstan <aws_account_id>.dkr.ecr.us-east-1.amazonaws.com/rstan:latest
    docker push <aws_account_id>.dkr.ecr.us-east-1.amazonaws.com/rstan:latest
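    # Note: if the push is rejected with an authentication error, log Docker in to ECR first,
    # e.g. with awscli v1: $(aws ecr get-login --no-include-email --region us-east-1)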
    
  3. Create an AWS Lambda function to watch the bucket and parse the S3 key into useful environment variables. I wrote this in Python 3.6 with the in-line editor.

    import json
    import urllib.parse
    import boto3
    
    print('Loading function')
    
    s3 = boto3.client('s3')
    batch = boto3.client('batch')
    
    
    def submit_batch_job(key, hash_value, result_key, stan_file, rscript_file):
        batch.submit_job(
            jobName=key.split('/')[-1].split('.')[0],
            jobQueue='job_queue',
            jobDefinition='fetch_and_run',
            containerOverrides={
                'command': ['myjob.sh'],
                'environment': [
                    {'name': 'BATCH_FILE_TYPE', 'value': 'zip'},
                    {'name': 'BATCH_FILE_S3_URL','value': 's3://<bucket-name>/' + key},
                    {'name': 'RESULT_KEY','value': result_key},
                    {'name': 'HASH_VALUE','value': hash_value},
                    {'name': 'STAN_FILE', 'value': stan_file},
                    {'name': 'RSCRIPT_FILE', 'value': rscript_file}
                    ]
            })
                
                
    def lambda_handler(event, context):
        # Get the object from the event and show its content type
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            # Expected key layout (see the sketch in the Summary):
            # stan_runs/<stan_file>/<hash>_<run_number>.zip
            stan_file = key.split("/")[-2]
            rscript_file = stan_file.split(".")[0] + ".R"
            hash_run_number = key.split("/")[-1].split(".")[0]
            hash_value = hash_run_number.split("_")[0]
            result_key = hash_run_number + ".Rdata"
            print(result_key)
            try:
                response = s3.get_object(
                    Bucket="<bucket-name>", 
                    Key="results/%s/%s/%s" % (stan_file, hash_value, result_key))
                print(response)
                print("result found!, no job submission")
            except:
                print("No result, submit job!")
                submit_batch_job(key, hash_value, result_key, stan_file, rscript_file)
            return key
        except Exception as e:
            print(e)
            print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
            raise e
    
  4. Create the R script that runs the Stan model fit and calculates the necessary outputs

    #!/usr/bin/env Rscript
    args = commandArgs(trailingOnly=TRUE)
    library("rstan")
    rstan_options(auto_write = TRUE)
    options(mc.cores = parallel::detectCores())
    
    # Make sure args are available
    if (length(args)==0) {
      stop("At least one argument must be supplied (input zip file)", call.=FALSE)
    }
    
    # AWS S3 file and keys
    bucket_name      <- "<bucket-name>"
    source_file_name <- tail(strsplit(args[1], "/")[[1]], n=1)
    hash_run_number  <- strsplit(source_file_name, "[.]")[[1]][1]
    hash             <- strsplit(hash_run_number, "_")[[1]][1]
    run_number       <- strsplit(hash_run_number, "_")[[1]][2]
    new_results      <- paste("results/", hash, "/", hash_run_number, ".Rdata", sep="")
    
    # Run Stan
    load("simu_data.Rdata")  # provides simu_data and simu_lambda (saved by create_zip below)
    fit <- stan(file = args[2], data = simu_data, seed=4938483)
    print(fit, digits = 1)
    
    # Run diagnostics
    util <- new.env()
    source('stan_utility.R', local=util)
    
    warning_code <- util$check_all_diagnostics(fit, quiet=TRUE)
    
    # Compute rank of prior draw with respect to thinned posterior draws
    sbc_rank <- sum(simu_lambda < extract(fit)$lambda[seq(1, 4000 - 8, 8)])
    
    # Compute posterior sensitivities
    s <- summary(fit, probs = c(), pars='lambda')$summary
    post_mean_lambda <- s[,1]
    post_sd_lambda <- s[,3]
    
    prior_sd_lambda <- 6.44787
    
    z_score <- abs((post_mean_lambda - simu_lambda) / post_sd_lambda)
    shrinkage <- 1 - (post_sd_lambda / prior_sd_lambda)**2
    
    output <- c(warning_code, sbc_rank, z_score, shrinkage)
    print(c("warning_code", "sbc_rank", "z_score", "shrinkage"))
    print(output)
    # Save file
    saveRDS(output, file=paste(hash_run_number, ".Rdata", sep=""))
    
  5. Create the bash script to run your job

    #!/bin/bash
    date
    echo "jobId: $AWS_BATCH_JOB_ID"
    echo "jobQueue: $AWS_BATCH_JQ_NAME"
    echo "computeEnvironment: $AWS_BATCH_CE_NAME"
    echo "resultKey: $RESULT_KEY"
    echo "hashValue: $HASH_VALUE"
    echo "rScriptFile: $RSCRIPT_FILE"
    echo "stanFile: $STAN_FILE"
    
    Rscript --vanilla $RSCRIPT_FILE $BATCH_FILE_S3_URL $STAN_FILE
    
    ls -l
    aws s3 cp $RESULT_KEY s3://<bucket-name>/results/$STAN_FILE/$HASH_VALUE/
    
    echo "bye bye!!"
    
  6. Create a hash of your generative ensemble, model, and number of iterations. This is useful when you’d like to run tons of different kinds of models and want to keep the different runs organized.

    library(here)
    stan_file <- "fit_data.stan"
    gen_file  <- "generative_ensemble.stan"
    R <- 1000
    hash <- digest::sha1(c(readLines(here("src", "stan", gen_file)),
                           readLines(here("src", "stan", stan_file)),
                           as.character(R)))
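
Step 7 below assumes you already have the prior draws simu_lambdas and the matching simulated datasets simu_ys in memory. How you produce them depends on your generative_ensemble.stan; here is a rough sketch only, assuming (as in the Principled Bayesian Workflow case study) that the program exposes lambda and y as generated quantities and takes the dataset size N as data:

    library(rstan)
    library(here)
    N <- 100  # observations per simulated dataset -- illustrative value only
    simu_fit <- stan(file = here("src", "stan", gen_file), data = list(N = N),
                     iter = R, warmup = 0, chains = 1, algorithm = "Fixed_param")
    simu_lambdas <- extract(simu_fit)$lambda  # R draws of lambda from the prior
    simu_ys      <- extract(simu_fit)$y       # R x N matrix of simulated datasets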
    
  7. Creating a zip file in R

    simu_list <- t(data.matrix(data.frame(simu_lambdas, simu_ys)))
    fit_model <- stan_model(here('src', 'stan', stan_file))
    dir.create(paste("/tmp/zip_files/", hash, "/", sep=""), recursive=TRUE)
    
    create_zip <- function(r, hash, simu_list, stan_file) {
        library(here)
        simu <- simu_list[, r]
        simu_lambda <- simu[1]
        simu_y      <- simu[2:length(simu_list[, 1])]
    
        stan_name <- strsplit(stan_file, "[.]")[[1]][1]
        tmp_dir <- paste("/tmp/", hash, "_", as.character(r), sep="")
        zip_file_path <- paste("/tmp/zip_files/", hash, "/", 
            hash, "_", as.character(r), ".zip", sep="")
        simu_data_path <- paste(tmp_dir, "/simu_data.Rdata", sep="")
        myjob_path <- here("src", "docker", "myjob.sh")
        rds_path <- here("src", "stan", paste(stan_name, ".rds", sep=""))
        stan_path <- here("src", "stan", stan_file)
        r_script_path <- here("src", "docker", stan_name,
                              paste(stan_name, ".R", sep=""))
        s3_key <- paste("stan_runs/", stan_file, "/", hash, "_", as.character(r), ".zip", sep="")
        dir.create(tmp_dir)
        simu_data <- list("y" = simu_y, 
                          "N" = length(simu_y))
        file.copy(from=here('src', 'codebase', 'stan_utility.R'), to=tmp_dir)
        file.copy(from=stan_path, to=tmp_dir)
        file.copy(from=rds_path, to=tmp_dir)
        file.copy(from=myjob_path, to=tmp_dir)
        file.copy(from=r_script_path, to=tmp_dir)
        save(simu_data, simu_lambda, file=simu_data_path)  
        files2zip <- dir(tmp_dir, full.names = TRUE)
        zip(zipfile = zip_file_path, files = files2zip)
        aws.s3::put_object(zip_file_path, object=s3_key, bucket="<bucket-name>") 
    }
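
Before generating everything, it can be worth running a single iteration by hand to check the zip contents, the S3 upload, and that the Lambda/Batch plumbing fires end to end:

    # Smoke-test one run before launching the full ensemble
    create_zip(1, hash, simu_list, stan_file)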
    
  8. Upload your zip files to S3

    # Create and upload the remaining zip files in parallel
    # (run 1 was already uploaded by the smoke test above)
    library(foreach)
    library(doParallel)
    registerDoParallel(makeCluster(detectCores()))
    foreach(i=2:R) %dopar% create_zip(i, hash, simu_list, stan_file)
    
  9. Profit

    result_dir <- here("etc", "results", stan_file)
    dir.create(result_dir, recursive=TRUE)
    system(paste("aws s3 cp s3://<bucket-name>/results/", stan_file, "/ ", result_dir, "/ --recursive", sep=""))

    # List the downloaded result files for this hash and combine them into one matrix
    output_list <- list.files(paste(result_dir, "/", hash, sep=""))
    ensemble_output <- foreach(output=output_list, .combine='cbind') %dopar% {
        readRDS(paste(result_dir, "/", hash, "/", output, sep=""))
    }
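
From here the analysis follows the Principled Bayesian Workflow. A minimal sketch of summarising the ensemble, assuming the row order matches the output vector built in the R script of step 4 and that a warning code of 0 means no diagnostics fired:

    rownames(ensemble_output) <- c("warning_code", "sbc_rank", "z_score", "shrinkage")
    mean(ensemble_output["warning_code", ] == 0)             # share of fits with clean diagnostics
    hist(ensemble_output["sbc_rank", ], main = "SBC ranks")  # should look roughly uniform
    plot(ensemble_output["shrinkage", ], ensemble_output["z_score", ],
         xlab = "posterior shrinkage", ylab = "posterior z-score")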
    

Conclusion

Use AWS Batch to save money and time when fitting Stan models. Some upfront investment in cloud devops is required but totally worth it!

Notes

  • The AWS Fargate service has also been billed as a managed container service, but it costs more and is really built to be a service backend, while Batch is for … batch jobs.

  • Obviously, you could probably do this exact thing in Kubernetes as well, but statistics got no time for that.

  • This workflow could also be ported to pystan and CmdStan.