Writing to a file using FS2

2020-03-10

Leveraging the FS2 API to write to a file

EDIT: It was pointed out to me that the code does not work, and that is right. The update is located in this new post.

Recently, I wanted to write a function that takes an input and writes it to a file. Doing that safely in Java/Scala can be tedious: you need to make sure your resources are closed correctly, even in the case of failures. FS2 and its Stream abstraction make this easy, as a basic example shows.

First, some imports and the implicit instances we need:

// source: https://gist.github.com/mpilquist/0b1cc1926bddd31c70ad40663acfec8e
import fs2._, fs2.concurrent._, cats._, cats.implicits._, cats.effect._, cats.effect.implicits._, scala.concurrent.duration._

// cats-effect 2 needs a ContextShift (used by start, Queue and file I/O) and a Timer for IO
implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
implicit val ioTimer: Timer[IO] =
  IO.timer(scala.concurrent.ExecutionContext.Implicits.global)
// fs2.io.file runs its blocking calls on a Blocker; reusing the global pool is
// convenient in a worksheet, but a dedicated pool is preferable in real code
val blocker: Blocker =
  Blocker.liftExecutionContext(scala.concurrent.ExecutionContext.Implicits.global)
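Reusing the global pool as the Blocker keeps the setup short, but blocking file operations then run on threads meant for CPU-bound work. A minimal sketch of an alternative, assuming fs2 2.x and cats-effect 2 as in the imports above: Blocker[IO] is a Resource that creates a dedicated thread pool and shuts it down on release, and Stream.resource scopes it to a single stream. writeStrings is a hypothetical helper name, not part of FS2.

```scala
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{Stream, text}
import java.nio.file.Path
import scala.concurrent.ExecutionContext

object DedicatedBlocker {
  // cats-effect 2: Blocker-based file I/O requires a ContextShift[IO]
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Hypothetical helper: writes the strings to `path` using a Blocker whose
  // thread pool only exists while this stream runs
  def writeStrings(path: Path, values: List[String]): IO[Unit] =
    Stream
      .resource(Blocker[IO])
      .flatMap { blocker =>
        Stream
          .emits(values)
          .covary[IO]
          .through(text.utf8Encode)
          .through(fs2.io.file.writeAll[IO](path, blocker))
      }
      .compile
      .drain
}
```

The pool is released as soon as the stream finishes, whether it succeeds or fails.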

Then, a simple Stream of strings and a Pipe that writes them to a file:

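import java.nio.file.Paths

val data: Stream[IO, String] = Stream("string1", "string2", "string3").covary[IO]
// data: Stream[IO, String] = Stream(..)

// Encodes each String as UTF-8 bytes and writes them to `filename`.
// No separator is added, so the strings end up concatenated in the file.
def toFile(filename: String): Pipe[IO, String, Unit] =
  _.through(text.utf8Encode)
    .through(io.file.writeAll[IO](Paths.get(filename), blocker))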

We can run it like this:

data.through(toFile("streams.txt")).compile.drain.unsafeRunSync()

This creates the file if it does not exist and writes to it. The program is simple but also very limited: the toFile Pipe can only be used when you are already working with a Stream. What if we’d like to write to that file upon a plain function call instead? Can we do that?
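A note on what "writes to it" means. If I read the fs2 2.x signature correctly, writeAll also accepts a flags: Seq[StandardOpenOption] argument whose default is List(StandardOpenOption.CREATE). Without TRUNCATE_EXISTING or APPEND, an existing file is overwritten from its first byte and any remaining old content stays in place. A sketch that passes the flags explicitly, under that assumption; appendTo and replaceContents are illustrative names, not FS2 functions.

```scala
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{Pipe, text}
import java.nio.file.{Path, StandardOpenOption}
import scala.concurrent.ExecutionContext

object FileFlags {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Illustrative: keep the existing content and add new bytes at the end
  def appendTo(path: Path, blocker: Blocker): Pipe[IO, String, Unit] =
    _.through(text.utf8Encode)
      .through(
        fs2.io.file.writeAll[IO](path, blocker, List(StandardOpenOption.CREATE, StandardOpenOption.APPEND))
      )

  // Illustrative: discard whatever the file contained before writing
  def replaceContents(path: Path, blocker: Blocker): Pipe[IO, String, Unit] =
    _.through(text.utf8Encode)
      .through(
        fs2.io.file.writeAll[IO](path, blocker, List(StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))
      )
}
```

Appending is usually what you want when the file is written to over several runs; truncating matches the intuition of "replace the file".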

We can.

Ultimately, what we’d like is a function with the following signature: def writeToFile(value: String): IO[Unit]. So let’s write a trait for it:

trait WriteToFile {
  def write(value: String): IO[Unit]
}

To implement this trait with the Stream API, we need another FS2 primitive: Queue[F, A], from fs2.concurrent. A queue receives data through its enqueue* methods (for example enqueue1) and hands it out through its dequeue* methods. The interesting part is that dequeue returns a Stream[F, A], which means we can wire the queue’s output into our previous toFile pipe.
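A small sketch of that round trip, assuming the fs2 2.x Queue API linked at the end of this post: values go in one at a time with enqueue1 and come back out as a Stream via dequeue. The take(3) is needed because dequeue on its own never completes. roundTrip is an illustrative name.

```scala
import cats.effect.{ContextShift, IO}
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object QueueDemo {
  // Queue.unbounded needs Concurrent[IO], which in cats-effect 2 needs a ContextShift
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Enqueue three values, then read them back as a Stream.
  // Without take(3), compiling queue.dequeue would wait forever for more elements.
  val roundTrip: IO[List[String]] =
    for {
      queue  <- Queue.unbounded[IO, String]
      _      <- queue.enqueue1("string1")
      _      <- queue.enqueue1("string2")
      _      <- queue.enqueue1("string3")
      values <- queue.dequeue.take(3).compile.toList
    } yield values
}
```

The values come out in the order they went in, which is what lets the queue feed a file writer.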

Let’s put it together:

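def newWriteToFile(queue: Queue[IO, String]): IO[WriteToFile] = {
  // Drains the queue into the file; this never completes on its own,
  // because queue.dequeue keeps waiting for new elements
  val writeToDisk: IO[Unit] = queue.dequeue.through(toFile("function.txt")).compile.drain

  // start runs writeToDisk on a separate fiber and returns immediately
  writeToDisk.start.map { fiber =>
    // you may want to keep the fiber to cancel the forked
    // process later
    new WriteToFile() {
      // enqueue1 only adds the value to the queue; the background
      // fiber writes it to disk asynchronously
      def write(value: String): IO[Unit] = queue.enqueue1(value)
    }
  }
}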

The key here is the call to start, which runs writeToDisk on a separate fiber and returns immediately. If we were to flatMap directly on the writeToDisk IO instead, our program would hang forever waiting for the input Stream to terminate, and it never would: the input Stream comes from queue.dequeue, which keeps waiting for new elements.
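The comment in newWriteToFile suggests keeping the fiber. A sketch of one way to do that, assuming the same fs2 2.x and cats-effect 2 APIs: return fiber.cancel alongside the writer so the caller can stop the background stream. Cancelling interrupts the stream and releases the file handle, but anything still in the queue at that moment is not written. newCancellableWriteToFile is a hypothetical name, and toFile here takes a Path and a Blocker so the block stays self-contained.

```scala
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{Pipe, text}
import fs2.concurrent.Queue
import java.nio.file.Path
import scala.concurrent.ExecutionContext

object CancellableWriter {
  // cats-effect 2: start and Queue need a ContextShift[IO]
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  trait WriteToFile {
    def write(value: String): IO[Unit]
  }

  // Same idea as toFile in the post, with the Path and Blocker passed explicitly
  def toFile(path: Path, blocker: Blocker): Pipe[IO, String, Unit] =
    _.through(text.utf8Encode).through(fs2.io.file.writeAll[IO](path, blocker))

  // Hypothetical variant: returns the writer together with an IO that cancels
  // the background fiber (interrupting the stream and closing the file)
  def newCancellableWriteToFile(path: Path, blocker: Blocker): IO[(WriteToFile, IO[Unit])] =
    for {
      queue <- Queue.unbounded[IO, String]
      fiber <- queue.dequeue.through(toFile(path, blocker)).compile.drain.start
    } yield {
      val writer = new WriteToFile {
        def write(value: String): IO[Unit] = queue.enqueue1(value)
      }
      (writer, fiber.cancel)
    }
}
```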

We can use our implementation in a simple program like this:

val program = for {
  queue <- Queue.unbounded[IO, String]
  writeToFileInstance <- newWriteToFile(queue)

  // now you can use `writeToFileInstance` to call `write`
  _ <- writeToFileInstance.write("string1")
  _ <- writeToFileInstance.write("string2")
} yield ()

Then we can run it like before:

program.unsafeRunSync()
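One thing to keep in mind when running program this way: it completes as soon as both values are in the queue, nothing waits for the background fiber, and the file is never closed because the fiber’s stream never ends. A sketch of one way to address both, assuming the same fs2 2.x and cats-effect 2 APIs, and not necessarily the approach of the follow-up post mentioned at the top: enqueue Option[String], stop the stream on None with unNoneTerminate, and let a Resource send None and join the fiber on release. writeToFileResource is a hypothetical name.

```scala
import cats.effect.{Blocker, ContextShift, IO, Resource}
import fs2.{Pipe, text}
import fs2.concurrent.Queue
import java.nio.file.Path
import scala.concurrent.ExecutionContext

object FlushingWriter {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  trait WriteToFile {
    def write(value: String): IO[Unit]
  }

  def toFile(path: Path, blocker: Blocker): Pipe[IO, String, Unit] =
    _.through(text.utf8Encode).through(fs2.io.file.writeAll[IO](path, blocker))

  // Hypothetical helper: the background stream lives as long as the Resource
  def writeToFileResource(path: Path, blocker: Blocker): Resource[IO, WriteToFile] =
    for {
      // Some(value) carries data, None tells the stream to stop
      queue <- Resource.liftF(Queue.unbounded[IO, Option[String]])
      _ <- Resource.make(
        queue.dequeue.unNoneTerminate.through(toFile(path, blocker)).compile.drain.start
      )(fiber =>
        // on release: signal the end of input, then wait until everything is written
        queue.enqueue1(None).flatMap(_ => fiber.join)
      )
    } yield new WriteToFile {
      def write(value: String): IO[Unit] = queue.enqueue1(Some(value))
    }
}
```

With it, the two writes from program become writeToFileResource(path, blocker).use(w => w.write("string1").flatMap(_ => w.write("string2"))), and use only returns once both strings are on disk and the file is closed.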

I really like FS2: its powerful combinators help me untangle complex logic easily. Sometimes, though, I find that it is not simple to use a solution built with the Stream API outside of a Stream program.

Read more about FS2 here: https://fs2.io/guide.html and look up the Queue API: https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/concurrent/Queue.html