# Use FS2 to upload files to Google Cloud Storage
Uploading to Google Cloud Storage is rather easy when you follow the Java examples available in the documentation. But how do we hook that into our purely functional Scala application? In this post, I show how to upload files to a Google Cloud Storage bucket using FS2.
## Requirements
Before you get started, you should have:

- created a bucket in your Google Cloud account
- downloaded a service account JSON file
The rest of this post assumes that you have a folder called `to-upload` in your `$HOME` directory. This directory contains a folder `files` holding the files to upload, and an `auth.json` file that contains your Google Cloud credentials.
```
/Users/david/to-upload/
├── auth.json
└── files
    ├── file1.txt
    └── file2.txt
```
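If you want to follow along, a layout like the above can be created with a couple of shell commands. The two text files are just placeholders; you still need to put your own service account key at `auth.json`:

```shell
# create the working directory and a couple of sample files to upload
mkdir -p "$HOME/to-upload/files"
touch "$HOME/to-upload/files/file1.txt" "$HOME/to-upload/files/file2.txt"
# copy your own service account key to $HOME/to-upload/auth.json
```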
## Imports
As usual, some imports (in practice, you'd get most of this from extending `IOApp`):
```scala
// source: https://gist.github.com/mpilquist/0b1cc1926bddd31c70ad40663acfec8e
import fs2._, fs2.concurrent._
import cats._, cats.implicits._
import cats.effect._, cats.effect.implicits._
import scala.concurrent.duration._

implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)

implicit val ioTimer: Timer[IO] =
  IO.timer(scala.concurrent.ExecutionContext.Implicits.global)

val blocker: Blocker =
  Blocker.liftExecutionContext(scala.concurrent.ExecutionContext.Implicits.global)
```
And, as mentioned above, the folder and bucket setup for the rest of the post:
```scala
import java.nio.file._

// make sure you use your own bucket name
val bucketName = "lmah-app-files-qa"
// bucketName: String = "lmah-app-files-qa"

val homeDir = Paths.get(sys.env("HOME"))
// homeDir: Path = /Users/david

val workDir = homeDir.resolve("to-upload")
// workDir: Path = /Users/david/to-upload

val authFile = workDir.resolve("auth.json")
// authFile: Path = /Users/david/to-upload/auth.json

val filesDir = workDir.resolve("files")
// filesDir: Path = /Users/david/to-upload/files
```
## Authentication
The documentation proposes to export an environment variable pointing to the location of the JSON file. When you do, the Google Cloud SDK reads that file automatically and your `Storage` service is authenticated transparently. But if you want to do it manually, you can do the following:
```scala
import com.google.auth.oauth2.ServiceAccountCredentials

def fileContent(blocker: Blocker, file: Path): Stream[IO, java.io.InputStream] =
  fs2.io.file
    .readAll[IO](file, blocker, 1024)
    .through(fs2.io.toInputStream)

def getAuth(blocker: Blocker, file: Path): Stream[IO, ServiceAccountCredentials] =
  fileContent(blocker, file)
    .evalMap(is => blocker.delay[IO, ServiceAccountCredentials](ServiceAccountCredentials.fromStream(is)))
```
We used `blocker.delay` because reading from an `InputStream` is most likely blocking.
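For completeness, the environment-variable approach mentioned above is a one-liner in your shell. The variable the Google Cloud SDK looks for is `GOOGLE_APPLICATION_CREDENTIALS`, and the path below matches the layout assumed in this post:

```shell
# point the Google Cloud SDK at the service account key;
# with this set, the client library picks up credentials automatically
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/to-upload/auth.json"
```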
## Uploading files
Once you have your authentication, you can grab an instance of the `Storage` service and use it to upload content to the bucket.

First, we'll reuse our `fileContent` method from above, but this time we'll take the `InputStream` and pass it to the `Storage#createFrom` method. The following function allows us to upload one file.
```scala
import com.google.cloud.storage._

def uploadFile(blocker: Blocker, file: Path, storage: Storage): Stream[IO, Blob] = {
  val blobId = BlobId.of(bucketName, file.getFileName().toString())
  val blobInfo = BlobInfo.newBuilder(blobId).build()
  fileContent(blocker, file)
    .evalMap(is => blocker.delay[IO, Blob](storage.createFrom(blobInfo, is)))
}
```
With that, we can define an `uploadFiles` function that will:

- authenticate (read the JSON file)
- create an instance of the `Storage` service
- find files in the `files` directory
- upload each file
```scala
def uploadFiles(blocker: Blocker, directory: Path): Stream[IO, Blob] = {
  val isFile: Path => IO[Boolean] = path => blocker.delay[IO, Boolean](Files.isRegularFile(path))
  for {
    auth <- getAuth(blocker, authFile)
    storage <- Stream.eval(
      blocker.delay[IO, Storage](StorageOptions.newBuilder().setCredentials(auth).build().getService())
    )
    fileToUpload <- fs2.io.file
      .directoryStream[IO](blocker, directory)
      .evalFilter(isFile)
    blob <- uploadFile(blocker, fileToUpload, storage)
  } yield blob
}
```
Finally, we can call `uploadFiles` to actually upload the files in the directory. Here I use `take(5)` to make sure I upload at most 5 files from the directory; you can remove that if you do not need this limitation.
```scala
uploadFiles(blocker, filesDir).take(5).compile.toList.unsafeRunSync()
// res0: List[Blob] = List(
//   Blob{bucket=lmah-app-files-qa, name=file2.txt, generation=1605237468492781, size=0, content-type=application/octet-stream, metadata=null},
//   Blob{bucket=lmah-app-files-qa, name=file1.txt, generation=1605237468854063, size=0, content-type=application/octet-stream, metadata=null}
// )
```
That’s it. Again, the power of FS2 is unbelievable. If you have any experience working with `InputStream` and `OutputStream` from `java.io`, you know they are painful to deal with. Here, the safety is ensured by FS2 and its reliance on `cats` and `cats-effect`.