A tale about language features and HTTP streaming

This might sound familiar to you:

You want to measure how well a product is doing and need some kind of reporting mechanism that provides you with insightful KPIs. Surely you don’t want to log into a production server to fire a query, because that is dangerous and does not scale. So you add some report downloads to an admin panel. Now even non-developers can run queries, and everybody is happy.

Until… the amount of data is too much to handle with a naive file download. The exact symptoms can differ; in our case, the Ecto queries timed out. The fix that works for us now is to stream the data to the browser. (Until, some day, the amount of data makes this infeasible again…)

In this post I will show you what streaming a download can look like in a Phoenix application, and how some Elixir language concepts nicely come together to make this enjoyable.

Streaming, doesn’t that ring a bell?

The first thing that comes to mind is, of course, Elixir’s Stream module. Indeed it plays a major role here. Let’s quickly recap the basic idea.

Imagine a simple data processing pipeline for tallying up employees’ salaries in the third column of a CSV, skipping the head row:

File.read!("Salaries.csv")
|> String.split("\n", trim: true)
|> Enum.map(&String.split(&1, ","))
|> Enum.drop(1)
|> Enum.map(&Enum.at(&1, 2))
|> Enum.map(&String.to_float/1)
|> Enum.reduce(0, &Kernel.+/2)

This is a simplified example, and you should never parse CSV manually, nor represent currency values as floats. But it serves to demonstrate the basic steps we take in a naive implementation:

First we read in the entire file, then we transform the contents in several steps until we have the one number we need.

Note that in each step, we consume the data from the previous step wholesale and produce the input for the next step. This is rather inefficient. Imagine the input file being really large: at any point we hold both a step’s input and its output in memory, so we need roughly twice the size of the file. We could merge several steps into one, such that there is only one Enum.map step instead of three, but that would not really solve the problem.

Instead, we want all row transformations to occur in sequence for each individual row, before continuing with the next row. This is what Stream allows us to do.

The Stream struct accomplishes this by merely recording the steps to perform. The actual iteration over the data is deferred until it can no longer be avoided, which is whenever we pass the stream to one of the functions from the Enum module. This is why we say that the functions in Stream work lazily and those in Enum eagerly.
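To make the difference between lazy and eager tangible, here is a small sketch of my own that is unrelated to the CSV example. It relies on an infinite sequence, which only works because Stream does nothing until Enum asks for elements:

```elixir
# Eager: Enum.map/2 walks the whole range right away and returns a list.
eager = Enum.map(1..3, &(&1 * 2))

# Lazy: Stream.iterate/2 describes an infinite sequence, and Stream.map/2
# only records the doubling step. No element has been computed yet.
lazy = Stream.iterate(1, &(&1 + 1)) |> Stream.map(&(&1 * 2))

# Enum.take/2 drives the iteration and stops after three elements.
first_three = Enum.take(lazy, 3)

IO.inspect({eager, first_three})
```

Replacing Stream.map/2 with Enum.map/2 in the lazy pipeline would never return, because Enum would try to consume the infinite sequence completely.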

As it turns out, the File module and CSV hex package already handle the first two steps for us. Our improved code looks like this:

File.stream!("Salaries.csv")
|> CSV.decode!()
|> Stream.drop(1)
|> Stream.map(&Enum.at(&1, 2))
|> Stream.map(&String.to_float/1)
|> Enum.reduce(0, &Kernel.+/2)

We are creating a stream in the first line, transforming it into new ones, and eventually, by passing the final stream to Enum.reduce/3, the entire operation (opening and closing the file and transforming the content) takes place.

Note that several of the steps look no different than before, except that we are using a function from the Stream module instead of Enum. These functions recognize when their input is a Stream struct and simply add to it.

But how does the Enum.reduce/3 function deal with a Stream struct as input? It would be possible for it to recognize it in a manual way; after all, both modules live in Elixir’s standard library. But that is not the case!

Protocols, a powerful abstraction

To better understand Enum, we need to quickly talk about protocols here. Protocols provide “data polymorphism”, dispatching to different function implementations depending on a data type. The dispatch happens on the first argument of a protocol’s function and works as if implicitly pattern matching on the argument’s data structure. That’s essentially all there is to how protocols work.
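As a quick illustration of that dispatch, here is a minimal sketch. The Describable protocol and its describe/1 function are made up for this example and are not part of Elixir:

```elixir
defprotocol Describable do
  @doc "Returns a short human-readable description of the data."
  def describe(data)
end

# One implementation per data type; the type of the first argument decides
# which one is called.
defimpl Describable, for: Integer do
  def describe(n), do: "the integer #{n}"
end

defimpl Describable, for: List do
  def describe(list), do: "a list with #{length(list)} items"
end

descriptions = [Describable.describe(42), Describable.describe([:a, :b])]
IO.inspect(descriptions)
```

Calling Describable.describe/1 with a type that has no implementation, a map for instance, raises a Protocol.UndefinedError.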

In the case of the functions in the Enum module, they all eventually call a function in the Enumerable protocol. To make a type enumerable, all you need to do is implement a function for the basic reducing algorithm. (For completeness: the protocol defines further functions, count/1, member?/2 and, in current Elixir versions, slice/1. They only provide optional optimizations.) Think of the Enum functions as a grab-bag of convenient iterations that all boil down to function calls in the Enumerable protocol.

So when our stream hits Enum.reduce/3 in the example above, an algorithm in the Stream module is used to produce the values of the stream, and this algorithm is implemented such that this iteration works lazily.

Protocols allow us to do two things:

  • conceptualize dealing with data in a certain way by separating the data type from an implementation consuming it
  • extend such a concept with an implementation for a new data type.

To emphasize the last point: It is not required to know all possible data types and their implementations of the concept in advance! In my application, I can define an Enumerable implementation for my data type. The same could be done inside a hex package. Or a hex package could implement a protocol for one of its data types for a protocol invented in another hex package it depends on. Let that sink in for a moment.
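Here is a minimal sketch of what implementing Enumerable for your own data type can look like. The Countdown struct is a hypothetical example of mine. The optimization callbacks simply return {:error, __MODULE__}, which tells Enum to fall back to the reduce/3 based default:

```elixir
defmodule Countdown do
  # Hypothetical data type: enumerates from, from - 1, ..., 1.
  defstruct [:from]
end

defimpl Enumerable, for: Countdown do
  # The consumer asked us to stop early.
  def reduce(_countdown, {:halt, acc}, _fun), do: {:halted, acc}

  # The consumer wants to pause; hand back a continuation.
  def reduce(countdown, {:suspend, acc}, fun),
    do: {:suspended, acc, &reduce(countdown, &1, fun)}

  # Nothing left to emit.
  def reduce(%Countdown{from: n}, {:cont, acc}, _fun) when n < 1, do: {:done, acc}

  # Emit the current number, then continue with the next lower one.
  def reduce(%Countdown{from: n} = countdown, {:cont, acc}, fun),
    do: reduce(%{countdown | from: n - 1}, fun.(n, acc), fun)

  # No optimized versions: let Enum fall back to reduce/3.
  def count(_countdown), do: {:error, __MODULE__}
  def member?(_countdown, _value), do: {:error, __MODULE__}
  def slice(_countdown), do: {:error, __MODULE__}
end

as_list = Enum.to_list(%Countdown{from: 3})
scaled = Enum.map(%Countdown{from: 3}, &(&1 * 10))
total = Enum.count(%Countdown{from: 4})
```

With that single implementation in place, every function in Enum and Stream accepts a Countdown.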

Creating your own stream

While there are functions for creating streams for the most common cases, sometimes it is necessary to build your own.

Consider the Redis SCAN command. It scans through the entire key space and returns one batch of keys at a time. In order to iterate over all batches, it needs to be called with a cursor. On the first call, that cursor is zero. Each call returns a batch of keys together with the cursor to be used on the next call; a returned cursor of zero denotes the end of the scan.

It is natural to wrap this process in a Stream. Without going into any details, this is how it could be done using Stream.resource/3:

Stream.resource(
  fn -> "0" end, # returns initial cursor
  fn
    nil -> {:halt, nil}
    cursor ->
      Redix.command!(redix_conn, ["SCAN", cursor])
      |> case do
        ["0", data] -> {data, nil}
        [cursor, data] -> {data, cursor}
      end
  end,
  fn _ -> [] end
)
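If you want to try this without a Redis server, the following sketch replaces the SCAN call with a hard-coded lookup. The FakeScan module, its scan/1 function and the page data are assumptions made up for the demonstration; only the cursor handling mirrors the snippet above:

```elixir
defmodule FakeScan do
  # Hypothetical stand-in for Redis: maps a cursor to [next_cursor, keys].
  @pages %{
    "0" => ["7", ["a", "b"]],
    "7" => ["3", ["c"]],
    "3" => ["0", ["d", "e"]]
  }

  def scan(cursor), do: Map.fetch!(@pages, cursor)

  def stream do
    Stream.resource(
      # Start function: the initial cursor.
      fn -> "0" end,
      # Next function: emit one batch and remember the next cursor.
      fn
        nil ->
          {:halt, nil}

        cursor ->
          case scan(cursor) do
            # Cursor "0" marks the last batch; nil makes the next call halt.
            ["0", keys] -> {keys, nil}
            [next_cursor, keys] -> {keys, next_cursor}
          end
      end,
      # After function: nothing to clean up here.
      fn _ -> :ok end
    )
  end
end

all_keys = Enum.to_list(FakeScan.stream())
some_keys = Enum.take(FakeScan.stream(), 3)
```

Note that Enum.take/2 stops the scan as soon as it has enough keys, so the third page is never requested in the second call.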

How streaming works on the web

In the usual request-response cycle of the web, the server side builds the entire content to be sent as the response body, and hence also knows its length. It can therefore set the Content-Length header, and the browser can display a download progress bar based on that length and the body length received so far. The body is sent in one large chunk at the end of the response.

For streaming, the Content-Length header cannot be used. Instead we have Transfer-Encoding: chunked, and the response body is sent in chunks, each of which is preceded by its length (so the client knows where the next chunk begins).
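To show what this looks like on the wire, here is a rough sketch of the HTTP/1.1 chunk framing. The ChunkedSketch module is my own illustration; in a real application the web server does this for you. Each chunk is preceded by its size in hexadecimal, and a chunk of size zero ends the body:

```elixir
defmodule ChunkedSketch do
  # Frames a list of binaries as an HTTP/1.1 chunked body (without trailers).
  def encode(chunks) do
    framed =
      Enum.map(chunks, fn chunk ->
        size = Integer.to_string(byte_size(chunk), 16)
        [size, "\r\n", chunk, "\r\n"]
      end)

    # The zero-sized chunk tells the client that the body is complete.
    IO.iodata_to_binary([framed, "0\r\n\r\n"])
  end
end

body = ChunkedSketch.encode(["Hello, ", "world!"])
IO.inspect(body)
```

The terminating zero-sized chunk is what a client could use to tell a complete response from an aborted one.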

Elixir’s Plug models it like this:

conn
|> send_chunked(200)
|> chunk("Lorem ipsum dolor sit amet, consectetur adipisicing elit, ")
|> chunk("sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. ")
... # as many chunks as needed
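|> chunk("sunt in culpa qui officia deserunt mollit anim id est laborum.")
# Schematic only: chunk/2 returns {:ok, conn} (or {:error, reason}), so real code has to unwrap the conn between calls.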

Since we do not know the number of chunks to come, we need to wrap the chunk/2 calls in some kind of iteration. This would work:

Enum.reduce(chunks_of_data, conn, fn a_chunk, conn ->
  {:ok, conn} = chunk(conn, a_chunk) # chunk/2 returns {:ok, conn} on success
  conn
end)

We are shoveling chunks of data from a collection over to the client, one by one, thereby transforming the conn. Again, this should sound like a familiar concept. Doesn’t this code work similarly?

iex> [1, 2, 2, 4] |> Enum.into(MapSet.new)
#MapSet<[1, 2, 4]>

Here, we iterate over a list to populate a MapSet with the list items, one by one, transforming the set with each item (we have to, since there is no in-place update of data in Elixir).

How does this work? Well, Enum.into/2 is a bit special since it uses another protocol in addition to Enumerable, namely, Collectable. Any data type can implement this protocol to define what a “pushing data item by item into it while transforming the data” process looks like. MapSet does so, which is why we can pass it in.
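To peek behind the curtain, this sketch performs by hand roughly what Enum.into/2 does with a collectable. Collectable.into/1 returns an initial accumulator and a collector function; the collector is called with {:cont, item} for every item and with :done at the end:

```elixir
{initial, collector} = Collectable.into(MapSet.new())

by_hand =
  [1, 2, 2, 4]
  # Push the items in, one by one.
  |> Enum.reduce(initial, fn item, acc -> collector.(acc, {:cont, item}) end)
  # Signal the end; this yields the finished collectable.
  |> collector.(:done)

via_enum = Enum.into([1, 2, 2, 4], MapSet.new())
```

Both variables hold the same set. This is a simplification: the real Enum.into/2 also sends :halt to the collector when the iteration fails.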

Indeed, Plug.Conn implements Collectable as well, so our reduce version above can simply become:

chunks_of_data
|> Enum.into(conn)
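To get a feeling for what such an implementation involves, here is a minimal sketch with a hypothetical FakeConn struct that merely records the chunks it was given. This is not Plug’s code; I assume the real implementation calls chunk/2 where this one appends to a list:

```elixir
defmodule FakeConn do
  # Hypothetical stand-in for a connection: remembers the chunks "sent".
  defstruct sent: []
end

defimpl Collectable, for: FakeConn do
  def into(conn) do
    collector = fn
      # One more chunk arrives: "send" it and return the updated conn.
      acc, {:cont, chunk} -> %{acc | sent: acc.sent ++ [chunk]}
      # All chunks are through: the conn is the result.
      acc, :done -> acc
      # The iteration was aborted: nothing to clean up.
      _acc, :halt -> :ok
    end

    {conn, collector}
  end
end

fake_conn = Enum.into(["a,b\r\n", "1,2\r\n"], %FakeConn{})
IO.inspect(fake_conn.sent)
```

Since Enum.into/2 returns whatever the collector returns for :done, the pipeline ends with a conn again, which is exactly what a controller action needs to return.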

Putting it all together

Back to our initial KPI report, which we want to stream as a download. Let’s assume the data stream is produced by a function kpi_data_stream/0, and for consumption as a download, we need to transform each data item through present_kpi_row/1 (so we have access to the low level stream for testing and debugging). This should do it:

kpi_data_stream()
|> Stream.map(&present_kpi_row/1)
|> (fn stream -> Stream.concat(table_head, stream) end).()
|> CSV.encode
|> Enum.into(
  conn
  |> put_resp_content_type("text/csv")
  |> put_resp_header("content-disposition", "attachment; filename=download.csv")
  |> send_chunked(200)
)

I’m prepending the table head row by using Stream.concat/2, since there is no Stream.prepend, hence the clunky anonymous function in the pipeline. Still, this code gives us a calm, concise and manageable description and implementation of the problem, made possible by several protocols. (Taking Ecto and CSV internals into account, there are even more protocols at work here that serve conversion needs. Just be aware of that when you want to pass your own data types through other people’s libraries…)

A word of warning

Even though there would be a way for browsers to detect a prematurely aborted stream, expect users to get a truncated download without noticing. This is important when truncation is hard to spot (as in our example with a CSV), and it is possible to draw wrong conclusions from a truncated file.

To mitigate this problem, I use the following workaround:

head = [["ATTENTION", "File may be truncated, check for END OF FILE line at the bottom!"], []]
tail = [["END OF FILE (download is complete)"]]

stream
|> (& Stream.concat([head, &1, tail])).()
|> CSV.encode

Streaming from Ecto

As mentioned at the beginning, our initial problem was that Ecto queries used for generating reports timed out. So I should demonstrate what a streaming download looks like when pulling data from the database. All examples so far look essentially like this:

produce_stream()
|> transform_stream
|> Enum.into(
  conn
  |> send_chunked(200)
)

Ecto provides the Repo.stream/2 callback (this means that your app repo has a MyApp.Repo.stream/2 function) which produces a stream for a query. Unfortunately, calling stream/2 is only allowed inside a transaction, which breaks the symmetry with all other use cases:

Repo.transaction(fn ->
  build_query()
  |> Repo.stream
  |> transform_stream
  |> Enum.into(
    conn
    |> send_chunked(200)
  )
end)
|> (fn {:ok, conn} -> conn end).()

In addition to wrapping the execution, transaction also returns {:ok, conn} (if nothing went wrong), so we need to get the conn out in order to properly return a conn from a Phoenix controller action.

To me, this is a serious limitation. Let me explain to you why:

The code snippets above operate at different levels of abstraction: the low-level query and formation of the initial stream, the high-level presentation as a stream of report data, the low-level encoding as a CSV and the low-level HTTP handling are all mixed up together. That’s okay for such a blog post, because it makes the examples simpler, and I can focus on certain aspects. In practice, I also sometimes tolerate this when the amount of code is very small, I expect tidying up later to be straightforward, and I am not yet sure what the right abstraction and place for the code is.

But in most cases, I want to separate these layers: to be able to test-drive an implementation, to have a simpler and faster testing experience, to obtain a better overview of what my application actually does (and not smear my core application logic all over the components my framework dictates I use), and to be able to refactor mercilessly later, when needed. And for that I need composability.

I really like that Phoenix, the web framework, pushes in the direction of separating application logic from web layer support. By breaking composability, Ecto streaming makes this hard.

I have an ugly workaround for this: The low-level stream, wrapped in a transaction, is consumed in a separate process, and the parent process builds a new stream from data chunks transferred from the child via message passing. That works, but it’s really ugly, low-level and certainly inefficient.
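For the curious, this is roughly the shape of that workaround. Treat it as a sketch under assumptions and not as the code I actually run: the DetachedStream module and its argument names are made up, `wrap` is expected to be something like &Repo.transaction/1, and `build` a function returning the low-level stream. There is no backpressure, so the child fills the parent’s mailbox as fast as it can:

```elixir
defmodule DetachedStream do
  # wrap:  fun that runs a zero-arity fun in some context (e.g. a transaction)
  # build: zero-arity fun returning the enumerable to consume in that context
  def stream(wrap, build) do
    Stream.resource(
      fn ->
        parent = self()
        ref = make_ref()

        # The child consumes the wrapped stream and forwards every item.
        {pid, monitor} =
          spawn_monitor(fn ->
            wrap.(fn -> Enum.each(build.(), &send(parent, {ref, :item, &1})) end)
            send(parent, {ref, :done})
          end)

        {pid, monitor, ref}
      end,
      fn {_pid, monitor, ref} = state ->
        receive do
          {^ref, :item, item} -> {[item], state}
          {^ref, :done} -> {:halt, state}
          # The child died before finishing: fail loudly instead of hanging.
          {:DOWN, ^monitor, :process, _pid, reason} -> exit({:producer_down, reason})
        end
      end,
      fn {pid, monitor, _ref} ->
        # Stop the child if the consumer quit early. Items that were already
        # sent stay in the mailbox; a real implementation should drain them.
        Process.demonitor(monitor, [:flush])
        Process.exit(pid, :kill)
      end
    )
  end
end

# Demonstration without a database: the "transaction" just runs the function.
no_transaction = fn fun -> fun.() end
items = DetachedStream.stream(no_transaction, fn -> 1..5 end) |> Enum.to_list()
```

The returned stream can be created and transformed anywhere and consumed outside of the transaction, which restores composability at the price of copying every item between processes.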

