Fetch all resources behind an API using offset-based pagination
Iterate through all the resources behind a paged API
For the purpose of this blog post, I will use the GitLab API. I will fetch the issues of a popular project: https://gitlab.com/inkscape/inkscape. This project has over 500 issues. A GET request to /issues should return an array with a limited number of the issues, because GitLab uses offset-based pagination.
As explained in https://docs.gitlab.com/ee/api/README.html#offset-based-pagination, we can add the page and per_page query parameters to our requests to get what we want. The next section of that documentation explains what in the responses tells us whether we have exhausted the content. GitLab supports a Link header in which we can find the previous and/or the next page when they’re available. The responses also contain headers such as X-Prev-Page, X-Next-Page and X-Total-Pages that can help us achieve the same thing. With that information, let’s get started.
First, let’s start with the FS2 and, more importantly, the http4s client boilerplate:
import fs2._, fs2.concurrent._
import cats._, cats.implicits._, cats.effect._, cats.effect.implicits._, scala.concurrent.duration._
import org.http4s._
import org.http4s.headers._
import org.http4s.Method._
import org.http4s.client.blaze._
import org.http4s.client._
import org.http4s.client.dsl.io._
import org.http4s.Uri
import scala.concurrent.ExecutionContext.Implicits.global
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(global)
implicit val ioTimer: Timer[IO] = IO.timer(global)
val blocker: Blocker = Blocker.liftExecutionContext(global)
def withClient[T](f: Client[IO] => IO[T]): IO[T] = {
  BlazeClientBuilder[IO](global).resource.use { client =>
    f(client)
  }
}
With this, we can start writing the logic to make our requests using the http4s client. All we have to do is write a function that takes a Client[IO] and then pass it to withClient to get our final program. Let’s do a first request and look at the headers:
val baseUri = Uri.uri("https://gitlab.com/api/v4/projects/inkscape%2Finkscape/issues")
val baseRequest = GET(
  baseUri,
  Header("PRIVATE-TOKEN", "s4S25aWeMq6CYqst-PC3"), //do not worry, this token was removed
  Accept(MediaType.application.json)
)
def getXHeaders(client: Client[IO]): IO[List[Header]] = {
  client.fetch(baseRequest) { r =>
    val headers = r.headers.toList
    val filtered = headers.filter(_.name.value.startsWith("X-"))
    filtered.pure[IO]
  }
}
withClient(getXHeaders).unsafeRunSync()
// res0: List[Header] = List(
// Raw(X-Content-Type-Options, "nosniff"),
// Raw(X-Frame-Options, "SAMEORIGIN"),
// Raw(X-Next-Page, "2"),
// Raw(X-Page, "1"),
// Raw(X-Per-Page, "20"),
// Raw(X-Prev-Page, ""),
// Raw(X-Request-Id, "opWZs96yAc9"),
// Raw(X-Runtime, "0.209089"),
// Raw(X-Total, "1130"),
// Raw(X-Total-Pages, "57")
// )
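The numbers in these headers are consistent with each other, and they also tell us how many requests a full crawl will take. A small sketch of that arithmetic, using a hypothetical helper named totalPages; the second call assumes a per_page of 100, which is the maximum GitLab documents for offset-based pagination:

```scala
// Hypothetical helper: number of pages needed for `total` items at `perPage` items per page.
def totalPages(total: Int, perPage: Int): Int =
  (total + perPage - 1) / perPage // integer ceiling division

val withDefault = totalPages(1130, 20)  // 57, the same value as X-Total-Pages above
val withMax     = totalPages(1130, 100) // 12, assuming per_page=100
```

In other words, raising per_page would cut the number of round trips from 57 to 12 for this project.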
We can see that, as per the documentation, we get all the headers that we need. We just need a way to iterate and keep fetching new pages until we’ve exhausted them all. To do that, we’ll use the FS2 Stream API. The Stream object has a method called unfoldLoopEval, which is the perfect candidate for what we’re trying to do. The signature of this function is: def unfoldLoopEval[F[_], S, O](s: S)(f: (S) => F[(O, Option[S])]): Stream[F, O]. Bear with me: the signature is quite intimidating, but we’ll go through it step by step.
The function has 3 type parameters:
- F[_] for the effect our function will return. The other variant, unfoldLoop, has no such type parameter. Furthermore, there are multiple functions suffixed with Eval in the Stream API that are just effectful versions of other functions.
- S for the “bootstrap” value.
- O for the elements emitted down the stream.
The first argument of the function, of type S, is the bootstrap value. It will be passed to the second argument, the function f: (S) => F[(O, Option[S])]. This function takes the bootstrap value and returns, in the effect F[_], a tuple. On the left side of the tuple we have O, an element that will be emitted down the stream, and on the right side we have an Option[S], which determines whether the function f will be called again: a Some carries the next S to call f with, and a None ends the stream.
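To make the signature a little less abstract before applying it to HTTP, here is a toy sketch that walks an in-memory “API” of three pages. It assumes fs2 2.x and cats-effect 2.x (the versions the imports in this post suggest); fakePages and toy are made-up names, and no network call is involved.

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical in-memory "API": three pages, keyed by page number.
val fakePages: Map[Int, String] = Map(1 -> "a", 2 -> "b", 3 -> "c")

// S = Int (the page to fetch), O = String (the content of that page).
val toy: Stream[IO, String] =
  Stream.unfoldLoopEval[IO, Int, String](1) { page =>
    IO {
      // Continue only if a following page exists.
      val next = if (fakePages.contains(page + 1)) Some(page + 1) else None
      (fakePages(page), next) // emit this page, then loop on `next` if it is defined
    }
  }

val result: List[String] = toy.compile.toList.unsafeRunSync()
// result: List("a", "b", "c")
```

Note that the last page is still emitted even though it returns None on the right side: the element is sent down the stream first, and only then does the Option decide whether to loop again.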
In our case, we’ll leverage the function as follows:
- F[_] will be IO.
- S will be a Uri, the address of the next request.
- O will be a String representing the body of the response.
The initial S bootstrap value will be the URI of the request for page 1. The function will do the following things:
- make the request.
- inspect the response headers: if X-Next-Page is present, place its value in a Some and return it on the right side of the tuple.
- take the response’s body and return it on the left side of the tuple.
Let’s implement it:
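// build the URI for a given page (no page means the first page)
def getRequestUri(maybePage: Option[String]): Uri = {
  maybePage.map(page => baseUri +? ("page", page)).getOrElse(baseUri)
}
// given a response, work out the URI of the next page, if there is one
def getNextUri(response: Response[IO]): Option[Uri] = {
  import org.http4s.util.CaseInsensitiveString
  response.headers
    .find(_.name === CaseInsensitiveString("X-Next-Page"))
    .map(_.value)
    // assumption: like X-Prev-Page on the first page, X-Next-Page is sent with an empty value on the last page
    .filter(_.nonEmpty)
    .map(page => getRequestUri(Some(page)))
}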
def pagesStream(client: Client[IO]) = {
  Stream.unfoldLoopEval[IO, Uri, String](getRequestUri(None)) { uri =>
    val request = baseRequest.map(_.withUri(uri))
    client.fetch[(String, Option[Uri])](request) { response =>
      val nextUri = getNextUri(response)
      response.as[String].map(body => (body, nextUri))
    }
  }
}
val first5Pages = withClient { client =>
  val pages = pagesStream(client)
  pages
    .take(5)
    .compile
    .toList
    .map(_.map(_.take(50))) // first 50 chars of each page, for brevity
}
// first5Pages: IO[List[String]] = Async(
// cats.effect.internals.IOBracket$$$Lambda$1168/1890131314@1d349d15,
// false
// )
println(first5Pages.unsafeRunSync())
// List([{"id":31999786,"iid":1148,"project_id":3472737,"t, [{"id":31998926,"iid":1147,"project_id":3472737,"t, [{"id":31787616,"iid":1111,"project_id":3472737,"t, [{"id":31572934,"iid":1088,"project_id":3472737,"t, [{"id":31391571,"iid":1062,"project_id":3472737,"t)
This approach is very simplistic, but it’s not hard to understand. Really, the biggest challenge for me was finding the right function to use in a complex and powerful API like Stream.
In a follow-up blog post, we’ll cover these improvements:
- handling of the body as a Stream of individual issues (would help with memory usage)
- throttling to keep the rate limiting happy
References:
https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/Stream$.html
https://docs.gitlab.com/ee/api/README.html#offset-based-pagination
https://docs.gitlab.com/ee/api/issues.html