Use FS2 to upload files to Google Cloud Storage

2020-11-10

Upload to Google Cloud Storage

Uploading to Google Cloud Storage is rather easy when you follow the Java examples in the documentation. But how do we hook that into a purely functional Scala application? In this post, I show how to upload files to a Google Cloud Storage bucket using FS2.

Requirements

Before you get started, you should have a:

The rest of this post assumes that you have a folder called to-upload in your $HOME directory. This folder contains a files directory holding the files to upload, and an auth.json file containing your Google Cloud service account credentials.

/Users/david/to-upload/
├── auth.json
└── files
    ├── file1.txt
    └── file2.txt
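If you want to try the code without real data, a small stdlib-only sketch like the following can create the files part of this layout. CreateLayout and createLayout are hypothetical helpers, not part of the original setup, and they deliberately do not create auth.json, which has to be your actual service account key.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical helper: creates <base>/files/file1.txt and <base>/files/file2.txt
// so there is something to upload. It does NOT create auth.json.
object CreateLayout {
  def createLayout(base: Path): Path = {
    val filesDir = base.resolve("files")
    // createDirectories also creates `base` if needed and is a no-op if it exists
    Files.createDirectories(filesDir)
    Seq("file1.txt", "file2.txt").foreach { name =>
      // overwrites the file if it already exists
      Files.write(filesDir.resolve(name), s"content of $name".getBytes(StandardCharsets.UTF_8))
    }
    filesDir
  }
}
```

For example, CreateLayout.createLayout(Paths.get(sys.env("HOME")).resolve("to-upload")) would produce the files folder shown above.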

Imports

As usual, some imports. The implicit ContextShift and Timer are only needed in a REPL or worksheet; in an application, you'd get them from extending IOApp:

// source: https://gist.github.com/mpilquist/0b1cc1926bddd31c70ad40663acfec8e
import fs2._, fs2.concurrent._, cats._, cats.implicits._, cats.effect._, cats.effect.implicits._, scala.concurrent.duration._

implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
implicit val ioTimer: Timer[IO] =
  IO.timer(scala.concurrent.ExecutionContext.Implicits.global)
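// demo only: blocking calls run on the global pool here; an application
// would rather acquire a dedicated pool through the Blocker[IO] resource
val blocker: Blocker =
  Blocker.liftExecutionContext(scala.concurrent.ExecutionContext.Implicits.global)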
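For reference, here is a minimal sketch of the same setup inside an IOApp, assuming cats-effect 2.x (the version that still has ContextShift and Blocker). UploadApp is a hypothetical name, and its body only checks that the home directory exists; in practice you would run the upload stream there instead.

```scala
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import java.nio.file.{Files, Paths}

// IOApp supplies the implicit ContextShift[IO] and Timer[IO],
// so no hand-written implicits are needed here.
object UploadApp extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    // Blocker[IO] is a Resource: its thread pool is shut down when `use` completes
    Blocker[IO]
      .use { blocker =>
        // placeholder work; this is where the upload stream would run
        blocker
          .delay[IO, Boolean](Files.isDirectory(Paths.get(sys.props("user.home"))))
          .flatMap(exists => IO(println(s"home directory found: $exists")))
      }
      .map(_ => ExitCode.Success)
}
```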

As mentioned above, here is the folder and bucket setup used in the rest of the post:

import java.nio.file._

// make sure you use your own bucket name
val bucketName = "lmah-app-files-qa"
// bucketName: String = "lmah-app-files-qa"

val homeDir = Paths.get(sys.env("HOME"))
// homeDir: Path = /Users/david
val workDir = homeDir.resolve("to-upload")
// workDir: Path = /Users/david/to-upload
val authFile = workDir.resolve("auth.json")
// authFile: Path = /Users/david/to-upload/auth.json
val filesDir = workDir.resolve("files")
// filesDir: Path = /Users/david/to-upload/files

Authentication

The documentation suggests exporting the GOOGLE_APPLICATION_CREDENTIALS environment variable so that it points to the JSON file. When you do, the Google Cloud client libraries read that file and authenticate the Storage client automatically. If you'd rather load the credentials manually, you can do the following:

import com.google.auth.oauth2.ServiceAccountCredentials

def fileContent(blocker: Blocker, file: Path): Stream[IO, java.io.InputStream] =
  fs2.io.file
    .readAll[IO](file, blocker, 1024)
    .through(fs2.io.toInputStream)

def getAuth(blocker: Blocker, file: Path): Stream[IO, ServiceAccountCredentials] =
  fileContent(blocker, file)
    .evalMap(is => blocker.delay[IO, ServiceAccountCredentials](ServiceAccountCredentials.fromStream(is)))

We use blocker.delay because reading from an InputStream is blocking, so it should run on the blocker's thread pool rather than on the pool used for regular computations.
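Since only one ServiceAccountCredentials value is needed, a Resource is an alternative to a Stream. The sketch below, assuming cats-effect 2.x, opens the file with Files.newInputStream and closes it once parsing is done. readWith is a hypothetical helper, and its parse function is where ServiceAccountCredentials.fromStream would go.

```scala
import cats.effect.{Blocker, ContextShift, IO, Resource}
import java.io.InputStream
import java.nio.file.{Files, Path}

object ReadWith {
  // Hypothetical helper: opens `file`, applies `parse`, then closes the stream.
  // fromAutoCloseableBlocking runs the acquisition and close() on the blocker.
  def readWith[A](blocker: Blocker, file: Path)(parse: InputStream => A)(
      implicit cs: ContextShift[IO]
  ): IO[A] =
    Resource
      .fromAutoCloseableBlocking[IO, InputStream](blocker)(IO(Files.newInputStream(file)))
      // parsing reads from the stream, so it is blocking too
      .use(is => blocker.delay[IO, A](parse(is)))
}
```

For the authentication case, that would read ReadWith.readWith(blocker, authFile)(is => ServiceAccountCredentials.fromStream(is)).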

Uploading files

Once you have the credentials, you can build an instance of the Storage service and use it to upload content to the bucket.

First, we reuse the fileContent function from above, but this time we pass the InputStream to the Storage#createFrom method. The following function uploads a single file:

import com.google.cloud.storage._

def uploadFile(blocker: Blocker, file: Path, storage: Storage): Stream[IO, Blob] = {
  val blobId = BlobId.of(bucketName, file.getFileName().toString())
  val blobInfo = BlobInfo.newBuilder(blobId).build()
  fileContent(blocker, file)
    .evalMap(is => blocker.delay[IO, Blob](storage.createFrom(blobInfo, is)))
}

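With that, we can define an uploadFiles function that reads the credentials from authFile, builds a Storage service with them, lists the regular files in the given directory, and uploads each one with uploadFile: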

def uploadFiles(blocker: Blocker, directory: Path): Stream[IO, Blob] = {
  val isFile: Path => IO[Boolean] = path => blocker.delay[IO, Boolean](Files.isRegularFile(path))
  for {
    auth <- getAuth(blocker, authFile)
    storage <- Stream.eval(
      blocker.delay[IO, Storage](StorageOptions.newBuilder().setCredentials(auth).build().getService())
    )

    fileToUpload <- fs2.io.file
      .directoryStream[IO](blocker, directory)
      .evalFilter(isFile)
    blob <- uploadFile(blocker, fileToUpload, storage)
  } yield blob
}
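uploadFiles uploads one file at a time. If you have many files, a sketch like the one below, assuming fs2 2.x, uploads several of them in parallel with parEvalMapUnordered. The upload parameter is a stand-in so the sketch runs without credentials; ParallelUpload and uploadAll are hypothetical names.

```scala
import cats.effect.{Blocker, ContextShift, IO}
import fs2.Stream
import java.nio.file.{Files, Path}

object ParallelUpload {
  // Hypothetical helper: lists the regular files in `directory` and runs
  // `upload` on at most `maxConcurrent` of them at the same time.
  def uploadAll[A](blocker: Blocker, directory: Path, maxConcurrent: Int)(upload: Path => IO[A])(
      implicit cs: ContextShift[IO]
  ): Stream[IO, A] =
    fs2.io.file
      .directoryStream[IO](blocker, directory)
      .evalFilter(path => blocker.delay[IO, Boolean](Files.isRegularFile(path)))
      // results are emitted as uploads finish, so directory order is not kept
      .parEvalMapUnordered(maxConcurrent)(upload)
}
```

With the Storage client built in uploadFiles, upload could be path => uploadFile(blocker, path, storage).compile.lastOrError. Keep maxConcurrent modest, since each upload holds a file handle and an HTTP connection.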

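Finally, we can call uploadFiles to actually upload the files in the directory. Here I use take(5) to upload at most 5 files from the directory; remove it if you don't need that limit. Because FS2 streams are pulled lazily, take(5) stops the stream after the fifth upload instead of uploading everything and discarding the rest.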
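Before uploading for real, it can help to check which files take(5) will pick, especially since a directory listing comes back in no specific order. The sketch below, assuming fs2 2.x, applies the same listing and filtering as uploadFiles but stops before authentication; DryRun and filesToUpload are hypothetical names.

```scala
import cats.effect.{Blocker, ContextShift, IO}
import java.nio.file.{Files, Path}

object DryRun {
  // Hypothetical helper: returns at most `limit` regular files from `directory`,
  // without authenticating or uploading anything.
  def filesToUpload(blocker: Blocker, directory: Path, limit: Int)(
      implicit cs: ContextShift[IO]
  ): IO[List[Path]] =
    fs2.io.file
      .directoryStream[IO](blocker, directory)
      .evalFilter(path => blocker.delay[IO, Boolean](Files.isRegularFile(path)))
      .take(limit.toLong)
      .compile
      .toList
}
```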

uploadFiles(blocker, filesDir).take(5).compile.toList.unsafeRunSync()
// res0: List[Blob] = List(
//   Blob{bucket=lmah-app-files-qa, name=file2.txt, generation=1605237468492781, size=0, content-type=application/octet-stream, metadata=null},
//   Blob{bucket=lmah-app-files-qa, name=file1.txt, generation=1605237468854063, size=0, content-type=application/octet-stream, metadata=null}
// )

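That's it. Again, FS2 does a remarkable amount of heavy lifting. If you have any experience working with InputStream and OutputStream from java.io, you know they are painful to deal with: you have to remember to close them, even when something fails. Here, FS2 and its reliance on cats and cats-effect take care of that: the file is released when the stream completes or fails, and every blocking call is explicitly wrapped in blocker.delay.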