FIX - Writing to a file using FS2

2021-04-25

I’ll try to explain the problem and then I’ll provide the correct implementation.

The idea was to have a function that you can carry around and that, every time you call it, writes the supplied argument to a file. To implement it I’ve used the Queue provided by FS2. The original implementation uses enqueue1 to add the arguments to the Queue, and the writing side uses dequeue to process the elements added to the queue and write them to disk.

First, the original code, all in one piece:

// source: https://gist.github.com/mpilquist/0b1cc1926bddd31c70ad40663acfec8e
import fs2._, fs2.concurrent._, cats._, cats.implicits._, cats.effect._, cats.effect.implicits._, scala.concurrent.duration._
import java.nio.file.Paths

implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
// ioContextShift: ContextShift[IO] = cats.effect.internals.IOContextShift@69dc7b24
implicit val ioTimer: Timer[IO] =
  IO.timer(scala.concurrent.ExecutionContext.Implicits.global)
// ioTimer: Timer[IO] = cats.effect.internals.IOTimer@1756a471
val blocker: Blocker =
  Blocker.liftExecutionContext(scala.concurrent.ExecutionContext.Implicits.global)
// blocker: Blocker = cats.effect.Blocker@2190508d


def toFile(filename: String): Pipe[IO, String, Unit] =
  _.through(text.utf8Encode)
  .through(io.file.writeAll(Paths.get(filename), blocker))


trait WriteToFile {
  def write(value: String): IO[Unit]
}
def newWriteToFile(queue: Queue[IO, String]): IO[WriteToFile] = {
  val writeToDisk: IO[Unit] = queue.dequeue.through(toFile("function.txt")).compile.drain

  writeToDisk.start.map { fiber =>
    // you may want to keep the fiber to cancel the forked
    // process later
    new WriteToFile() {
      def write(value: String): IO[Unit] = queue.enqueue1(value)
    }
  }
}

val program = for {
  queue <- Queue.unbounded[IO, String]
  writeToFileInstance <- newWriteToFile(queue)

  //now you can use `writeToFileInstance` to call `write`
  _ <- writeToFileInstance.write("string1")
  _ <- writeToFileInstance.write("string2")
} yield ()
// program: IO[Unit] = Bind(
//   source = Map(
//     source = Delay(
//       thunk = cats.effect.concurrent.Ref$$$Lambda$1400/410908040@27605b87
//     ),
//     f = scala.Function1$$Lambda$1408/1036227602@34c6b52e,
//     index = 1
//   ),
//   f = <function1>
// )

program.unsafeRunSync()

When you run this code, it does nothing. Why?

Well, I may have forgotten to say it in the original post, but the Queue introduces asynchrony. We call the function to write to the file, but we don’t get a confirmation that the content was written to disk. It’s fire-and-forget.

I never noticed this because I use mdoc in watch mode to generate a compiled version of this post. When watch is on, the JVM is kept alive, so the dequeueing can proceed and the writing to disk has time to happen.

If you introduced a sleep before terminating, the program would work, but only as long as the sleep outlasts the write.
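To make the sleep workaround concrete, here is a minimal sketch assuming cats-effect 2, like the rest of this post. The `SleepWorkaround` object and its `Ref` flag are hypothetical stand-ins for the forked writer and the file on disk; they are not part of the original code.

```scala
import cats.effect.{ContextShift, IO, Timer}
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object SleepWorkaround {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Hypothetical stand-in for the writer: it flips a flag after a short
  // delay instead of writing to disk.
  def run(waitFor: FiniteDuration): IO[Boolean] =
    for {
      written <- Ref.of[IO, Boolean](false)
      // Fire-and-forget: the fiber is discarded, as in the original post.
      _       <- IO.sleep(100.millis).flatMap(_ => written.set(true)).start
      // The "sleep before terminating".
      _       <- IO.sleep(waitFor)
      done    <- written.get
    } yield done
}
```

With a generous `waitFor` (say `1.second`) the flag is set by the time `run` returns. With a wait shorter than the background work, it most likely is not. The result depends on guessing the right duration, which is why a sleep is not a real fix.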

Still, we’d like to be able to terminate the program only when all the elements are dequeued and written to disk.

To do so, we’ll have to update the program. We’ll use the unNoneTerminate combinator. This combinator expects input elements to be Options. When it sees a Some it forwards the value inside it, but when it sees a None, it terminates the stream.
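A small pure example of that behaviour, assuming fs2 2.x (the `UnNoneTerminateSketch` object and its values are only for illustration):

```scala
import fs2.{Pure, Stream}

object UnNoneTerminateSketch {
  // The first None ends the stream; anything after it is never emitted.
  val input: Stream[Pure, Option[String]] =
    Stream[Pure, Option[String]](Some("string1"), Some("string2"), None, Some("ignored"))

  // Some values are unwrapped and forwarded until the None is reached.
  val output: List[String] = input.unNoneTerminate.toList
}
```

Here `output` contains only "string1" and "string2". In our case the Options will come from the queue rather than from a fixed list.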

We’ll be responsible for sending the None to the queue when our program is done.

But that combinator alone is not enough. In the original blog post, I discarded the Fiber returned by writeToDisk.start and only left a comment saying that you may want to keep it to cancel the forked process later. That should have been a red flag. Even if we’re diligent and send the None to the queue, what guarantees that we wait until the file writing is complete before returning? Nothing, and that’s what the fiber is for: joining it waits until the writeToDisk IO completes.
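The role of the fiber can be sketched in isolation, again assuming cats-effect 2. The `JoinSketch` object and its `Ref` flag are hypothetical stand-ins for the forked writer, not part of the original code.

```scala
import cats.effect.{ContextShift, IO, Timer}
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object JoinSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Hypothetical stand-in for the writer: it flips a flag after a short
  // delay instead of writing to disk.
  def run: IO[Boolean] =
    for {
      written <- Ref.of[IO, Boolean](false)
      fiber   <- IO.sleep(100.millis).flatMap(_ => written.set(true)).start
      // join suspends until the forked IO has completed, however long it takes.
      _       <- fiber.join
      done    <- written.get
    } yield done
}
```

Because `join` only returns once the background work has finished, the flag is always set when `run` completes, with no timing assumptions.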

Here is the updated version:

// source: https://gist.github.com/mpilquist/0b1cc1926bddd31c70ad40663acfec8e
trait WriteToFile2 {
  def write(value: String): IO[Unit]
  def close(): IO[Unit]
}
def newWriteToFile2(queue: Queue[IO, Option[String]]): IO[WriteToFile2] = {
  val writeToDisk: IO[Unit] = queue.dequeue.unNoneTerminate.through(toFile("function.txt")).compile.drain

  writeToDisk.start.map { fiber =>
    new WriteToFile2() {
      def write(value: String): IO[Unit] = {
        queue.enqueue1(Some(value))
      }
      def close(): IO[Unit] = queue.enqueue1(None) *> fiber.join
    }
  }
}

val program2 = for {
  queue <- Queue.unbounded[IO, Option[String]]
  writeToFileInstance <- newWriteToFile2(queue)

  //now you can use `writeToFileInstance` to call `write`
  _ <- writeToFileInstance.write("string1")
  _ <- writeToFileInstance.write("string2")

  _ <- writeToFileInstance.close()
} yield ()

program2.unsafeRunSync()

Good luck, and sorry for the mistake.