Rust Async Streaming with Rusoto

So Rust's async/await is finally stable, and every library even remotely involved with networking now depends on Futures and Tokio. Rusoto, the Rust library for AWS, is one of them.

The Problem

So let's say you're like me and you need to upload RDS log files somewhere. Doesn't matter where, pick your favorite database. Easy peasy, just use Rusoto RDS's download_db_log_file_portion method. The method lets you download a chunk, or portion as it were, of the log file. The response tells you whether there's any more data to download and gives you a marker for where you left off. If there is more, you just run the method again with that marker.

Something along the lines of:

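use rusoto_rds::{DownloadDBLogFilePortionMessage, Rds, RdsClient};

// Downloads the whole log file into one String, one portion at a time.
async fn download_log_lines(rds_client: &RdsClient) -> String {
    let mut marker: Option<String> = None;
    let mut buffer = String::new();

    loop {
        let result = rds_client
            .download_db_log_file_portion(DownloadDBLogFilePortionMessage {
                db_instance_identifier: String::from("mydatabase"),
                log_file_name: String::from("mylogfile.1"),
                marker: marker.clone(),
                number_of_lines: None,
            })
            .await
            .unwrap();

        // log_file_data is an Option, so fall back to an empty string
        // instead of panicking when a portion carries no data.
        buffer.push_str(&result.log_file_data.unwrap_or_default());

        // The marker tells the next call where to resume.
        marker = result.marker;

        if !result.additional_data_pending.unwrap_or(false) {
            break;
        }
    }

    buffer
}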

That works, it gives you a big string with the whole log file in it. But what happens if the log file happens to be 20 gigabytes? Your process runs out of memory and dies, you never send your log file, and everyone finds out about that embarrassing thing you did in 9th grade.

What we need is the ability to deal with a chunk of data at a time. One solution would be to just have all of the downloading, processing, and uploading of the data in one big function. But this function would be in charge of at least 3 different things and would be pretty unwieldy.

Streams

Another solution is to use streams. A stream works like an iterator, except that each item is produced asynchronously, so getting the next one is a future you await. What we could do is make a stream that yields each line of the log file so that whoever consumes the stream can process it.

Taking futures out of the picture for a bit, the code we're trying to get is something like:

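def yield_db_log_lines():
    marker = None

    while True:
        # Fetch the next portion, resuming from the last marker
        result = rds_client.download_db_log_file(marker=marker)

        for line in result.log_file_data.splitlines():
            yield line

        marker = result.marker
        if not result.additional_data_pending:
            break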

The magic happens in that yield line line. Python will suspend execution of the function at that point, a bit like how an exception hands control back to the caller, except that the function picks up where it left off when the caller asks for the next item. That one line turns the function into a generator. You can get the same behavior without yield by writing an object that implements the __iter__() and __next__() methods, but yield simplifies things a lot.
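To make the "without yield" option concrete, here is a rough sketch of the same idea in Rust, still with futures out of the picture: a struct that implements Iterator by hand and fetches the next page only when it runs out of buffered lines. Everything in it (Page, PagedSource, LogLines, FakeLogs) is a made-up stand-in for this example, not a Rusoto type. Notice how the marker, the buffer, and the "are we done" flag all have to live in struct fields. That bookkeeping is what yield does for you.

```rust
use std::collections::VecDeque;

/// One page of log data, loosely modeled on what the RDS call returns.
/// (Hypothetical type for this sketch, not a Rusoto type.)
struct Page {
    data: String,
    marker: Option<String>,
    more_pending: bool,
}

/// Hypothetical stand-in for a client that serves a log file in pages.
trait PagedSource {
    fn fetch(&self, marker: Option<&str>) -> Page;
}

/// Iterator that hands out one line at a time, fetching pages on demand.
struct LogLines<'a, S: PagedSource> {
    source: &'a S,
    marker: Option<String>,
    buffered: VecDeque<String>,
    done: bool,
}

impl<'a, S: PagedSource> LogLines<'a, S> {
    fn new(source: &'a S) -> Self {
        LogLines { source, marker: None, buffered: VecDeque::new(), done: false }
    }
}

impl<'a, S: PagedSource> Iterator for LogLines<'a, S> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        loop {
            // Hand out lines we have already downloaded first.
            if let Some(line) = self.buffered.pop_front() {
                return Some(line);
            }
            if self.done {
                return None;
            }
            // Buffer is empty: fetch the next page and remember where we are.
            let page = self.source.fetch(self.marker.as_deref());
            self.buffered.extend(page.data.lines().map(str::to_owned));
            self.marker = page.marker;
            self.done = !page.more_pending;
        }
    }
}

/// In-memory fake: each element is one page, the marker is the next page index.
struct FakeLogs(Vec<&'static str>);

impl PagedSource for FakeLogs {
    fn fetch(&self, marker: Option<&str>) -> Page {
        let index: usize = marker.map_or(0, |m| m.parse().unwrap());
        Page {
            data: self.0.get(index).copied().unwrap_or("").to_owned(),
            marker: Some((index + 1).to_string()),
            more_pending: index + 1 < self.0.len(),
        }
    }
}
```

With this in place, LogLines::new(&FakeLogs(vec!["a\nb\n", "c\n"])) can be used in a plain for loop, and only one page of lines sits in memory at a time.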

The Solution

Rust doesn't yet provide a convenient way to create asynchronous generators like this, so you have to implement Stream yourself somehow. I tried doing this for the download_db_log_file_portion method and ran into some lifetime issues (that were probably solvable but boy howdy).
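To give a feel for what "implement Stream yourself" involves, here is a toy sketch. So that it compiles with only the standard library, it defines a local MiniStream trait (a hypothetical name) whose poll_next method has the same shape as the one on the futures crate's Stream trait. Countdown, NoopWake, and drain are likewise invented for the example. This is not the code I attempted for RDS, just the general flavor: every suspension point becomes explicit state plus a Poll return value.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local trait with the same method shape as `futures::Stream` (hypothetical name).
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Toy stream: reports "not ready" before each item, then counts down to 1.
struct Countdown {
    remaining: u32,
    ready: bool,
}

impl MiniStream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if !self.ready {
            // Pretend we are waiting on I/O: ask to be polled again, then bail out.
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        if self.remaining == 0 {
            return Poll::Ready(None); // End of stream.
        }
        self.remaining -= 1;
        Poll::Ready(Some(self.remaining + 1))
    }
}

/// Waker that does nothing; enough for a busy-polling demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling driver. A real program would let an executor such as
/// Tokio do this and would sleep between polls instead of spinning.
fn drain<S: MiniStream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            Poll::Pending => continue,
        }
    }
}
```

Now imagine the "waiting on I/O" part being a borrowed client's in-flight request that has to be stored in the struct between polls, and you can see where the lifetime trouble comes from.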

Thankfully the Tokio project made a procedural macro library called async-stream. It adds a yield keyword that works much like the one in the Python code, with minimal boilerplate. We hate boilerplate.

The resulting code ends up looking like this:

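use async_stream::stream;
use futures::Stream;
use rusoto_rds::{DownloadDBLogFilePortionMessage, Rds, RdsClient};

// Yields the log file one line at a time, fetching portions as needed.
fn stream_log_lines<'a>(rds_client: &'a RdsClient) -> impl Stream<Item = String> + 'a {
    stream! {
        let mut marker: Option<String> = None;

        loop {
            let result = rds_client
                .download_db_log_file_portion(DownloadDBLogFilePortionMessage {
                    db_instance_identifier: String::from("mydatabase"),
                    log_file_name: String::from("mylogfile.1"),
                    marker: marker.clone(),
                    number_of_lines: None,
                })
                .await
                .unwrap();

            // Only this one portion is held in memory while its lines are yielded.
            let data = result.log_file_data.unwrap_or_default();
            for line in data.lines() {
                yield line.to_owned();
            }

            // The marker tells the next call where to resume.
            marker = result.marker;

            if !result.additional_data_pending.unwrap_or(false) {
                break;
            }
        }
    }
}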

This took me a while to actually write successfully. When you mess up certain parts of the code, the macro gets confused and instead of telling you what you did wrong, it will give a "recursion limit exceeded" error. This made it hard to debug my code and for the longest time I simply thought the library was unusable for this problem. But I soldiered on for like 8 days and got it all working the way I wanted it to.

I also wrote a stream function that parses the data into timestamped event objects. An RDS log file looks something like this:

2020-05-18 02:15:43 Some Log Message
2020-05-18 02:15:43 Some Log Message
  Line 2
  Line 3

The example above shows a log file with 2 log events, where the second event spans multiple lines. I solved this by iterating over the lines and checking each one against a regular expression that matches the timestamp:

(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})

The function keeps track of the "current" log event. If the line matches the regex, the function yields the current log event and starts a new one with the timestamp and first-line data that we just read. If the line does not match, the content of that whole line is added onto the current log event. Lastly, at the end of the function, it yields the current log event if there is one (remember to handle empty files!).
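The grouping logic described above can be sketched without any async machinery. This is not my actual stream function, just a minimal standard-library version under a few assumptions: LogEvent, starts_with_timestamp, and for_each_event are names made up for the example, the timestamp check is a hand-rolled stand-in for the regex (and, unlike the regex, it only looks at the start of the line), and the emit callback plays the role that yield plays inside stream!.

```rust
/// One parsed log event (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
struct LogEvent {
    timestamp: String,
    message: String,
}

/// Length of a `YYYY-MM-DD HH:MM:SS` timestamp.
const TIMESTAMP_LEN: usize = 19;

/// Returns true if `line` begins with `YYYY-MM-DD HH:MM:SS`.
/// Hand-rolled stand-in for the regex, anchored to the start of the line.
fn starts_with_timestamp(line: &str) -> bool {
    // 'd' means "any ASCII digit"; every other byte must match exactly.
    const PATTERN: &[u8] = b"dddd-dd-dd dd:dd:dd";
    let bytes = line.as_bytes();
    bytes.len() >= PATTERN.len()
        && PATTERN
            .iter()
            .zip(bytes)
            .all(|(&p, &b)| if p == b'd' { b.is_ascii_digit() } else { p == b })
}

/// Groups raw lines into events: a timestamped line starts a new event,
/// any other line is appended to the event currently being built.
/// `emit` is called once per finished event, where the stream would `yield`.
fn for_each_event<I, F>(lines: I, mut emit: F)
where
    I: IntoIterator<Item = String>,
    F: FnMut(LogEvent),
{
    let mut current: Option<LogEvent> = None;

    for line in lines {
        if starts_with_timestamp(&line) {
            // A new event begins, so the previous one (if any) is complete.
            if let Some(finished) = current.take() {
                emit(finished);
            }
            current = Some(LogEvent {
                timestamp: line[..TIMESTAMP_LEN].to_owned(),
                message: line[TIMESTAMP_LEN..].trim_start().to_owned(),
            });
        } else if let Some(event) = current.as_mut() {
            // Continuation line: keep it with the current event.
            event.message.push('\n');
            event.message.push_str(&line);
        }
        // Lines before the first timestamp have no event to join and are skipped.
    }

    // Flush the last event; for an empty file there is nothing to flush.
    if let Some(last) = current {
        emit(last);
    }
}
```

Only one event is held at a time, so memory use depends on the largest single event rather than on the size of the file.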

Conclusion

That about solved my problem and I'm happy to have been able to eventually write it in Rust. Sometimes writing in Rust feels like playing Jenga, in that all I'm trying to do is make it not topple over, but in ways that don't feel productive. For example my function definition initially looked like this:

fn stream_log_lines(rds_client: &RdsClient) -> impl Stream<Item = String> {

That is, without the 'a lifetime. Rust freaked out about this, saying that my function signature implied a static lifetime but some objects didn't live long enough. It did give me a suggestion on how to fix it (otherwise I surely would never have been able to solve it), but it didn't feel like an interesting or productive fix. Whatever, it's all worth it to see this program never use more than 6 megabytes of memory despite processing a ton of data.