Better resource utilization and rate limiting handling while fetching a paged API

2020-03-15

Improvements

In my last blog post, I use FS2 Stream to fetch multiple pages of an API with offset-based pagination. The implementation is quite naive and we can improve it. Here are the two improvements that we’ll do:

  1. each page is an array of multiple issues, we’ll flatten these issues in the Stream to improve resource usage and avoid loading the whole page into memory
  2. GitLab (and other APIs) has Rate Limiting, if we go all out we’ll have queries failing because we’re busting the limit, we’ll manage this limit in the post

I won’t show them again here, but we need the same imports. But we’ll also need additional circe imports to parse the json in the payload in order to flatten our stream of issue.

import io.circe._, io.circe.parser._, io.circe.generic.semiauto._
import io.circe.fs2._

Now let’s define what’s a GitLab issue using a case class, define a decoder for it as well as our request to the GitLab API and some helper functions:

final case class GitlabIssue(state: String)
implicit val gitlabIssueDecoder: Decoder[GitlabIssue] = deriveDecoder[GitlabIssue]
// gitlabIssueDecoder: Decoder[GitlabIssue] = io.circe.generic.decoding.DerivedDecoder$$anon$1@36142b6e

val baseUri = Uri.uri("https://gitlab.com/api/v4/projects/inkscape%2Finkscape/issues")
// baseUri: Uri = Uri(
//   Some(Scheme(https)),
//   Some(Authority(None, RegName(gitlab.com), None)),
//   "/api/v4/projects/inkscape%2Finkscape/issues",
//   ,
//   None
// )
val baseRequest = GET(
  baseUri,
  Header("PRIVATE-TOKEN", "yq8gCEcXiwCkUXHzs4jf"),
  Accept(MediaType.application.json)
)
// baseRequest: IO[Request[IO]] = Pure(
//   (
//     Method("GET"),
//     Uri(
//       Some(Scheme(https)),
//       Some(Authority(None, RegName(gitlab.com), None)),
//       "/api/v4/projects/inkscape%2Finkscape/issues",
//       ,
//       None
//     ),
//     HttpVersion(1, 1),
//     Headers(PRIVATE-TOKEN: yq8gCEcXiwCkUXHzs4jf, Accept: application/json),
//     Stream(..),
//     io.chrisdavenport.vault.Vault@4528c187
//   )
// )

def getRequestUri(maybePage: Option[String]): Uri = {
  maybePage.map(page => baseUri +? ("page", page)).getOrElse(baseUri)
}

def getNextUri(response: Response[IO]): Option[Uri] = {
  import org.http4s.util.CaseInsensitiveString

  val mh = response.headers.find(_.name === CaseInsensitiveString("X-Next-Page"))
  mh.map { h => getRequestUri(Some(h.value)) }
}

This was my first attempt, but it does not work:

def issuesStream(client: Client[IO]): Stream[IO, GitlabIssue] = {
  Stream.unfoldLoopEval[IO, Uri, Stream[IO, GitlabIssue]](getRequestUri(None)) { uri =>
    val request = baseRequest.map(_.withUri(uri))
    client.fetch(request) { response =>
        val nextUri = getNextUri(response)
        val issueStream = response.bodyAsText.through(stringArrayParser).through(decoder[IO, GitlabIssue])
      (issueStream, nextUri).pure[IO]
    }
  }.flatMap(s => s)
}

When we run it, we get an error:

withClient(issuesStream(_).take(10).compile.toList).unsafeRunSync
// org.http4s.InvalidBodyException: Received premature EOF.

It happens because the fetch method on Client[IO] expects a function Response[IO] => IO[T] and it expects that when the resulting IO is executed, we’re done with the request. Unfortunately, we are not. When IO returns in our implementation, we have not yet started to pull elements out of the response body Stream. When we do, in the flatMap(s => s), it’s too late, the body was discarded.

Fortunately, the Client[IO] has a stream method that will do what we want, but we’ll have to get rid of the unfoldLoopEval. This is because I don’t know how to implement the function unfoldLoopEval expects using client.stream.

Let’s try to fetch issues with the client.stream method:

def issuesStream2(client: Client[IO]): Stream[IO, GitlabIssue] = {
  val request = baseRequest.map(_.withUri(getRequestUri(None)))
  Stream
    .eval(request)
    .flatMap(client.stream)
    .flatMap(_.bodyAsText.through(stringArrayParser).through(decoder[IO, GitlabIssue]))
}

