FIX - Writing to a file using FS2
I’ll try to explain the problem and then I’ll provide the correct implementation.
The idea was to have a function that you can carry around and that, every time you call it, writes the supplied argument to a file. To implement it I used the Queue provided by FS2. The original implementation uses enqueue1 to add the arguments to the Queue, and the writing side uses dequeue to process the elements added to the queue and write them to disk.
First, the original code, all in one piece:
// source: https://gist.github.com/mpilquist/0b1cc1926bddd31c70ad40663acfec8e
import fs2._, fs2.concurrent._, cats._, cats.implicits._, cats.effect._, cats.effect.implicits._, scala.concurrent.duration._
import java.nio.file.Paths
implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
// ioContextShift: ContextShift[IO] = cats.effect.internals.IOContextShift@69dc7b24
implicit val ioTimer: Timer[IO] =
  IO.timer(scala.concurrent.ExecutionContext.Implicits.global)
// ioTimer: Timer[IO] = cats.effect.internals.IOTimer@1756a471
val blocker: Blocker =
  Blocker.liftExecutionContext(scala.concurrent.ExecutionContext.Implicits.global)
// blocker: Blocker = cats.effect.Blocker@2190508d

def toFile(filename: String): Pipe[IO, String, Unit] =
  _.through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get(filename), blocker))

trait WriteToFile {
  def write(value: String): IO[Unit]
}

def newWriteToFile(queue: Queue[IO, String]): IO[WriteToFile] = {
  val writeToDisk: IO[Unit] = queue.dequeue.through(toFile("function.txt")).compile.drain
  writeToDisk.start.map { fiber =>
    // you may want to keep the fiber to cancel the forked
    // process later
    new WriteToFile() {
      def write(value: String): IO[Unit] = queue.enqueue1(value)
    }
  }
}

val program = for {
  queue <- Queue.unbounded[IO, String]
  writeToFileInstance <- newWriteToFile(queue)
  // now you can use `writeToFileInstance` to call `write`
  _ <- writeToFileInstance.write("string1")
  _ <- writeToFileInstance.write("string2")
} yield ()
// program: IO[Unit] = Bind(
// source = Map(
// source = Delay(
// thunk = cats.effect.concurrent.Ref$$$Lambda$1400/410908040@27605b87
// ),
// f = scala.Function1$$Lambda$1408/1036227602@34c6b52e,
// index = 1
// ),
// f = <function1>
// )
program.unsafeRunSync()
When you run this code, it does nothing. Why?
Well, I may have forgotten to say it in the original post, but the Queue introduces asynchrony. We call the function to write to the file, but we don’t get any confirmation that the content was written to disk. It’s fire-and-forget: the program returns as soon as the elements are enqueued, and the JVM can exit before the background fiber has written anything.
I never noticed because I use mdoc in watch mode to generate a compiled version of this post. When watch is on, the JVM is kept alive, so the dequeueing can proceed and the writing to disk has time to happen.
If you introduced a sleep before terminating, the program would work.
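To make the sleep workaround concrete, here is a minimal sketch of it. It assumes fs2 2.x and cats-effect 2, like the rest of this post. The object name SleepWorkaround, the run method, the target parameter and the one-second delay are all made up for the illustration. The delay in particular is an arbitrary guess, not a guarantee that the write has finished.

```scala
import cats.effect.{Blocker, ContextShift, IO, Timer}
import fs2.concurrent.Queue
import fs2.{io, text}
import java.nio.file.Path
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Hypothetical wrapper object, only here to keep the sketch self-contained.
object SleepWorkaround {
  private val ec = ExecutionContext.global
  implicit val cs: ContextShift[IO] = IO.contextShift(ec)
  implicit val timer: Timer[IO] = IO.timer(ec)
  val blocker: Blocker = Blocker.liftExecutionContext(ec)

  // Same fire-and-forget shape as the original program, followed by a sleep.
  def run(target: Path): IO[Unit] =
    for {
      queue <- Queue.unbounded[IO, String]
      // Fork the consumer; the fiber is discarded, as in the original code.
      _ <- queue.dequeue
             .through(text.utf8Encode)
             .through(io.file.writeAll(target, blocker))
             .compile
             .drain
             .start
      _ <- queue.enqueue1("string1")
      _ <- queue.enqueue1("string2")
      // Arbitrary delay (an assumption): leaves the background fiber time to write.
      _ <- IO.sleep(1.second)
    } yield ()
}
```

This only hides the problem. Nothing ties the length of the sleep to the amount of work left in the queue, and the consumer fiber is never stopped.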
Still, we’d like to be able to terminate the program only when all the elements are dequeued and written to disk.
To do so, we’ll have to update the program. We’ll use the unNoneTerminate combinator. This combinator expects the input elements to be Options. When it sees a Some, it forwards the value inside it, but when it sees a None, it terminates the stream.
We’ll be responsible for sending the None into the queue when our program is done.
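The behaviour of unNoneTerminate is easy to check on a pure stream. The sketch below assumes fs2 2.x. The object name UnNoneTerminateSketch and the sample values are only there for the illustration.

```scala
import fs2.Stream

// Hypothetical wrapper object, only here to keep the sketch self-contained.
object UnNoneTerminateSketch {
  // The element after the None is there to show that it is never emitted.
  val input: List[Option[String]] =
    List(Some("string1"), Some("string2"), None, Some("ignored"))

  // Some values are unwrapped and forwarded; the first None ends the stream.
  val output: List[String] = Stream.emits(input).unNoneTerminate.toList
  // output == List("string1", "string2")
}
```

In the file-writing case the stream comes from the queue rather than from a list, so the None acts as an explicit "no more input" message from the producer.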
But that combinator alone is not enough. In the original blog post, I discarded the Fiber returned by calling writeToDisk.start and added a comment saying that you may want to join or cancel it. That should have been a red flag. Even if we’re diligent and send the None into the incoming Stream, what guarantees that we’ll wait until the file writing is complete before returning? Nothing, and that’s what the fiber is for. We’ll use it to wait until the writeToDisk IO completes.
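Here is a small sketch of what joining a fiber buys us, independent of files and queues. It assumes cats-effect 2. The object name FiberJoinSketch, the Ref used as a completion flag and the 100 millisecond delay are all invented for the example.

```scala
import cats.effect.{ContextShift, IO, Timer}
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Hypothetical wrapper object, only here to keep the sketch self-contained.
object FiberJoinSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Forks a slow background task, then joins it before reading the flag.
  val joined: IO[Boolean] = for {
    done  <- Ref.of[IO, Boolean](false)
    // Stand-in for writeToDisk: some work that finishes later on another fiber.
    fiber <- IO.sleep(100.millis).flatMap(_ => done.set(true)).start
    // join suspends until the forked task has completed.
    _     <- fiber.join
    seen  <- done.get
  } yield seen
}
```

Because of the join, the flag is read only after the forked task has set it. Without the join, the read would race with the background task, which is the same race the original program lost.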
Here is the updated version:
// source: https://gist.github.com/mpilquist/0b1cc1926bddd31c70ad40663acfec8e
trait WriteToFile2 {
  def write(value: String): IO[Unit]
  def close(): IO[Unit]
}

def newWriteToFile2(queue: Queue[IO, Option[String]]): IO[WriteToFile2] = {
  val writeToDisk: IO[Unit] =
    queue.dequeue.unNoneTerminate.through(toFile("function.txt")).compile.drain
  writeToDisk.start.map { fiber =>
    new WriteToFile2() {
      def write(value: String): IO[Unit] =
        queue.enqueue1(Some(value))
      // signal the end of input, then wait for the writer to finish
      def close(): IO[Unit] = queue.enqueue1(None) *> fiber.join
    }
  }
}

val program2 = for {
  queue <- Queue.unbounded[IO, Option[String]]
  writeToFileInstance <- newWriteToFile2(queue)
  // now you can use `writeToFileInstance` to call `write`
  _ <- writeToFileInstance.write("string1")
  _ <- writeToFileInstance.write("string2")
  _ <- writeToFileInstance.close()
} yield ()
program2.unsafeRunSync()
Good luck, and sorry for the mistake.