Writing to a file using FS2
Leverage FS2 API to write to a file
Recently, I wanted to write a function that takes an input and writes it to a file. Doing that safely in Java/Scala can be tedious: you need to ensure your resources are closed correctly, even when failures occur. FS2 and its Stream abstraction make it easy. A basic example shows us how to do it.
First, some imports:
// source: https://gist.github.com/mpilquist/0b1cc1926bddd31c70ad40663acfec8e
import fs2._, fs2.concurrent._, cats._, cats.implicits._, cats.effect._, cats.effect.implicits._, scala.concurrent.duration._
implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
implicit val ioTimer: Timer[IO] =
  IO.timer(scala.concurrent.ExecutionContext.Implicits.global)
val blocker: Blocker =
  Blocker.liftExecutionContext(scala.concurrent.ExecutionContext.Implicits.global)
Then a simple Stream that writes strings to a file:
import java.nio.file.Paths
val data: Stream[IO, String] = Stream("string1", "string2", "string3").covary[IO]
// data: Stream[IO, String] = Stream(..)
def toFile(filename: String): Pipe[IO, String, Unit] =
  _.through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get(filename), blocker))
We can run it like this:
data.through(toFile("streams.txt")).compile.drain.unsafeRunSync()
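One detail that is easy to miss: toFile writes the strings back to back, with no separator, so the file ends up containing string1string2string3. If one string per line is what you want, a possible variant is sketched below. The name toFileLines, the output file name, and the choice of open flags are assumptions made for this example, not part of the original code; the flags are passed explicitly so the file is truncated on every run.

```scala
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{Pipe, Stream, io, text}
import java.nio.file.{Paths, StandardOpenOption}
import scala.concurrent.ExecutionContext

// Same setup as earlier in the post.
implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(ExecutionContext.global)
val blocker: Blocker =
  Blocker.liftExecutionContext(ExecutionContext.global)

// Hypothetical variant of toFile: puts a newline between elements
// and truncates the file when it is opened.
def toFileLines(filename: String): Pipe[IO, String, Unit] =
  _.intersperse("\n")
    .through(text.utf8Encode)
    .through(
      io.file.writeAll(
        Paths.get(filename),
        blocker,
        List(StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
      )
    )

Stream("string1", "string2", "string3")
  .covary[IO]
  .through(toFileLines("streams-lines.txt"))
  .compile
  .drain
  .unsafeRunSync()
```

Since intersperse only inserts the separator between elements, the file does not end with a trailing newline.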
Running the stream through toFile creates the file (if it does not exist) and writes to it. The program is quite simple but also very limited: the toFile Pipe can only be used when you are already working with a Stream. What if we’d like to write to that file from a plain function call instead? Can we do that?
We can.
Ultimately, what we’d like is a function with the following signature: def writeToFile(value: String): IO[Unit]. So let’s write a trait for that:
trait WriteToFile {
  def write(value: String): IO[Unit]
}
To build an implementation of this function on top of the Stream API, we need another FS2 primitive: Queue[F, A]. A queue receives data via its enqueue* methods and lets you read that data back via its dequeue* methods. The interesting part is that the dequeue* methods return Streams, which means we can wire the queue together with our previous toFile implementation.
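To make the enqueue/dequeue behaviour concrete before wiring in the file, here is a minimal sketch of a round trip through a Queue. The value name roundTrip is made up for this example. Note the take(3): the Stream returned by dequeue does not end on its own, so the example has to cut it off explicitly.

```scala
import cats.effect.{ContextShift, IO}
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

// Same setup as earlier in the post.
implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(ExecutionContext.global)

val roundTrip: IO[List[String]] = for {
  queue <- Queue.unbounded[IO, String]
  _     <- queue.enqueue1("a")
  _     <- queue.enqueue1("b")
  _     <- queue.enqueue1("c")
  // dequeue is an endless Stream; take(3) makes it terminate
  out   <- queue.dequeue.take(3).compile.toList
} yield out

val elements: List[String] = roundTrip.unsafeRunSync()
// elements: List[String] = List("a", "b", "c")
```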
Let’s put it together:
def newWriteToFile(queue: Queue[IO, String]): IO[WriteToFile] = {
  val writeToDisk: IO[Unit] = queue.dequeue.through(toFile("function.txt")).compile.drain
  writeToDisk.start.map { fiber =>
    // you may want to keep the fiber to cancel the forked
    // process later
    new WriteToFile() {
      def write(value: String): IO[Unit] = queue.enqueue1(value)
    }
  }
}
The key here is the start method call, which runs writeToDisk on a separate fiber and returns immediately. If we were to flatMap directly on the writeToDisk IO instance, our program would hang forever, waiting for the input Stream to terminate. It would never terminate because, in this case, the input Stream comes from the Queue.
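The hanging behaviour can be observed without actually blocking a session. The sketch below (value names are made up for this example) drains a queue-backed Stream directly, but gives up after half a second using IO's timeout instead of waiting forever.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Same setup as earlier in the post.
implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(ExecutionContext.global)
implicit val ioTimer: Timer[IO] =
  IO.timer(ExecutionContext.global)

val drainDirectly: IO[Either[Throwable, Unit]] = for {
  queue  <- Queue.unbounded[IO, String]
  _      <- queue.enqueue1("string1")
  // Draining the queue-backed Stream does not complete on its own,
  // so we stop waiting after 500 milliseconds.
  result <- queue.dequeue.compile.drain.timeout(500.millis).attempt
} yield result

val outcome: Either[Throwable, Unit] = drainDirectly.unsafeRunSync()
```

Here outcome should be a Left holding the timeout error: the one element is consumed, and then the Stream keeps waiting for more input that never comes.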
We can write a simple program to leverage our implementation like this:
val program = for {
  queue <- Queue.unbounded[IO, String]
  writeToFileInstance <- newWriteToFile(queue)
  // now you can use `writeToFileInstance` to call `write`
  _ <- writeToFileInstance.write("string1")
  _ <- writeToFileInstance.write("string2")
} yield ()
// program: IO[Unit] = Bind(
//   source = Map(
//     source = Delay(
//       thunk = cats.effect.concurrent.Ref$$$Lambda$1570/961433677@28dd8786
//     ),
//     f = scala.Function1$$Lambda$1432/466562148@270ab7bc,
//     index = 1
//   ),
//   f = <function1>
// )
Then we can run it, like before:
program.unsafeRunSync()
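One caveat with the program above: it completes as soon as the two values are enqueued, and nothing waits for the background fiber to finish writing them. A possible way to address that, offered here as an assumption and not as part of the implementation above, is to expose the writer as a Resource whose release step signals the end of input and then joins the fiber. The names writeToFileResource and safeProgram, and the use of Option[String] with None as an end-of-input marker, are hypothetical choices for this sketch.

```scala
import cats.effect.{Blocker, ContextShift, IO, Resource}
import fs2.{Pipe, io, text}
import fs2.concurrent.Queue
import java.nio.file.Paths
import scala.concurrent.ExecutionContext

// Same setup and definitions as earlier in the post.
implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(ExecutionContext.global)
val blocker: Blocker =
  Blocker.liftExecutionContext(ExecutionContext.global)

trait WriteToFile {
  def write(value: String): IO[Unit]
}

def toFile(filename: String): Pipe[IO, String, Unit] =
  _.through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get(filename), blocker))

// Hypothetical Resource-based constructor: None marks the end of input.
def writeToFileResource(filename: String): Resource[IO, WriteToFile] =
  for {
    queue <- Resource.liftF(Queue.unbounded[IO, Option[String]])
    // unNoneTerminate ends the Stream when the first None is dequeued
    writeToDisk = queue.dequeue.unNoneTerminate.through(toFile(filename)).compile.drain
    // on release: signal end of input, then wait for the writer to finish
    _ <- Resource.make(writeToDisk.start)(fiber =>
           queue.enqueue1(None).flatMap(_ => fiber.join))
  } yield new WriteToFile {
    def write(value: String): IO[Unit] = queue.enqueue1(Some(value))
  }

val safeProgram: IO[Unit] =
  writeToFileResource("function.txt").use { writer =>
    writer.write("string1").flatMap(_ => writer.write("string2"))
  }

safeProgram.unsafeRunSync()
```

With this shape, use only returns once the release step has run, so every value enqueued inside the use block has been handed to the file pipe before the program completes. The trade-off is that write can only be called within the scope of use.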
I really like FS2: it helps me untangle complex logic easily using very powerful combinators. Sometimes, though, I find that it’s not simple to leverage a solution implemented with the Stream API outside of a Stream program.
Read more about FS2 here: https://fs2.io/guide.html and look up the Queue API: https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/concurrent/Queue.html