withClient(issuesStream2(_).take(1).compile.toList).unsafeRunSync()
// res0: List[GitlabIssue] = List(GitlabIssue("opened"))

Now we’ve got a method to perform one request, let’s try to make it send another request when it needs to (the first issue stream is exhausted) and it can (there is a next page). The key is, as we did with unfoldLoopEval to keep an Option around to tell us if we have to continue or not

def issuesStream3(client: Client[IO])(uri: Uri): Stream[IO, (Stream[IO, GitlabIssue], Option[Uri])] = {
  val request = baseRequest.map(_.withUri(uri))
  Stream
    .eval(request)
    .flatMap(client.stream)
    .map { response =>
      val nextPage = getNextUri(response)
      val issues = response.bodyAsText.through(stringArrayParser).through(decoder[IO, GitlabIssue])
      (issues, nextPage)
    }
}

def getIssues(client: Client[IO]): Stream[IO, GitlabIssue] = {
  def go(onePage: (Stream[IO, GitlabIssue], Option[Uri])): Stream[IO, GitlabIssue] = {
    onePage match {
      case (issues, None) =>
        issues
      case (issues, Some(uri)) =>
        issues ++ issuesStream3(client)(uri).flatMap(pair => go(pair))
    }
  }
  val initialUri = getRequestUri(Some("1"))
  issuesStream3(client)(initialUri).flatMap(go(_))
}

It took me a while to write that implementation. My first attempts resulted in infinite loops or an inability to align the types and have a simple Stream[IO, GitlabIssue] as a result. To try it, let’s take(50) element of that Stream. The default number of items per page is 20, so we should fetch more than one page.

withClient(getIssues(_).take(50).compile.toList.map(_.length)).unsafeRunSync()
// res1: Int = 50

So there we have it, I thought, a single Stream[IO, GitlabIssue] that will fetch issues until it exhausts all the pages or until it has taken what it needs. At this point I thought I had nailed it, I was going to write the next section of the post about rate limiting, so I bumped the take(50) to take(600), and then, my program hanged.

I tried to debug it but I could not figure it out. The only I thing I could figure out is that I could not get past 200 elements. I did not make the link that my http4s blaze client is by default limited to 10 connections, and at 20 elements per page that’s why it was hanging at 200. That’s when kubukoz came to the rescue and helped me out: https://github.com/daddykotex/scripts/pull/1.

He did fix the issue, but I was not sure to understand why and I really wanted to try and fix it on my own without the need for an instance of Deferred.

Here is my third attempt, and it seems to be working great. Instead of using Deferred, I used a fs2.Queue. I use the queue as a work log. This log is consumed and each element is some page to fetch. Each page is wrapped into an Option into the stream so I can use the unNoneTerminate combinator to terminate the stream when a None pass by. A None is only ever sent into the stream once there are no more pages to fetch. The logic is quite simple, we get a page, check the header for the next page number. We send it in the queue and then we process the current response body in a streaming fashion.

def getIssues2(client: Client[IO]): Stream[IO, GitlabIssue] = {
    def getPage(uri: Uri, jobQueue: Queue[IO, Option[Uri]]): Stream[IO, GitlabIssue] = {
      Stream
        .eval(baseRequest.map(_.withUri(uri)))
        .flatMap(client.stream)
        .flatMap { response =>
          val maybeNextUri = getNextUri(response)
          val publishJob = jobQueue.enqueue1(maybeNextUri)
          val issues =response.body.through(byteArrayParser).through(decoder[IO, GitlabIssue])
          Stream.eval_(publishJob) ++ issues
        }
    }
    val dequeue = for {
      jobQueue <- Queue.unbounded[IO, Option[Uri]]
      _ <- jobQueue.enqueue1(Some(getRequestUri(Some("1"))))
    } yield jobQueue.dequeue.unNoneTerminate.flatMap(getPage(_, jobQueue))

    Stream.force(dequeue)
  }

And when we run it:

withClient(getIssues2(_).take(50).compile.toList.map(_.length)).unsafeRunSync()
// res2: Int = 50

At this point, I think it’s more than enough for a blog post. I’ll try to address the rate limiting in another blog post.