Dealing with multipart forms with akka-http

2018-04-30

Suppose you want to show a very basic form to your users. That form allows users to enter a first name, a last name and a profile picture. This form might look like this:

How do you deal with a form submission like that with akka-http? Let’s start with the simplest hello world server, and go from there.

Hello World route

I’ll spare you the details of the ActorSystem and only work with the route in this post. If you want to learn more, see the documentation: https://doc.akka.io/docs/akka-http/current/introduction.html.

The most basic route looks like this:

path("hello") {
    get {
      complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Hello World!</h1>"))
    }
}

This is explicit, and there is not much of interest here. The most important point is that routes are built from directives composed together. Directives can do multiple things:

The route above uses a few directives: path, get and complete.

None of these directives uses or inspects the body of the request. To handle a form like the one shown earlier, a directive that parses the body is required.

Handle forms

akka-http has a few directives to deal with forms: https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/form-field-directives/index.html. Since the form has multiple fields, formFieldMap would make sense. This directive will parse the body and extract each part into a Map. The part name becomes the key, and the value is the content of the part.

It looks like this:

formFieldMap { fields =>
  complete(OK, fields.toString)
}

Suppose you fill the form with the following values:

This is what you’d get as a result: Map(firstname -> hello, lastname -> world, picture -> earth.png)

That’s not bad: you get the content of the input fields of type text. But for the field of type file, you only get the file name. You don’t get the file content, as you might have expected.

Handle file uploads

To get the content of the file, you need to use another set of directives: https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/file-upload-directives/index.html.

The most basic one is uploadedFile, which takes a field name (picture in our example) and stores the content of the upload in a temporary file on disk. The directive gives you the temporary file and the metadata related to it.

That looks like:

uploadedFile("picture") {
  case (metadata, file) =>
    file.delete()
    complete(OK, file.getAbsolutePath)
}

If you use the same values as earlier to fill the form, you’d get a response like this: /var/folders/r1/xqt7f8nd1g39q6d23wny_l4m0000gp/T/akka-http-upload7957776021968800790.tmp

The content of the earth.png file was uploaded and stored on disk.

Note: other file upload directives allow finer control over the incoming data.

Combining directives to handle form and file upload

To have both the form and the content of the uploaded file, you can combine the directives.

It looks like this:

uploadedFile("picture") {
    case (metadata, file) =>
      formFieldMap { fields =>
        file.delete()
        val body = s"""
        |File: ${file.getAbsolutePath}
        |Form: ${fields}
        """.stripMargin
        complete(OK, body)
      }
  }

You submit your form, but you get a 404. What is going on? The two directives used above both consume the incoming data. Unfortunately, a Source that has already been materialized cannot be consumed a second time.

In our case, the first directive consumes the stream, looking for a part that contains a file and whose name is picture. It discards everything else. Then, when the request is passed to the formFieldMap directive, the body Source has already been consumed, so an exception occurs.
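As a rough analogy only, the same single-pass behavior can be sketched with a plain Scala Iterator instead of an Akka Streams Source. The Part class and the sample values are hypothetical stand-ins for the multipart body. One difference to keep in mind: an exhausted Iterator silently yields nothing, whereas Akka Streams raises an error on the second materialization.

```scala
object SinglePassAnalogy {
  // Hypothetical stand-in for one part of a multipart body.
  final case class Part(name: String, content: String)

  def run(): (List[String], List[String]) = {
    // Like a materialized stream, an Iterator can only be traversed once.
    val parts: Iterator[Part] = Iterator(
      Part("firstname", "hello"),
      Part("lastname", "world"),
      Part("picture", "<png bytes>")
    )

    // First consumer: keeps the "picture" part and drops everything else.
    // Building the list drains the iterator completely.
    val pictures = parts.filter(_.name == "picture").map(_.content).toList

    // Second consumer: the iterator is exhausted, so no form field is left.
    val fields = parts.filter(_.name != "picture").map(_.name).toList

    (pictures, fields)
  }
}
```

The first consumer gets the picture, and the second one finds an empty body, which mirrors what happens when formFieldMap runs after uploadedFile.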

The reason for the 404 is technical, but in short, the exception is swallowed by the directive when it should not be. You can see the code here: https://github.com/akka/akka-http/blob/master/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FormFieldDirectives.scala#L88.

It’s fairly easy to see. If you swap the order of the directives, the 404 will be turned into a 500 and you will be able to see the error message:

Substream Source cannot be materialized more than once

If you are unlucky and upload a small file (<1 MB) whose content is non-binary, the code may just work. You will ship it to production, only to realize that it fails when the file size increases or the nature of the content changes.

The solution

The solution is to build a custom directive that parses the body while taking care of both the file upload and the form fields.

A solution may look like this:

import java.io.File
import scala.collection.immutable
import scala.concurrent.Future
import akka.http.scaladsl.model.{ContentType, HttpEntity, Multipart}
import akka.http.scaladsl.server.Directive1
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.directives.FileInfo
import akka.stream.scaladsl.{FileIO, Sink}

// A file field to capture, and a function that picks the destination file for it.
final case class FileField(fieldName: String, fileNameF: FileInfo => File)

// Accumulates the text fields and the stored files found while reading the body.
final case class PartsAndFiles(form: immutable.Map[String, List[String]], files: immutable.Seq[(FileInfo, File)]) {
  final def addForm(fieldName: String, content: String): PartsAndFiles = this.copy(
    form = {
      val existingContent: List[String] = this.form.getOrElse(fieldName, List.empty)
      val newContents: List[String] = content :: existingContent

      this.form + (fieldName -> newContents)
    }
  )
  final def addFile(info: FileInfo, file: File): PartsAndFiles = this.copy(
    files = this.files :+ ((info, file))
  )
}
object PartsAndFiles {
  val Empty = PartsAndFiles(immutable.Map.empty, immutable.Seq.empty)
}

def formAndFiles(
    fileFields: immutable.Seq[FileField]
  ): Directive1[PartsAndFiles] = entity(as[Multipart.FormData]).flatMap { formData =>
  extractRequestContext.flatMap { ctx =>
    implicit val mat = ctx.materializer
    implicit val ec = ctx.executionContext

    // Reads the parts once, in order, and folds them into a PartsAndFiles.
    val uploadingSink =
      Sink.foldAsync[PartsAndFiles, Multipart.FormData.BodyPart](PartsAndFiles.Empty) {
        (acc, part) =>
          // Parts we do not keep must still be drained.
          def discard(p: Multipart.FormData.BodyPart): Future[PartsAndFiles] = {
            p.entity.discardBytes()
            Future.successful(acc)
          }

          part.filename.map { fileName =>
            // A part with a file name is a file upload: store it if it was requested.
            fileFields.find(_.fieldName == part.name)
              .map {
                case FileField(fieldName, destFn) =>
                  val fileInfo = FileInfo(part.name, fileName, part.entity.contentType)
                  val dest = destFn(fileInfo)

                  part.entity.dataBytes.runWith(FileIO.toPath(dest.toPath)).map { _ =>
                    acc.addFile(fileInfo, dest)
                  }
              }.getOrElse(discard(part))
          } getOrElse {
            // A part without a file name is a regular field: keep strict, non-binary content.
            part.entity match {
              case HttpEntity.Strict(ct, data) if ct.isInstanceOf[ContentType.NonBinary] =>
                val charsetName = ct.asInstanceOf[ContentType.NonBinary].charset.nioCharset.name
                val partContent = data.decodeString(charsetName)

                Future.successful(acc.addForm(part.name, partContent))
              case _ =>
                discard(part)
            }
          }
      }

    val uploadedF = formData.parts.runWith(uploadingSink)

    onSuccess(uploadedF)
  }
}
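One detail of addForm is easy to miss: each field name maps to a list, and new values are prepended, so a field submitted several times ends up with its most recent value first. The sketch below illustrates this with a simplified, hypothetical Fields class that copies only the form half of PartsAndFiles, so that it runs without akka-http on the classpath. The field names and values are made up for the illustration.

```scala
object FormAccumulatorSketch {
  // Simplified, hypothetical copy of the form-related part of PartsAndFiles.
  final case class Fields(form: Map[String, List[String]]) {
    def addForm(fieldName: String, content: String): Fields = {
      val existingContent = form.getOrElse(fieldName, List.empty)
      // Same logic as in the directive: the new value goes to the front of the list.
      copy(form = form + (fieldName -> (content :: existingContent)))
    }
  }

  // Parts in the order they would appear in the request body.
  val submitted: List[(String, String)] =
    List("tag" -> "scala", "firstname" -> "hello", "tag" -> "akka")

  // Single pass over the parts, threading the accumulator through.
  val result: Fields =
    submitted.foldLeft(Fields(Map.empty)) { case (acc, (name, value)) =>
      acc.addForm(name, value)
    }
}
```

If the submission order matters to you, reversing each list once the fold is done would restore it.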

With that, we can redefine our route like this:

val fileFields = scala.collection.immutable.Seq(FileField("picture", _ => new File("/tmp/uploaded")))
val routes: Route = formAndFiles(fileFields) {
    case PartsAndFiles(fields, files) =>
      files.foreach(_._2.delete())
      val body = s"""
          |File: ${files.head._2.getAbsolutePath}
          |Form: ${fields}
        """.stripMargin
      complete(OK, body)
  }

The response would look like:

File: /tmp/uploaded
Form: Map(firstname -> List(asd), lastname -> List(asd))
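The route above writes every upload to the same /tmp/uploaded path, so two concurrent requests would overwrite each other's file. Since FileField takes a function that picks the destination, one possible variation is to create a fresh temporary file per upload. This is only a sketch: UploadInfo is a hypothetical stand-in for akka-http's FileInfo, used here to keep the example free of dependencies, and the file name prefix is an arbitrary choice.

```scala
import java.io.File

object TempDestinationSketch {
  // Hypothetical stand-in for akka-http's FileInfo.
  final case class UploadInfo(fieldName: String, fileName: String)

  // Creates a new, uniquely named temporary file for each upload.
  val tempDestination: UploadInfo => File =
    info => File.createTempFile(s"upload-${info.fieldName}-", ".tmp")
}
```

With the real types, a function of the same shape taking a FileInfo could be passed as the second argument of FileField.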

I found the original behavior surprising enough that I opened this pull request, in case the solution helps others: https://github.com/akka/akka-http/pull/2006.